Skip to content

opensampl.db.access_orm

ORM for managing access to openSAMPL database and endpoints

APIAccessKey

Bases: Base

Table for recording and managing API access keys.

Source code in opensampl/db/access_orm.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class APIAccessKey(Base):
    """Table for recording and managing API access keys."""

    __tablename__ = "api_access_keys"

    id = Column(Integer, primary_key=True, index=True)
    key = Column(String(64), unique=True, nullable=False, index=True)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    expires_at = Column(DateTime, nullable=True)

    def generate_key(self):
        """
        Generate the API access key.

        Returns:
            The generated access key string.

        """
        self.key = secrets.token_urlsafe(48)
        return self.key

    def is_expired(self):
        """
        Check if API access key is expired.

        Returns:
            True if the key is expired, False otherwise.

        """
        return self.expires_at is not None and datetime.now(tz=timezone.utc) > self.expires_at

generate_key()

Generate the API access key.

Returns:

Type Description

The generated access key string.

Source code in opensampl/db/access_orm.py
26
27
28
29
30
31
32
33
34
35
def generate_key(self):
    """
    Generate the API access key.

    Returns:
        The generated access key string.

    """
    self.key = secrets.token_urlsafe(48)
    return self.key

is_expired()

Check if API access key is expired.

Returns:

Type Description

True if the key is expired, False otherwise.

Source code in opensampl/db/access_orm.py
37
38
39
40
41
42
43
44
45
def is_expired(self):
    """
    Check if API access key is expired.

    Returns:
        True if the key is expired, False otherwise.

    """
    return self.expires_at is not None and datetime.now(tz=timezone.utc) > self.expires_at

Roles

Bases: Base

Table for recording and managing roles.

Source code in opensampl/db/access_orm.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class Roles(Base):
    """Table for recording and managing roles."""

    __tablename__ = "roles"
    role_id = Column(Text, primary_key=True, default=str(uuid.uuid4()))
    name = Column(Text)
    view_id = Column(Text, ForeignKey("views.view_id"))

    @staticmethod
    def get_role_by_name(session: Session, name: str) -> Optional[type["Roles"]]:
        """
        Get role by name.

        Args:
            session: Database session.
            name: Name of the role to find.

        Returns:
            Roles object if found, None otherwise.

        """
        try:
            return session.query(Roles).filter_by(name=name).one()
        except NoResultFound:
            print(f"Role with name {name} not found")  # noqa: T201
            return None

get_role_by_name(session, name) staticmethod

Get role by name.

Parameters:

Name Type Description Default
session Session

Database session.

required
name str

Name of the role to find.

required

Returns:

Type Description
Optional[type[Roles]]

Roles object if found, None otherwise.

Source code in opensampl/db/access_orm.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@staticmethod
def get_role_by_name(session: Session, name: str) -> Optional[type["Roles"]]:
    """
    Get role by name.

    Args:
        session: Database session.
        name: Name of the role to find.

    Returns:
        Roles object if found, None otherwise.

    """
    try:
        return session.query(Roles).filter_by(name=name).one()
    except NoResultFound:
        print(f"Role with name {name} not found")  # noqa: T201
        return None

UserRole

Bases: Base

Table for recording and managing user roles.

Source code in opensampl/db/access_orm.py
130
131
132
133
134
135
class UserRole(Base):
    """Table for recording and managing user roles."""

    __tablename__ = "user_role"
    user_id = Column(Text, ForeignKey("users.user_id"), primary_key=True)
    role_id = Column(Text, ForeignKey("roles.role_id"), primary_key=True)

Users

Bases: Base

Table for recording and managing users.

