Authentication, Authorization and Identity Management
User authentication is managed centrally for REST API endpoints. Endpoints can check whether a request is authenticated or unauthenticated and apply business rules accordingly (e.g. a submitter can update a study's status if the study is validated).
General design and coding principles
- Authentication can be completed with a username/password or a user API token. After successful authentication, a new JWT token is created and returned to the user.
- Users present the generated JWT token to access restricted endpoints.
- JWT tokens are validated only by the authentication service. Other services and endpoints do not check the JWT token themselves.
- The authenticated or unauthenticated user information is stored in the request context, where other services and endpoints can access it.
- If a request carries a JWT token that is not valid (e.g. invalid format, expired), the authentication service returns an error message.
- If the requested path contains a resource id (MTBLS, REQ), the authorization service checks the user's permissions for it. Other services and endpoints can access the resulting permissions.
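The token lifecycle described above (issue a JWT after authentication, then reject malformed, forged, or expired tokens) can be sketched with the standard library alone. This is a minimal illustration, not the project's implementation: the HS256 algorithm, the `SECRET` key, the `sub`/`exp` claim layout, and the function names `create_jwt` and `validate_jwt` are all assumptions made for the example.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

SECRET = b"change-me"  # assumption: a shared HS256 signing key


def _b64(data: bytes) -> str:
    # URL-safe base64 without padding, the encoding JWT segments use.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _unb64(text: str) -> bytes:
    # Restore the stripped padding before decoding.
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(header: str, payload: str) -> bytes:
    return hmac.new(SECRET, f"{header}.{payload}".encode(), hashlib.sha256).digest()


def create_jwt(username: str, ttl_seconds: int = 3600, now: Optional[float] = None) -> str:
    issued = int(time.time() if now is None else now)
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64(json.dumps({"sub": username, "exp": issued + ttl_seconds}).encode())
    return f"{header}.{payload}.{_b64(_sign(header, payload))}"


def validate_jwt(token: str, now: Optional[float] = None) -> str:
    """Return the username, or raise ValueError if the token is not valid."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("invalid format")
    header, payload, signature = parts
    try:
        given = _unb64(signature)
    except ValueError as exc:
        raise ValueError("invalid format") from exc
    # Check the signature before trusting anything inside the payload.
    if not hmac.compare_digest(given, _sign(header, payload)):
        raise ValueError("invalid signature")
    claims = json.loads(_unb64(payload))
    current = time.time() if now is None else now
    if claims["exp"] <= current:
        raise ValueError("expired")
    return claims["sub"]
```

Keeping validation in one function mirrors the principle that only the authentication service inspects tokens; callers receive either a username or an error.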
Authentication
The authentication service methods are shown below. Implementations should support at least JWT_TOKEN and username/password authentication. The default implementations are:
- standalone: Creates and checks JWT tokens. User information is fetched from the database.
- mtbls_ws2: A proxy service that uses mtbls_ws2 endpoints for authentication.
class AuthenticationService(abc.ABC):
    @abc.abstractmethod
    async def authenticate_with_token(self, token_type: TokenType, token: str) -> str:
        raise NotImplementedError()

    @abc.abstractmethod
    async def authenticate_with_password(self, username: str, password: str) -> str:
        raise NotImplementedError()

    @abc.abstractmethod
    async def revoke_jwt_token(self, jwt: str) -> str:
        raise NotImplementedError()

    @abc.abstractmethod
    async def validate_token(self, token_type: TokenType, token: str) -> str:
        raise NotImplementedError()
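To make the interface concrete, here is a small in-memory stand-in exposing the same four methods. It is only a sketch for tests or local experiments. The class name `InMemoryAuthenticationService`, the `API_TOKEN` enum member, the use of opaque random strings in place of signed JWTs, and the choice of return values (an issued token from the `authenticate_*` methods, a username from `validate_token`) are assumptions that the interface above does not confirm.

```python
import asyncio
import enum
import secrets


class TokenType(enum.Enum):
    JWT_TOKEN = "jwt_token"
    API_TOKEN = "api_token"  # hypothetical member name


class InMemoryAuthenticationService:
    """Hypothetical test double; a real one would subclass AuthenticationService."""

    def __init__(self, passwords: dict, api_tokens: dict):
        self._passwords = passwords    # username -> password (plain text only in this sketch)
        self._api_tokens = api_tokens  # API token -> username
        self._sessions = {}            # issued token -> username

    def _issue(self, username: str) -> str:
        token = secrets.token_urlsafe(16)  # opaque stand-in for a signed JWT
        self._sessions[token] = username
        return token

    async def authenticate_with_password(self, username: str, password: str) -> str:
        stored = self._passwords.get(username)
        if stored is None or not secrets.compare_digest(stored, password):
            raise PermissionError("invalid credentials")
        return self._issue(username)

    async def authenticate_with_token(self, token_type: TokenType, token: str) -> str:
        # Exchange a valid token for a freshly issued session token.
        username = await self.validate_token(token_type, token)
        return self._issue(username)

    async def validate_token(self, token_type: TokenType, token: str) -> str:
        store = self._sessions if token_type is TokenType.JWT_TOKEN else self._api_tokens
        if token not in store:
            raise PermissionError("invalid token")
        return store[token]

    async def revoke_jwt_token(self, jwt: str) -> str:
        self._sessions.pop(jwt, None)
        return jwt
```

A possible usage: call `asyncio.run(service.authenticate_with_password("alice", "s3cret"))` to obtain a token, then pass it to `validate_token` with `TokenType.JWT_TOKEN`.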
The authentication service is used by a middleware, which injects an UnauthenticatedUser or AuthenticatedUser object into the request.
class AuthBackend(AuthenticationBackend):
    def __init__(
        self,
        authentication_service: AuthenticationService,
        user_read_repository: UserReadRepository,
    ):
        self.authentication_service = authentication_service
        self.user_read_repository = user_read_repository

    async def authenticate(self, conn):
        # Requests without credentials continue as unauthenticated.
        if "Authorization" not in conn.headers:
            return AuthCredentials({"unauthenticated"}), UnauthenticatedUser()

        auth = conn.headers["Authorization"]
        username = await self.validate_credential(auth)
        if not username:
            return AuthCredentials({"unauthenticated"}), UnauthenticatedUser()

        user: UserOutput = await self.user_read_repository.get_user_by_username(
            username
        )
        if not user:
            logger.error(
                "User role check failure. "
                "User %s details could not be fetched from the database.",
                username,
            )
            raise AuthenticationError("User details are not fetched from database")

        # Map the stored role to scopes; curators also receive the submitter scope.
        scopes = {"authenticated"}
        if user.role == UserRole.SUBMITTER:
            scopes.add("submitter")
        elif user.role == UserRole.CURATOR:
            scopes.add("curator")
            scopes.add("submitter")
        elif user.role == UserRole.SYSTEM_ADMIN:
            scopes.add("admin")
        return AuthCredentials(scopes), AuthenticatedUser(user)
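The role-to-scope mapping in the backend above can also be written as a lookup table, which makes the rules easy to test in isolation. The sketch below is an illustrative alternative, not the project's code: the `UserRole` enum values, the `ROLE_SCOPES` table, and the `scopes_for` helper are names assumed for the example.

```python
import enum
from typing import Optional


class UserRole(enum.Enum):
    # Member names follow the backend above; the values are assumptions.
    SUBMITTER = "submitter"
    CURATOR = "curator"
    SYSTEM_ADMIN = "system_admin"


# Scopes granted per role, in addition to "authenticated".
ROLE_SCOPES = {
    UserRole.SUBMITTER: {"submitter"},
    UserRole.CURATOR: {"curator", "submitter"},
    UserRole.SYSTEM_ADMIN: {"admin"},
}


def scopes_for(role: Optional[UserRole]) -> set:
    """Return the scope set for a role; None stands for an unauthenticated request."""
    if role is None:
        return {"unauthenticated"}
    return {"authenticated"} | ROLE_SCOPES.get(role, set())
```

With a table, adding a role means adding one entry instead of another `elif` branch.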
Each endpoint can check the UnauthenticatedUser / AuthenticatedUser object. If only authenticated users may access an endpoint, the endpoint should raise an AuthenticationError for other requests.
logger = logging.getLogger(__name__)


async def check_read_permission(
    resource_id: Annotated[str, RESOURCE_ID_IN_PATH],
    jwt_token: Union[None, str] = Security(oauth2_scheme.oauth2_scheme),
    request: Request = None,
) -> StudyPermissionContext:
    get_request_tracker().resource_id_var.set(resource_id if resource_id else "-")
    if isinstance(request.user, AuthenticatedUser):
        if not jwt_token:
            raise AuthenticationError("Invalid jwt token.")
        context = request.user.permission_context
        # context may be missing, so resolve the user id defensively before logging.
        user_id = context.user.id_ if context and context.user else "-"
        if not context or not context.study or not context.permissions.read:
            logger.warning(
                "User %s is not granted to view resource %s",
                user_id,
                resource_id,
            )
            raise AuthorizationError(
                f"User {user_id} is not granted to view resource {resource_id}"
            )
        logger.debug(
            "User %s is granted to view resource %s",
            user_id,
            resource_id,
        )
        return context
    elif isinstance(request.user, UnauthenticatedUser):
        context = request.user.permission_context
        # Unauthenticated requests may only read PUBLIC studies.
        if context and context.study and context.study.status == StudyStatus.PUBLIC:
            logger.debug(
                "Unauthenticated user is granted to view PUBLIC resource %s",
                resource_id,
            )
            return context
        logger.warning(
            "Unauthenticated user is not granted to view resource %s",
            resource_id,
        )
    raise AuthorizationError(f"User has no authorization to read {resource_id}.")
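The decision made by the read check above reduces to a small rule: authenticated users need the read permission on an existing study, while unauthenticated users can only read public studies. The sketch below isolates that rule from the web framework. `ReadContext` and `may_read` are hypothetical names, and the status is modelled as a plain string for simplicity.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ReadContext:
    # Simplified stand-in for the permission context (assumption).
    study_status: Optional[str]  # None when no study was resolved
    can_read: bool = False


def may_read(authenticated: bool, context: Optional[ReadContext]) -> bool:
    """Return True when the request is allowed to read the study."""
    if context is None or context.study_status is None:
        return False  # no context or unknown study: always denied
    if authenticated:
        return context.can_read
    # Unauthenticated requests are limited to public studies.
    return context.study_status == "PUBLIC"
```

A pure function like this can be covered by a decision table in unit tests without constructing requests or users.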
Authorization
The authorization service determines the user's permissions for the requested resource (MTBLSxxx or REQxxx) and can be used by any endpoint.
class AuthorizationService(abc.ABC):
    @abc.abstractmethod
    async def get_permissions(
        self,
        username: Union[None, str],
        resource_id: str,
        sub_resource: Union[None, str] = None,
    ) -> StudyPermissionContext: ...

    @abc.abstractmethod
    async def get_user_resource_permission(
        self,
        user: Union[None, UserOutput],
        resource_id: str,
        sub_resource: Union[None, str] = None,
    ) -> StudyPermissionContext: ...
It returns a StudyPermissionContext object that holds the user's permissions for the resource (create, read, update, delete).
class ResourcePermission(BaseModel):
    create: bool = False
    read: bool = False
    update: bool = False
    delete: bool = False


class PermissionContext(abc.ABC, BaseModel, Generic[T, R]):
    user: Union[None, T] = None
    is_owner: bool = False
    study: Union[None, R] = None
    permissions: ResourcePermission = ResourcePermission()
    parameters: dict[str, Any] = {}


class StudyPermissionContext(PermissionContext[UserOutput, StudyOutput]): ...
If the requested path contains a resource_id (or study_id), e.g. /../MTBLS1/.., AuthorizationMiddleware sets the permission_context field of the UnauthenticatedUser / AuthenticatedUser object in the request. Endpoints whose path includes a resource_id can read this field directly instead of calling the authorization service. AuthorizationMiddleware can also authorize requests by path: if the user has no permission to access the requested resource, it raises AuthorizationError. Path prefixes and their allowed roles can be defined in the configuration file.
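As an illustration of how a middleware might detect a resource id in the request path, the sketch below scans path segments for an MTBLS or REQ identifier. The exact identifier format is an assumption (a prefix followed by digits), and `RESOURCE_ID` and `find_resource_id` are hypothetical names, not part of the project.

```python
import re
from typing import Optional

# Assumed format: "MTBLS" or "REQ" followed by one or more digits.
RESOURCE_ID = re.compile(r"(?:MTBLS|REQ)\d+")


def find_resource_id(path: str) -> Optional[str]:
    """Return the first path segment that looks like a resource id, if any."""
    for segment in path.split("/"):
        if RESOURCE_ID.fullmatch(segment):
            return segment
    return None
```

Matching whole segments avoids false positives on paths that merely contain the prefix inside a longer name.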
Path Authorization Example
All endpoints starting with /submissions/ are restricted to users with the 'curator' or 'submitter' role, with some 'READ' exceptions when the resource is public. Endpoints starting with /curation/ are accessible only to curators.
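The path rules in this example could be expressed as a prefix-to-roles table plus a small check. The sketch below is one possible shape under stated assumptions: `PATH_ROLES`, `PUBLIC_READ_PREFIXES`, and `is_path_allowed` are hypothetical names, a 'READ' request is taken to mean an HTTP GET, and paths that match no prefix are left unrestricted.

```python
# Prefix -> scopes allowed to access it (taken from the example above).
PATH_ROLES = {
    "/submissions/": {"curator", "submitter"},
    "/curation/": {"curator"},
}

# Prefixes where public resources may be read without a matching role (assumption).
PUBLIC_READ_PREFIXES = {"/submissions/"}


def is_path_allowed(
    path: str, method: str, scopes: set, resource_is_public: bool = False
) -> bool:
    """Return True when the request may proceed based on path rules."""
    for prefix, allowed in PATH_ROLES.items():
        if path.startswith(prefix):
            if scopes & allowed:
                return True
            # READ exception: anyone may GET a public resource under these prefixes.
            return (
                prefix in PUBLIC_READ_PREFIXES
                and method == "GET"
                and resource_is_public
            )
    return True  # assumption: paths without a rule are not restricted here
```

For instance, a submitter calling `/curation/tasks` would be rejected, while an unauthenticated GET on a public study under `/submissions/` would pass.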
Identity Management
User identities (username, password hash, role, etc.) are stored in the database.