Loading...
Loading...
Implement enterprise Single Sign-On (SSO) authentication supporting multiple identity providers with JWT RS256 tokens, backwards verification, session management, and cross-system permission mapping. Use this skill when building authentication systems that integrate with multiple enterprise SSO providers or when implementing secure token validation with session verification.
npx skill4agent add rafaelkamimura/claude-tools multi-system-sso-authentication┌─────────────────────────────────────────────────────────┐
│ Your Application │
│ ┌────────────────────────────────────────────────────┐ │
│ │ UnifiedAuthAdapter (Router) │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Check token issuer (iss claim) │ │ │
│ │ │ Route to appropriate adapter │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ ▼ ▼ ▼ ▼ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ CORP │ │ SGF │ │ GED │ │ CARRINHO│ │ │
│ │ │ Adapter │ │ Adapter │ │ Adapter │ │ Adapter │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ Corporativo│ │ SGF │ │ GED │ │ Carrinho │
│ SSO │ │ API │ │ API │ │ API │
└───────────┘ └───────────┘ └───────────┘ └───────────┘src/
├── api/
│ ├── middlewares/
│ │ └── auth.py # AuthMiddleware
│ └── path/
│ └── auth.py # Authentication endpoints
├── domain/
│ └── modules/
│ └── auth/
│ ├── entity.py # User entity
│ ├── session.py # Session management
│ └── permissions.py # Permission definitions
└── infra/
├── adapters/
│ └── auth/
│ ├── unified_adapter.py # Router for all adapters
│ ├── corporativo_adapter.py # Corporativo SSO
│ ├── sgf_adapter.py # SGF integration
│ ├── ged_adapter.py # GED integration
│ └── carrinho_adapter.py # Carrinho integration
├── cache/
│ └── redis_session.py # Redis session storage
└── services/
└── permission_mapper.py # Permission mapping# src/infra/adapters/auth/unified_adapter.py
from typing import Dict, Any
from jose import jwt, JWTError
from src.infra.adapters.auth.corporativo_adapter import CorporativoAuthAdapter
from src.infra.adapters.auth.sgf_adapter import SGFAuthAdapter
from src.infra.adapters.auth.ged_adapter import GEDAuthAdapter
from src.infra.adapters.auth.carrinho_adapter import CarrinhoAuthAdapter
from src.config.settings import app_settings
class UnifiedAuthAdapter:
"""Unified authentication adapter that routes tokens to appropriate SSO adapter.
Routes based on JWT issuer claim (iss).
"""
def __init__(
self,
corporativo_adapter: CorporativoAuthAdapter,
sgf_adapter: SGFAuthAdapter,
ged_adapter: GEDAuthAdapter,
carrinho_adapter: CarrinhoAuthAdapter,
):
self.adapters = {
"corporativo": corporativo_adapter,
"sgf": sgf_adapter,
"ged": ged_adapter,
"carrinho": carrinho_adapter,
}
# Map issuer URLs to adapter names
self.issuer_map = {
app_settings.CORPORATIVO_API_URL: "corporativo",
app_settings.SGF_API_URL: "sgf",
app_settings.GED_API_URL: "ged",
app_settings.CARRINHO_API_URL: "carrinho",
"gefin-backend": "corporativo", # Self-issued tokens
}
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate token and route to appropriate adapter.
Args:
token: JWT token string
Returns:
User data dictionary with permissions
Raises:
JWTError: If token is invalid or from unknown issuer
"""
# Decode without verification to check issuer
try:
unverified = jwt.get_unverified_claims(token)
issuer = unverified.get("iss")
except JWTError as e:
raise JWTError(f"Invalid JWT format: {e}")
# Map issuer to adapter
adapter_name = self.issuer_map.get(issuer)
if not adapter_name:
raise JWTError(f"Unknown token issuer: {issuer}")
# Check if adapter is enabled
enabled_systems = app_settings.ENABLED_AUTH_SYSTEMS
if adapter_name not in enabled_systems:
raise JWTError(f"Authentication system '{adapter_name}' is disabled")
# Route to appropriate adapter
adapter = self.adapters[adapter_name]
return await adapter.validate_token(token)
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session cookie (for legacy systems).
Routes to Corporativo adapter (primary session provider).
"""
return await self.adapters["corporativo"].validate_session(session_id)# src/infra/adapters/auth/base_adapter.py
from abc import ABC, abstractmethod
from typing import Dict, Any
class IAuthAdapter(ABC):
"""Abstract base class for authentication adapters.
All SSO adapters must implement this interface.
"""
@abstractmethod
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate JWT token and return user data.
Args:
token: JWT token string
Returns:
User data with permissions
Raises:
JWTError: If token is invalid
"""
pass
@abstractmethod
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session ID and return user data.
Args:
session_id: Session identifier
Returns:
User data with permissions
Raises:
SessionError: If session is invalid
"""
pass
@abstractmethod
def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
"""Extract and map permissions from user data.
Args:
user_data: User data from SSO system
Returns:
List of permission strings in app format
"""
pass# src/infra/adapters/auth/corporativo_adapter.py
import httpx
from datetime import datetime, timedelta
from jose import jwt, JWTError
from src.infra.adapters.auth.base_adapter import IAuthAdapter
from src.infra.cache.redis_session import RedisSessionManager
class CorporativoAuthAdapter(IAuthAdapter):
"""Corporativo SSO authentication adapter.
Implements JWT RS256 validation with backwards verification.
"""
def __init__(
self,
public_key: str,
private_key: str,
api_url: str,
session_manager: RedisSessionManager,
):
self.public_key = public_key
self.private_key = private_key
self.api_url = api_url
self.session_manager = session_manager
self._validation_cache: Dict[str, tuple[Dict, datetime]] = {}
self._cache_ttl = 30 # 30 seconds
async def validate_token(self, token: str) -> Dict[str, Any]:
"""Validate JWT token with backwards verification.
Steps:
1. Verify JWT signature with RSA public key
2. Check issuer and audience claims
3. Perform backwards verification with SSO system
4. Map permissions to app format
"""
try:
# Verify signature and decode token
payload = jwt.decode(
token,
self.public_key,
algorithms=["RS256"],
options={"verify_iss": False, "verify_aud": False}, # Manual validation
)
# Manual issuer validation
accepted_issuers = ["gefin-backend", self.api_url]
if payload.get("iss") not in accepted_issuers:
raise JWTError(f"Invalid issuer: {payload.get('iss')}")
# Manual audience validation
accepted_audiences = ["gefin-api", "gefin"]
aud = payload.get("aud")
if isinstance(aud, list):
if not any(a in accepted_audiences for a in aud):
raise JWTError(f"Invalid audience: {aud}")
elif aud not in accepted_audiences:
raise JWTError(f"Invalid audience: {aud}")
# Check expiration
exp = payload.get("exp")
if exp and datetime.fromtimestamp(exp) < datetime.now():
raise JWTError("Token has expired")
# Backwards verification (if not self-issued)
if payload.get("iss") != "gefin-backend":
await self._verify_with_corporativo(token, payload)
return payload
except JWTError as e:
raise JWTError(f"Token validation failed: {e}")
async def _verify_with_corporativo(
self,
token: str,
payload: Dict[str, Any]
) -> None:
"""Verify token validity with Corporativo SSO system.
Implements backwards verification with caching.
"""
# Check cache first
cache_key = payload.get("sub")
if cache_key in self._validation_cache:
cached_data, cached_at = self._validation_cache[cache_key]
if datetime.now() - cached_at < timedelta(seconds=self._cache_ttl):
return # Valid in cache
# Call Corporativo /api/me endpoint
headers = {"Authorization": f"Bearer {token}"}
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(
f"{self.api_url}/api/me",
headers=headers,
)
response.raise_for_status()
# Cache validation result
self._validation_cache[cache_key] = (payload, datetime.now())
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
raise JWTError("Token is not valid in Corporativo system")
# Network error - extend cache if exists
if cache_key in self._validation_cache:
cached_data, cached_at = self._validation_cache[cache_key]
# Extend cache to 5 minutes on network failure
if datetime.now() - cached_at < timedelta(minutes=5):
return
raise JWTError("Unable to verify token with Corporativo")
except httpx.RequestError:
# Network error - graceful degradation
if cache_key in self._validation_cache:
return
raise JWTError("Network error verifying token")
def get_permissions(self, user_data: Dict[str, Any]) -> list[str]:
"""Map Corporativo permissions to app format.
Example mapping:
"Ver anuidade" -> "gefin.boleto.read"
"Editar anuidade" -> "gefin.boleto.write"
"""
corporativo_permissions = user_data.get("permissions", [])
permission_map = {
"Ver anuidade": "gefin.boleto.read",
"Editar anuidade": "gefin.boleto.write",
"Ver parcelamento": "gefin.parcela.read",
"Editar parcelamento": "gefin.parcela.write",
"Ver publicações": "gefin.publicacao.read",
"Editar publicações": "gefin.publicacao.write",
# ... more mappings
}
mapped_permissions = []
for corp_perm in corporativo_permissions:
if corp_perm == "*": # Admin wildcard
return ["*"]
app_perm = permission_map.get(corp_perm)
if app_perm:
mapped_permissions.append(app_perm)
# Ensure at least read permission
if not any(p.endswith(".read") for p in mapped_permissions):
mapped_permissions.append("gefin.user.read")
return mapped_permissions
async def validate_session(self, session_id: str) -> Dict[str, Any]:
"""Validate session from Redis.
Falls back to Laravel session decryption if Redis unavailable.
"""
# Try Redis first
session_data = await self.session_manager.get_session(session_id)
if session_data:
return session_data
# Fall back to Laravel session decryption
return await self._decrypt_laravel_session(session_id)
async def _decrypt_laravel_session(self, session_cookie: str) -> Dict[str, Any]:
"""Decrypt Laravel AES-256-CBC session cookie.
Laravel session format:
- base64(iv:encrypted_payload:mac)
- Encrypted with APP_KEY from .env
"""
# Implementation omitted for brevity
# See Laravel session decryption pattern below
pass# src/infra/adapters/auth/laravel_session.py
import base64
import json
import hashlib
import hmac
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import unpad
import phpserialize
class LaravelSessionDecryptor:
"""Decrypt Laravel AES-256-CBC encrypted sessions.
Handles Laravel's session encryption format.
"""
def __init__(self, app_key: str):
"""Initialize with Laravel APP_KEY.
Args:
app_key: Laravel APP_KEY from .env (base64: prefix)
"""
# Remove 'base64:' prefix if present
if app_key.startswith("base64:"):
app_key = app_key[7:]
self.key = base64.b64decode(app_key)
def decrypt(self, encrypted_value: str) -> str:
"""Decrypt Laravel encrypted value.
Format: base64(json({"iv": "...", "value": "...", "mac": "..."}))
"""
# Decode base64
decoded = base64.b64decode(encrypted_value)
payload = json.loads(decoded)
# Verify MAC signature
if not self._valid_mac(payload):
raise ValueError("Invalid MAC signature")
# Decrypt
iv = base64.b64decode(payload["iv"])
encrypted = base64.b64decode(payload["value"])
cipher = AES.new(self.key, AES.MODE_CBC, iv)
decrypted = unpad(cipher.decrypt(encrypted), AES.block_size)
return decrypted.decode("utf-8")
def _valid_mac(self, payload: dict) -> bool:
"""Verify MAC signature."""
mac = payload.get("mac")
if not mac:
return False
# Calculate expected MAC
message = base64.b64encode(
json.dumps({"iv": payload["iv"], "value": payload["value"]}).encode()
)
expected_mac = hmac.new(
self.key,
message,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(mac, expected_mac)
def decrypt_session(self, session_cookie: str) -> dict:
"""Decrypt Laravel session cookie and extract user data.
Args:
session_cookie: Laravel session cookie value
Returns:
Dictionary with user_id and other session data
"""
# Decrypt session
decrypted = self.decrypt(session_cookie)
# Unserialize PHP session data
session_data = phpserialize.loads(decrypted.encode())
# Extract user ID from various Laravel guard patterns
user_id = None
# Pattern 1: login_web_{guard}_*
for key in session_data:
if isinstance(key, bytes):
key_str = key.decode()
if key_str.startswith("login_web_"):
user_id = session_data[key]
break
# Pattern 2: Direct user_id key
if not user_id and b"user_id" in session_data:
user_id = session_data[b"user_id"]
if not user_id:
raise ValueError("No user_id found in session")
return {
"user_id": user_id.decode() if isinstance(user_id, bytes) else user_id,
"session_data": session_data,
}# src/infra/cache/redis_session.py
import json
from datetime import timedelta
from redis.asyncio import Redis
class RedisSessionManager:
"""Manage user sessions in Redis.
Stores session data with TTL for automatic expiration.
"""
def __init__(self, redis_client: Redis, ttl_seconds: int = 28800):
"""Initialize session manager.
Args:
redis_client: Async Redis client
ttl_seconds: Session TTL (default 8 hours)
"""
self.redis = redis_client
self.ttl = ttl_seconds
async def create_session(self, user_data: dict) -> str:
"""Create new session and return session ID.
Args:
user_data: User data to store
Returns:
Session ID (UUID)
"""
import uuid
session_id = str(uuid.uuid4())
# Store in Redis
session_key = f"session:{session_id}"
await self.redis.setex(
session_key,
self.ttl,
json.dumps(user_data),
)
return session_id
async def get_session(self, session_id: str) -> dict | None:
"""Retrieve session data.
Args:
session_id: Session identifier
Returns:
User data dictionary or None if not found
"""
session_key = f"session:{session_id}"
data = await self.redis.get(session_key)
if not data:
return None
# Refresh TTL on access
await self.redis.expire(session_key, self.ttl)
return json.loads(data)
async def delete_session(self, session_id: str) -> bool:
"""Delete session.
Args:
session_id: Session identifier
Returns:
True if deleted, False if not found
"""
session_key = f"session:{session_id}"
result = await self.redis.delete(session_key)
return result > 0
async def update_session(self, session_id: str, user_data: dict) -> bool:
"""Update existing session data.
Args:
session_id: Session identifier
user_data: Updated user data
Returns:
True if updated, False if session not found
"""
session_key = f"session:{session_id}"
exists = await self.redis.exists(session_key)
if not exists:
return False
await self.redis.setex(
session_key,
self.ttl,
json.dumps(user_data),
)
return True# src/api/middlewares/auth.py
from fastapi import Request, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter
security = HTTPBearer()
class ProtectedResource:
"""FastAPI dependency for protected endpoints.
Usage:
@app.get("/protected", dependencies=[Depends(ProtectedResource.check)])
"""
def __init__(self, unified_adapter: UnifiedAuthAdapter):
self.unified_adapter = unified_adapter
async def check(
self,
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
"""Validate token and return user data.
Raises:
HTTPException: 401 if token invalid, 403 if insufficient permissions
"""
token = credentials.credentials
try:
user_data = await self.unified_adapter.validate_token(token)
return user_data
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=str(e),
)
async def check_permissions(
self,
credentials: HTTPAuthorizationCredentials,
required_permissions: list[str],
) -> dict:
"""Validate token and check permissions.
Args:
credentials: Bearer token
required_permissions: List of required permissions
Returns:
User data if authorized
Raises:
HTTPException: 401 unauthorized, 403 forbidden
"""
user_data = await self.check(credentials)
user_permissions = user_data.get("permissions", [])
# Check for admin wildcard
if "*" in user_permissions:
return user_data
# Check required permissions
has_permission = any(
perm in user_permissions for perm in required_permissions
)
if not has_permission:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing required permissions: {required_permissions}",
)
return user_data# src/api/path/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from src.infra.adapters.auth.unified_adapter import UnifiedAuthAdapter
from src.infra.cache.redis_session import RedisSessionManager
router = APIRouter(prefix="/v1/auth", tags=["auth"])
class LoginRequest(BaseModel):
username: str
password: str
class SSOLoginRequest(BaseModel):
corporativo_session: str # Cookie from Corporativo
class TokenResponse(BaseModel):
access_token: str
token_type: str = "bearer"
expires_in: int
@router.post("/login", response_model=TokenResponse)
async def login(
request: LoginRequest,
adapter: UnifiedAuthAdapter = Depends(),
):
"""Login with username/password (Corporativo).
Returns JWT access token.
"""
# Delegate to Corporativo adapter
result = await adapter.adapters["corporativo"].authenticate_credentials(
username=request.username,
password=request.password,
)
return TokenResponse(
access_token=result["access_token"],
expires_in=3600,
)
@router.post("/sso-login", response_model=TokenResponse)
async def sso_login(
request: SSOLoginRequest,
adapter: UnifiedAuthAdapter = Depends(),
session_manager: RedisSessionManager = Depends(),
):
"""SSO login using Corporativo session cookie.
Validates session, creates local session, returns JWT.
"""
# Validate Corporativo session
user_data = await adapter.validate_session(request.corporativo_session)
# Create local session
session_id = await session_manager.create_session(user_data)
# Generate JWT
token = adapter.adapters["corporativo"].generate_token(user_data)
return TokenResponse(
access_token=token,
expires_in=3600,
)
@router.get("/me")
async def get_current_user(
user_data: dict = Depends(ProtectedResource.check),
):
"""Get current authenticated user info."""
return {
"cpf": user_data.get("sub"),
"name": user_data.get("name"),
"email": user_data.get("email"),
"permissions": user_data.get("permissions"),
"systems": user_data.get("systems", []),
}
@router.post("/logout")
async def logout(
session_id: str,
session_manager: RedisSessionManager = Depends(),
):
"""Logout and invalidate session."""
await session_manager.delete_session(session_id)
return {"message": "Logged out successfully"}# src/config/settings.py
from pydantic_settings import BaseSettings
class AppSettings(BaseSettings):
"""Multi-system authentication settings."""
# Feature flags
ENABLE_MULTI_SYSTEM_AUTH: bool = True
ENABLED_AUTH_SYSTEMS: list[str] = ["corporativo", "sgf", "ged", "carrinho"]
# JWT configuration
JWT_ALGORITHM: str = "RS256"
JWT_PUBLIC_KEY_PATH: str = "./keys/jwt_public.pem"
JWT_PRIVATE_KEY_PATH: str = "./keys/jwt_private.pem"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60
REFRESH_TOKEN_EXPIRE_HOURS: int = 8
# SSO systems
CORPORATIVO_API_URL: str
CORPORATIVO_APP_KEY: str # Laravel APP_KEY for session decryption
SGF_API_URL: str
SGF_API_KEY: str
GED_API_URL: str
GED_API_KEY: str
CARRINHO_API_URL: str
CARRINHO_API_KEY: str
# Redis
REDIS_URL: str = "redis://localhost:6379/0"
SESSION_TTL_SECONDS: int = 28800 # 8 hours
# Backwards verification
ENABLE_BACKWARDS_VERIFICATION: bool = True
VERIFICATION_CACHE_TTL: int = 30 # seconds
VERIFICATION_TIMEOUT: int = 5 # seconds
app_settings = AppSettings()# Generate RSA key pair for JWT signing
openssl genrsa -out keys/jwt_private.pem 4096
openssl rsa -in keys/jwt_private.pem -pubout -out keys/jwt_public.pem
# Set proper permissions
chmod 600 keys/jwt_private.pem
chmod 644 keys/jwt_public.pem
# Add to .gitignore
echo "keys/jwt_private.pem" >> .gitignore# tests/infra/adapters/auth/test_corporativo_adapter.py
import pytest
from jose import jwt
from datetime import datetime, timedelta
@pytest.fixture
def valid_token(private_key):
"""Generate valid JWT token."""
payload = {
"sub": "12345678901",
"name": "Test User",
"email": "test@example.com",
"permissions": ["gefin.boleto.read"],
"iss": "gefin-backend",
"aud": "gefin-api",
"exp": datetime.utcnow() + timedelta(hours=1),
}
return jwt.encode(payload, private_key, algorithm="RS256")
@pytest.mark.asyncio
async def test_validate_token_success(corporativo_adapter, valid_token):
"""Test successful token validation."""
user_data = await corporativo_adapter.validate_token(valid_token)
assert user_data["sub"] == "12345678901"
assert "gefin.boleto.read" in user_data["permissions"]
@pytest.mark.asyncio
async def test_validate_token_invalid_signature(corporativo_adapter):
"""Test token with invalid signature."""
invalid_token = "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.invalid.signature"
with pytest.raises(JWTError):
await corporativo_adapter.validate_token(invalid_token)
@pytest.mark.asyncio
async def test_validate_token_expired(corporativo_adapter, private_key):
"""Test expired token."""
payload = {
"sub": "12345678901",
"exp": datetime.utcnow() - timedelta(hours=1), # Expired
"iss": "gefin-backend",
"aud": "gefin-api",
}
expired_token = jwt.encode(payload, private_key, algorithm="RS256")
with pytest.raises(JWTError, match="expired"):
await corporativo_adapter.validate_token(expired_token)# tests/integration/test_backwards_verification.py
@pytest.mark.asyncio
async def test_backwards_verification_valid_token(
corporativo_adapter,
mock_corporativo_api,
):
"""Test backwards verification with valid token."""
# Mock Corporativo /api/me endpoint
mock_corporativo_api.get("/api/me").returns(
status=200,
json={"cpf": "12345678901", "name": "Test User"},
)
token = generate_corporativo_token()
user_data = await corporativo_adapter.validate_token(token)
assert user_data["sub"] == "12345678901"
@pytest.mark.asyncio
async def test_backwards_verification_invalid_token(
corporativo_adapter,
mock_corporativo_api,
):
"""Test backwards verification with invalid token."""
mock_corporativo_api.get("/api/me").returns(status=401)
token = generate_corporativo_token()
with pytest.raises(JWTError, match="not valid in Corporativo"):
await corporativo_adapter.validate_token(token)# Production key generation
openssl genrsa -out jwt_private.pem 4096
openssl rsa -in jwt_private.pem -pubout -out jwt_public.pem
# Secure storage (AWS Secrets Manager, HashiCorp Vault, etc.)
aws secretsmanager create-secret \
--name gefin/jwt-private-key \
--secret-string file://jwt_private.pem# Log authentication events
import structlog
logger = structlog.get_logger()
async def validate_token(self, token: str):
logger.info(
"token_validation_started",
issuer=self._get_issuer(token),
)
try:
user_data = await self._validate(token)
logger.info(
"token_validation_success",
user_id=user_data["sub"],
issuer=user_data["iss"],
)
return user_data
except JWTError as e:
logger.warning(
"token_validation_failed",
error=str(e),
)
raise