Authentication, Authorization and Identity Management

User authentication is managed centrally for REST API endpoints. Endpoints can use the authentication information (authenticated or unauthenticated) to apply business rules (e.g. submitters can update the study status if the study is validated).

General design and coding principles

  • Authentication can be completed with a username/password or a user API token. After authentication, a new JWT token is created and returned to the user.
  • Users use the generated JWT token to access restricted endpoints.
  • JWT tokens are validated by the authentication service. Other services and endpoints do not validate JWT tokens themselves.
  • Authenticated or unauthenticated user information is stored in the request context, where other services and endpoints can access it.
  • If a request has a JWT token that is not valid (e.g. invalid format, expired), the authentication service returns an error message.
  • If the requested path contains a resource id (MTBLS, REQ), the authorization service checks the user's permissions for that resource. Other services and endpoints can access the resulting permissions.

Authentication

The authentication service methods are shown below. Implementations should support at least JWT_TOKEN token authentication and username/password authentication. The default implementations are:

  • standalone: Creates and checks JWT tokens. User information is fetched from the database.
  • mtbls_ws2: A proxy service that uses mtbls_ws2 endpoints for authentication.

class AuthenticationService(abc.ABC):
    @abc.abstractmethod
    async def authenticate_with_token(self, token_type: TokenType, token: str) -> str:
        raise NotImplementedError()

    @abc.abstractmethod
    async def authenticate_with_password(self, username: str, password: str) -> str:
        raise NotImplementedError()

    @abc.abstractmethod
    async def revoke_jwt_token(self, jwt: str) -> str:
        raise NotImplementedError()

    @abc.abstractmethod
    async def validate_token(self, token_type: TokenType, token: str) -> str:
        raise NotImplementedError()
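
The following is a minimal, in-memory sketch of how a standalone-style implementation of these four methods could behave. It is not the project's implementation: the class name `InMemoryAuthenticationService`, the `TokenType` members, the constructor parameters, the use of `ValueError` for failures, and the assumption that `validate_token` returns the username are all hypothetical. Tokens are signed with HS256 using only the standard library, and passwords are kept in plain text purely for demonstration.

```python
import asyncio
import base64
import enum
import hashlib
import hmac
import json
import time


class TokenType(enum.Enum):
    # Hypothetical members; the project's TokenType may differ.
    JWT_TOKEN = "jwt_token"
    API_TOKEN = "api_token"


def _b64encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


class InMemoryAuthenticationService:
    """Toy service mirroring the four AuthenticationService methods."""

    def __init__(self, secret, users, api_tokens, ttl_seconds=3600):
        self.secret = secret  # bytes used to sign tokens
        self.users = users  # username -> password (plain text, demo only)
        self.api_tokens = api_tokens  # API token -> username
        self.ttl_seconds = ttl_seconds
        self.revoked = set()

    def _sign(self, signing_input: str) -> str:
        digest = hmac.new(self.secret, signing_input.encode(), hashlib.sha256).digest()
        return _b64encode(digest)

    def _create_jwt(self, username: str) -> str:
        header = _b64encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
        claims = {"sub": username, "exp": int(time.time()) + self.ttl_seconds}
        payload = _b64encode(json.dumps(claims).encode())
        signing_input = f"{header}.{payload}"
        return f"{signing_input}.{self._sign(signing_input)}"

    async def authenticate_with_password(self, username: str, password: str) -> str:
        stored = self.users.get(username)
        if stored is None or not hmac.compare_digest(stored.encode(), password.encode()):
            raise ValueError("Invalid username or password")
        return self._create_jwt(username)

    async def authenticate_with_token(self, token_type: TokenType, token: str) -> str:
        # Exchange any valid token for a fresh JWT.
        return self._create_jwt(await self.validate_token(token_type, token))

    async def revoke_jwt_token(self, jwt: str) -> str:
        self.revoked.add(jwt)
        return jwt

    async def validate_token(self, token_type: TokenType, token: str) -> str:
        """Return the username for a valid token, otherwise raise ValueError."""
        if token_type == TokenType.API_TOKEN:
            if token not in self.api_tokens:
                raise ValueError("Unknown API token")
            return self.api_tokens[token]
        if token in self.revoked:
            raise ValueError("Token is revoked")
        header, payload, signature = token.split(".")  # ValueError on bad format
        expected = self._sign(f"{header}.{payload}")
        if not hmac.compare_digest(expected.encode(), signature.encode()):
            raise ValueError("Invalid signature")
        claims = json.loads(_b64decode(payload))
        if claims["exp"] < time.time():
            raise ValueError("Token is expired")
        return claims["sub"]


async def demo() -> str:
    service = InMemoryAuthenticationService(b"demo-secret", {"alice": "pw"}, {})
    jwt = await service.authenticate_with_password("alice", "pw")
    return await service.validate_token(TokenType.JWT_TOKEN, jwt)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A real implementation would presumably rely on a maintained JWT library, hashed passwords, and persistent revocation storage rather than the in-memory structures used here.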

The authentication service is used by an authentication middleware, which injects an UnauthenticatedUser or AuthenticatedUser object into the request.

class AuthBackend(AuthenticationBackend):
    def __init__(
        self,
        authentication_service: AuthenticationService,
        user_read_repository: UserReadRepository,
    ):
        self.authentication_service = authentication_service
        self.user_read_repository = user_read_repository

    async def authenticate(self, conn):
        if "Authorization" not in conn.headers:
            return AuthCredentials({"unauthenticated"}), UnauthenticatedUser()

        auth = conn.headers["Authorization"]

        username = await self.validate_credential(auth)

        if not username:
            return AuthCredentials({"unauthenticated"}), UnauthenticatedUser()
        user: UserOutput = await self.user_read_repository.get_user_by_username(
            username
        )
        if not user:
            logger.error(
                "User role check failure. "
                "Details of user %s could not be fetched from the database.",
                username,
            )
            raise AuthenticationError("User details could not be fetched from the database")
        scopes = {"authenticated"}
        if user.role == UserRole.SUBMITTER:
            scopes.add("submitter")
        elif user.role == UserRole.CURATOR:
            scopes.add("curator")
            scopes.add("submitter")
        elif user.role == UserRole.SYSTEM_ADMIN:
            scopes.add("admin")

        return AuthCredentials(scopes), AuthenticatedUser(user)
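
The role-to-scope rules in `authenticate` can also be read as a lookup table, which makes the inheritance explicit: a curator also receives the submitter scope, while a system admin receives only the admin scope. The sketch below is one possible way to express this. The `UserRole` enum values and the `scopes_for` helper are hypothetical stand-ins, not the project's code.

```python
import enum


class UserRole(enum.Enum):
    # Hypothetical stand-in for the project's UserRole enum.
    SUBMITTER = "submitter"
    CURATOR = "curator"
    SYSTEM_ADMIN = "system_admin"


# Scopes granted per role, in addition to "authenticated".
ROLE_SCOPES = {
    UserRole.SUBMITTER: {"submitter"},
    UserRole.CURATOR: {"curator", "submitter"},
    UserRole.SYSTEM_ADMIN: {"admin"},
}


def scopes_for(role) -> set[str]:
    """Return the scope set for an authenticated user with the given role."""
    return {"authenticated"} | ROLE_SCOPES.get(role, set())
```

A role that is missing from the table yields only the "authenticated" scope, which matches the behaviour of the if/elif chain above.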

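Each endpoint can inspect the UnauthenticatedUser / AuthenticatedUser object. If an endpoint is restricted to authenticated users, it should raise an AuthenticationError exception for unauthenticated requests. The following dependency checks read permission for the requested resource.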

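import logging
from typing import Annotated, Union

from fastapi import Request, Security

# Project-specific imports (AuthenticatedUser, StudyPermissionContext, etc.) are omitted.

logger = logging.getLogger(__name__)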


async def check_read_permission(
    resource_id: Annotated[str, RESOURCE_ID_IN_PATH],
    jwt_token: Union[None, str] = Security(oauth2_scheme.oauth2_scheme),
    request: Request = None,
) -> StudyPermissionContext:
    get_request_tracker().resource_id_var.set(resource_id if resource_id else "-")

    if isinstance(request.user, AuthenticatedUser):
        if not jwt_token:
            raise AuthenticationError("Invalid jwt token.")
        context = request.user.permission_context
        if not context or not context.study or not context.permissions.read:
            # context (or its user) may be missing, so resolve the id defensively.
            user_id = context.user.id_ if context and context.user else "-"
            logger.warning(
                "User %s is not granted to view resource %s",
                user_id,
                resource_id,
            )
            raise AuthorizationError(
                f"User {user_id} is not granted to view resource {resource_id}"
            )
        logger.debug(
            "User %s is granted to view resource %s",
            context.user.id_,
            resource_id,
        )
        return context
    elif isinstance(request.user, UnauthenticatedUser):
        context = request.user.permission_context
        if context and context.study and context.study.status == StudyStatus.PUBLIC:
            logger.debug(
                "Unauthenticated user is granted to view PUBLIC resource %s",
                resource_id,
            )
            return context

    logger.warning(
        "Unauthenticated user is not granted to view resource %s",
        resource_id,
    )
    raise AuthorizationError(f"User has no authorization to read {resource_id}.")

Authorization

The authorization service determines the user's permissions for the requested resource (MTBLSxxx or REQxxx) and can be used by any endpoint.

class AuthorizationService(abc.ABC):
    @abc.abstractmethod
    async def get_permissions(
        self,
        username: Union[None, str],
        resource_id: str,
        sub_resource: Union[None, str] = None,
    ) -> StudyPermissionContext: ...

    @abc.abstractmethod
    async def get_user_resource_permission(
        self,
        user: Union[None, UserOutput],
        resource_id: str,
        sub_resource: Union[None, str] = None,
    ) -> StudyPermissionContext: ...

It returns a StudyPermissionContext object that holds the user's permissions for the resource (create, read, update, delete).

class ResourcePermission(BaseModel):
    create: bool = False
    read: bool = False
    update: bool = False
    delete: bool = False


class PermissionContext(abc.ABC, BaseModel, Generic[T, R]):
    user: Union[None, T] = None
    is_owner: bool = False
    study: Union[None, R] = None
    permissions: ResourcePermission = ResourcePermission()
    parameters: dict[str, Any] = {}


class StudyPermissionContext(PermissionContext[UserOutput, StudyOutput]): ...

If the request path contains a resource_id (or study_id), e.g. /../MTBLS1/.., AuthorizationMiddleware sets the permission_context field of the UnauthenticatedUser / AuthenticatedUser object in the request. Endpoints whose path includes a resource_id can use this context directly without calling the authorization service. AuthorizationMiddleware can also check request paths and authorize requests: if the user has no permission to access the requested resource, it raises AuthorizationError. Path prefixes and allowed roles are defined in the configuration file.

Path Authorization Example

All endpoints starting with /submissions/ are restricted to users with the 'curator' or 'submitter' role (with some read exceptions when the resource is public). Endpoints starting with /curation/ are accessible only to curators.

    authorized_endpoints:
    - prefix: "/submissions/"
      scopes:
      - curator
      - submitter
    - prefix: "/curation/"
      scopes:
      - curator
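
As an illustration of how such prefix rules might be evaluated, the sketch below checks a request path against the configured prefixes and the scopes held by the user. The names `AuthorizedEndpoint` and `is_path_allowed` are hypothetical, the rules are hard-coded rather than loaded from the configuration file, and the read exception for public resources is not modelled. It also assumes that paths without a matching prefix are not restricted by this check.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AuthorizedEndpoint:
    prefix: str
    scopes: frozenset[str]


# Mirrors the authorized_endpoints configuration shown above.
AUTHORIZED_ENDPOINTS = [
    AuthorizedEndpoint("/submissions/", frozenset({"curator", "submitter"})),
    AuthorizedEndpoint("/curation/", frozenset({"curator"})),
]


def is_path_allowed(path: str, user_scopes: set[str], rules=AUTHORIZED_ENDPOINTS) -> bool:
    """Return True if the path is unrestricted or the user holds an allowed scope."""
    for rule in rules:
        if path.startswith(rule.prefix):
            # The first matching prefix decides the outcome.
            return bool(rule.scopes & user_scopes)
    # Assumption: paths that match no prefix are not restricted here.
    return True
```

With these rules, a user holding the submitter scope can reach paths under /submissions/ but not under /curation/, while a curator can reach both.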

Identity Management

User identities (username, password hash, role, etc.) are stored in the database.