Source code in opensampl/db/access_orm.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class Users(Base):
    """Table for recording and managing users."""

    __tablename__ = "users"
    user_id = Column(Text, primary_key=True, default=str(uuid.uuid4()))
    email = Column(Text)

    @staticmethod
    def get_user_by_email(session: Session, email: str) -> Optional["Users"]:
        """
        Get user by email.

        Args:
            session: Database session.
            email: Email address of the user to find.

        Returns:
            Users object if found, None otherwise.

        """
        try:
            return session.query(Users).filter_by(email=email).one()
        except NoResultFound:
            print(f"User with email {email} not found")  # noqa: T201
            return None

get_user_by_email(session, email) staticmethod

Get user by email.

Parameters:

Name Type Description Default
session Session

Database session.

required
email str

Email address of the user to find.

required

Returns:

Type Description
Optional[Users]

Users object if found, None otherwise.

Source code in opensampl/db/access_orm.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@staticmethod
def get_user_by_email(session: Session, email: str) -> Optional["Users"]:
    """
    Get user by email.

    Args:
        session: Database session.
        email: Email address of the user to find.

    Returns:
        Users object if found, None otherwise.

    """
    try:
        return session.query(Users).filter_by(email=email).one()
    except NoResultFound:
        print(f"User with email {email} not found")  # noqa: T201
        return None

Views

Bases: Base

Table for recording and managing views.

Source code in opensampl/db/access_orm.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class Views(Base):
    """Table for recording and managing views."""

    __tablename__ = "views"
    view_id = Column(Text, primary_key=True, default=str(uuid.uuid4()))
    name = Column(Text)

    @staticmethod
    def get_view_by_name(session: Session, name: str) -> Optional[type["Views"]]:
        """
        Get view by name.

        Args:
            session: Database session.
            name: Name of the view to find.

        Returns:
            Views object if found, None otherwise.

        """
        try:
            return session.query(Views).filter_by(name=name).one()
        except NoResultFound:
            print(f"View with name {name} not found")  # noqa: T201
            return None

get_view_by_name(session, name) staticmethod

Get view by name.

Parameters:

Name Type Description Default
session Session

Database session.

required
name str

Name of the view to find.

required

Returns:

Type Description
Optional[type[Views]]

Views object if found, None otherwise.

Source code in opensampl/db/access_orm.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@staticmethod
def get_view_by_name(session: Session, name: str) -> Optional[type["Views"]]:
    """
    Get view by name.

    Args:
        session: Database session.
        name: Name of the view to find.

    Returns:
        Views object if found, None otherwise.

    """
    try:
        return session.query(Views).filter_by(name=name).one()
    except NoResultFound:
        print(f"View with name {name} not found")  # noqa: T201
        return None

add_user_role(emails, role_name, session)

Add user role to the database.

Parameters:

Name Type Description Default
emails Union[str, list[str]]

Email address(es) of user(s) to assign the role to.

required
role_name str

Name of the role to assign.

required
session Session

Database session.

required
Source code in opensampl/db/access_orm.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def add_user_role(emails: Union[str, list[str]], role_name: str, session: Session):
    """
    Add user role to the database.

    Args:
        emails: Email address(es) of user(s) to assign the role to.
        role_name: Name of the role to assign.
        session: Database session.

    """
    if isinstance(emails, str):
        emails = [emails]

    role = Roles.get_role_by_name(name=role_name)
    if role is None:
        print(f"Role with name {role_name} not found")  # noqa: T201
        return

    for email in emails:
        user = Users.get_user_by_email(email=email)
        if user is None:
            # Create a new user if not found
            user = Users(email=email)
            session.add(user)
            session.flush()  # Flush to get the generated user_id
            print(f"New user created with email {email}")  # noqa: T201

        # Check if user already has the specified role
        if any(ur.role_id == role.role_id for ur in user.user_role):
            print(f"User with email {email} already has role {role_name}")  # noqa: T201
            continue

        # Create a new entry in user_role table
        user_role = UserRole(user_id=user.user_id, role_id=role.role_id)
        session.add(user_role)
        print(f"User with email {email} assigned role {role_name}")  # noqa: T201
        session.commit()