Skip to content

opensampl.load.table_factory

Database table factory for handling CRUD operations with conflict resolution.

TableFactory

Factory class for handling database table operations with conflict resolution.

Source code in opensampl/load/table_factory.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
class TableFactory:
    """
    Factory class for handling database table operations with conflict resolution.

    An instance is bound to one mapped model (looked up by table name) and a session. It can
    look up existing rows by primary key, by the model's identifiable constraint, or by its
    other unique constraints, and can insert or merge new data into the table.
    """

    def __init__(self, name: str, session: Session):
        """
        Initialize Table Factory Object for db table matching given name.

        Args:
            name: Name of the database table.
            session: SQLAlchemy database session.

        Raises:
            ValueError: If no mapped model uses the given table name (raised by ``resolve_table``).

        """
        self.name = name
        self.session = session
        self.model = self.resolve_table()
        self.inspector = inspect(self.model)
        self.pk_columns = [col.key for col in self.inspector.primary_key]
        identifiable_const, unique_constraints = self.extract_unique_constraints()
        # Column keys of the constraint named by model.identifiable_constraint() ([] if none).
        self.identifiable_constraint = identifiable_const
        # Column-key lists for every other unique constraint on the table.
        self.unique_constraints = unique_constraints

    def resolve_table(self):
        """
        Retrieve the SQLAlchemy model class for the given table name.

        Returns:
            The corresponding SQLAlchemy model class.

        Raises:
            ValueError: If no class mapped in ``Base.registry`` uses this table name.

        """
        for mapper in Base.registry.mappers:
            # A mapped class is not guaranteed to define ``__tablename__``; skip such classes
            # instead of raising AttributeError part-way through the search.
            tablename = getattr(mapper.class_, "__tablename__", None)
            if tablename is not None and tablename == self.name:
                return mapper.class_
        raise ValueError(f"Table {self.name} not found in database schema")

    def extract_unique_constraints(self):
        """
        Identify unique constraints that can be used to match existing entries.

        The unique constraint whose name equals ``self.model.identifiable_constraint()`` is
        returned separately; every other unique constraint goes into the list.

        Returns:
            Tuple of (identifiable constraint column keys, list of column-key lists for the
            remaining unique constraints). The first element is ``[]`` when no unique
            constraint matches the identifiable constraint name.

        """
        id_const = self.model.identifiable_constraint()
        identifiable_constraint = []
        unique_constraints = []
        for constraint in self.inspector.tables[0].constraints:
            if isinstance(constraint, UniqueConstraint):
                cols = [col.key for col in constraint.columns]
                # Names are compared as strings, presumably because constraint.name may be a
                # SQLAlchemy name wrapper rather than a plain str.
                if id_const and str(constraint.name) == str(id_const):
                    identifiable_constraint = cols
                else:
                    unique_constraints.append(cols)
        return identifiable_constraint, unique_constraints

    def create_col_filter(self, data: dict[str, Any], cols: list[str]):
        """
        Create a SQLAlchemy filter expression for the given columns and data.

        Args:
            data: Dictionary containing the data values.
            cols: List of column names to create filter for.

        Returns:
            SQLAlchemy filter expression (equality on every column, AND-ed together), or None
            if ``cols`` is empty or any of its columns is missing from ``data``.

        """
        # NOTE(review): a None value in ``data`` becomes ``col == None`` (SQL ``IS NULL``), which
        # may match unrelated rows for nullable unique columns — confirm this is intended.
        if cols != [] and all(col in data for col in cols):
            col_data_map = {col: data[col] for col in cols}
            logger.debug(f"column filter= {col_data_map}")
            return and_(*(getattr(self.model, k) == v for k, v in col_data_map.items()))  # ty: ignore[missing-argument]
        logger.debug(f"some or all columns from {cols} missing in data")
        return None

    def print_filter_debug(self, filter_expr: Optional[Union[BinaryExpression, BooleanClauseList]], label: str):
        """
        Log a filter expression at debug level, rendered as PostgreSQL SQL.

        Inlining literal values is best-effort: when a value has no literal renderer, the
        expression is logged with bind-parameter placeholders plus its parameter values
        instead of letting a debug helper abort the caller.

        Args:
            filter_expr: The SQLAlchemy filter expression; nothing is logged when it is None.
            label: Label for the debug output.

        """
        if filter_expr is None:
            return
        # Imported locally because it is only needed on this debug path.
        from sqlalchemy.exc import CompileError

        dialect = postgresql.dialect()
        try:
            compiled = filter_expr.compile(dialect=dialect, compile_kwargs={"literal_binds": True})
        except (CompileError, NotImplementedError):
            # Some value types cannot be rendered inline; fall back to placeholders + params.
            fallback = filter_expr.compile(dialect=dialect)
            logger.debug(f"{label}: {fallback} params={fallback.params}")
            return
        logger.debug(f"{label}: {compiled}")

    def find_existing(self, data: dict[str, Any]) -> Optional[Base]:
        """
        Find an existing record that matches the provided data.

        Lookups are tried in order: primary key, identifiable constraint, then the other
        unique constraints; the first lookup that returns a row wins.

        Args:
            data: Dictionary containing the data to match against.

        Returns:
            Existing model instance or None if not found.

        Raises:
            ValueError: If no identifiable fields are provided.

        """
        primary_filter = self.create_col_filter(data=data, cols=self.pk_columns)
        self.print_filter_debug(primary_filter, "Primary filter")
        id_filter = self.create_col_filter(data=data, cols=self.identifiable_constraint)
        self.print_filter_debug(id_filter, "ID Constraint")

        unique_filters = [
            y for y in [self.create_col_filter(data=data, cols=x) for x in self.unique_constraints] if y is not None
        ]
        # NOTE(review): the unique-constraint filters are AND-ed, so a row is only found when it
        # matches on every unique constraint present in ``data`` — confirm this is intended.
        unique_filter = and_(*unique_filters) if unique_filters != [] else None  # ty: ignore[missing-argument]
        self.print_filter_debug(unique_filter, "Unique Constraint")

        if all(x is None for x in [primary_filter, id_filter, unique_filter]):
            raise ValueError(f"Did not provide identifiable fields for {self.name}")

        lookups = (("primary", primary_filter), ("identifiable", id_filter), ("unique", unique_filter))
        for kind, filter_expr in lookups:
            if filter_expr is None:
                continue
            existing = self.session.query(self.model).filter(filter_expr).first()  # ty: ignore[missing-argument]
            if existing is not None:
                logger.debug(f"Found {self.name} entry matching {kind} filters: {existing.to_dict()}")
                return existing

        return None

    def find_by_field(self, column_name: str, data: Any):
        """
        Get the entries where column = data.

        Args:
            column_name: Name of the column to filter by.
            data: Value to match against.

        Returns:
            List of model instances matching the criteria.

        Raises:
            ValueError: If column name does not exist in the table.

        """
        column = getattr(self.model, column_name, None)
        if column is None:
            raise ValueError(f"Column '{column_name}' does not exist in '{self.name}'.")

        stmt = select(self.model).where(column == data)
        result = self.session.execute(stmt)
        return result.scalars().all()

    def write(self, data: dict[str, Any], if_exists: conflict_actions = "update"):
        """
        Write data to the table that the factory refers to.

        If no existing row matches ``data`` (see ``find_existing``), a new row is inserted and
        flushed. Otherwise the conflict is resolved according to ``if_exists``.

        Args:
            data: Column name -> value mapping; passed to the model constructor.
            if_exists: How to handle conflicts with existing entries. One of:
                - 'update': Only fill in columns whose existing value is None, using non-None
                  values from the provided data (default)
                - 'error': Raise an error if entry exists
                - 'replace': Overwrite non-primary-key columns that are present in ``data`` or
                  resolve to a non-None value
                - 'ignore': Skip if entry exists and return the existing entry unchanged

        Returns:
            The created, updated, or (for 'ignore') unchanged existing model instance.

        Raises:
            ValueError: If ``if_exists`` is not one of the supported actions, if an entry
                exists and if_exists is 'error', or if ``data`` has no identifiable fields.

        """
        valid_actions = ("update", "error", "replace", "ignore")
        if if_exists not in valid_actions:
            # Without this check an unknown action falls through the merge loop and silently
            # leaves the existing row untouched.
            raise ValueError(
                f"Invalid if_exists value {if_exists!r} for {self.name}; expected one of {valid_actions}"
            )

        existing = self.find_existing(data)

        if existing is None:
            new_entry = self.model(**data)
            self.session.add(new_entry)
            logger.debug(f"New entry created in {self.name}")
            self.session.flush()
            return new_entry

        if if_exists == "error":
            logger.error(f"Existing entry: {existing.to_dict()} \nProvided data: {data}")
            raise ValueError(f"Data matched existing entry in {self.name}")

        if if_exists == "ignore":
            logger.debug("Existing entry found, ignoring data")
            return existing

        # Build a transient instance so column values can be read after resolve_references runs.
        # NOTE(review): resolve_references is only called on this merge path, not before the
        # insert above — confirm new entries do not need it.
        new_entry = self.model(**data)
        new_entry.resolve_references(session=self.session)

        for col in self.inspector.columns.values():
            if col.key in self.pk_columns:
                continue
            current_value = getattr(existing, col.key)
            new_value = getattr(new_entry, col.key)
            if if_exists == "replace" and (col.key in data or new_value is not None):
                logger.debug(f"Replacing {col.key}: {current_value} -> {new_value}")
                setattr(existing, col.key, new_value)

            elif if_exists == "update" and current_value is None and new_value is not None:
                logger.debug(f"Updating {col.key} from None to {new_value}")
                setattr(existing, col.key, new_value)

        self.session.flush()
        return existing

__init__(name, session)

Initialize Table Factory Object for db table matching given name.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `str` | Name of the database table. | *required* |
| `session` | `Session` | SQLAlchemy database session. | *required* |
Source code in opensampl/load/table_factory.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def __init__(self, name: str, session: Session):
    """
    Bind a new factory to the database table called ``name``.

    Args:
        name: Name of the database table.
        session: SQLAlchemy database session.

    """
    self.name, self.session = name, session
    # resolve_table reads self.name, so it has to run after the assignment above.
    self.model = self.resolve_table()
    self.inspector = inspect(self.model)
    self.pk_columns = [pk_col.key for pk_col in self.inspector.primary_key]
    # (identifiable constraint column keys, list of other unique-constraint column keys)
    self.identifiable_constraint, self.unique_constraints = self.extract_unique_constraints()

create_col_filter(data, cols)

Create a SQLAlchemy filter expression for the given columns and data.

Parameters:

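| Name | Type | Description | Default |
|------|------|-------------|---------|
| `data` | `dict[str, Any]` | Dictionary containing the data values. | *required* |
| `cols` | `list[str]` | List of column names to create filter for. | *required* |

Returns:

| Type | Description |
|------|-------------|
| | SQLAlchemy filter expression, or None if `cols` is empty or any of its columns is missing from `data`. |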

Source code in opensampl/load/table_factory.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def create_col_filter(self, data: dict[str, Any], cols: list[str]):
    """
    Build an AND-ed equality filter over ``cols`` using the values found in ``data``.

    Args:
        data: Dictionary containing the data values.
        cols: List of column names to create filter for.

    Returns:
        SQLAlchemy filter expression, or None when ``cols`` is empty or any of its
        columns is absent from ``data``.

    """
    has_all_columns = cols != [] and all(name in data for name in cols)
    if not has_all_columns:
        logger.debug(f"some or all columns from {cols} missing in data")
        return None

    col_data_map = {name: data[name] for name in cols}
    logger.debug(f"column filter= {col_data_map}")
    clauses = [getattr(self.model, key) == value for key, value in col_data_map.items()]
    return and_(*clauses)  # ty: ignore[missing-argument]

extract_unique_constraints()

Identify unique constraints that can be used to match existing entries.

Returns:

| Type | Description |
|------|-------------|
| | Tuple containing the identifiable constraint's column keys and a list of the remaining unique constraints' column keys. |

Source code in opensampl/load/table_factory.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def extract_unique_constraints(self):
    """
    Split the table's unique constraints into the identifiable one and all the others.

    Returns:
        Tuple of (column keys of the constraint named by
        ``self.model.identifiable_constraint()``, or ``[]`` if none matches;
        list of column-key lists for every other unique constraint).

    """
    id_const_name = self.model.identifiable_constraint()
    identifiable_cols = []
    other_unique = []
    table = self.inspector.tables[0]
    unique_consts = (c for c in table.constraints if isinstance(c, UniqueConstraint))
    for constraint in unique_consts:
        col_keys = [column.key for column in constraint.columns]
        # Compare as strings; presumably constraint.name may be a SQLAlchemy name wrapper.
        is_identifiable = bool(id_const_name) and str(constraint.name) == str(id_const_name)
        if is_identifiable:
            identifiable_cols = col_keys
        else:
            other_unique.append(col_keys)
    return identifiable_cols, other_unique

find_by_field(column_name, data)

Get the entries where column = data.

Parameters:

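| Name | Type | Description | Default |
|------|------|-------------|---------|
| `column_name` | `str` | Name of the column to filter by. | *required* |
| `data` | `Any` | Value to match against. | *required* |

Returns:

| Type | Description |
|------|-------------|
| | List of model instances matching the criteria. |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If the column name does not exist in the table. |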

Source code in opensampl/load/table_factory.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def find_by_field(self, column_name: str, data: Any):
    """
    Return every row of the table whose ``column_name`` equals ``data``.

    Args:
        column_name: Name of the column to filter by.
        data: Value to match against.

    Returns:
        List of model instances matching the criteria.

    Raises:
        ValueError: If column name does not exist in the table.

    """
    target = getattr(self.model, column_name, None)
    if target is not None:
        matches = self.session.execute(select(self.model).where(target == data))
        return matches.scalars().all()
    raise ValueError(f"Column '{column_name}' does not exist in '{self.name}'.")

find_existing(data)

Find an existing record that matches the provided data.

Parameters:

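| Name | Type | Description | Default |
|------|------|-------------|---------|
| `data` | `dict[str, Any]` | Dictionary containing the data to match against. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Optional[Base]` | Existing model instance, or None if not found. |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If no identifiable fields are provided. |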

Source code in opensampl/load/table_factory.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def find_existing(self, data: dict[str, Any]) -> Optional[Base]:
    """
    Find an existing record that matches the provided data.

    Lookups are tried in order: primary key, identifiable constraint, then the other
    unique constraints; the first lookup that returns a row wins.

    Args:
        data: Dictionary containing the data to match against.

    Returns:
        Existing model instance or None if not found.

    Raises:
        ValueError: If no identifiable fields are provided.

    """
    primary_filter = self.create_col_filter(data=data, cols=self.pk_columns)
    self.print_filter_debug(primary_filter, "Primary filter")
    id_filter = self.create_col_filter(data=data, cols=self.identifiable_constraint)
    self.print_filter_debug(id_filter, "ID Constraint")

    unique_filters = [
        y for y in [self.create_col_filter(data=data, cols=x) for x in self.unique_constraints] if y is not None
    ]
    # NOTE(review): the unique-constraint filters are AND-ed, so a row is only found when it
    # matches on every unique constraint present in ``data`` — confirm this is intended.
    unique_filter = and_(*unique_filters) if unique_filters != [] else None  # ty: ignore[missing-argument]
    self.print_filter_debug(unique_filter, "Unique Constraint")

    if all(x is None for x in [primary_filter, id_filter, unique_filter]):
        raise ValueError(f"Did not provide identifiable fields for {self.name}")

    lookups = (("primary", primary_filter), ("identifiable", id_filter), ("unique", unique_filter))
    for kind, filter_expr in lookups:
        if filter_expr is None:
            continue
        existing = self.session.query(self.model).filter(filter_expr).first()  # ty: ignore[missing-argument]
        # Identity check: an ORM instance's truthiness is not a reliable "found" signal.
        if existing is not None:
            logger.debug(f"Found {self.name} entry matching {kind} filters: {existing.to_dict()}")
            return existing

    return None

print_filter_debug(filter_expr, label)

Log debug information for a filter expression, rendered as PostgreSQL SQL with literal values inlined.

Parameters:

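| Name | Type | Description | Default |
|------|------|-------------|---------|
| `filter_expr` | `Optional[Union[BinaryExpression, BooleanClauseList]]` | The SQLAlchemy filter expression; nothing is logged when it is None. | *required* |
| `label` | `str` | Label for the debug output. | *required* |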
Source code in opensampl/load/table_factory.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def print_filter_debug(self, filter_expr: Optional[Union[BinaryExpression, BooleanClauseList]], label: str):
    """
    Log a filter expression at debug level, rendered as PostgreSQL SQL.

    Inlining literal values is best-effort: when a value has no literal renderer, the
    expression is logged with bind-parameter placeholders plus its parameter values
    instead of letting a debug helper abort the caller.

    Args:
        filter_expr: The SQLAlchemy filter expression; nothing is logged when it is None.
        label: Label for the debug output.

    """
    if filter_expr is None:
        return
    # Imported locally because it is only needed on this debug path.
    from sqlalchemy.exc import CompileError

    dialect = postgresql.dialect()
    try:
        compiled = filter_expr.compile(dialect=dialect, compile_kwargs={"literal_binds": True})
    except (CompileError, NotImplementedError):
        # Some value types cannot be rendered inline; fall back to placeholders + params.
        fallback = filter_expr.compile(dialect=dialect)
        logger.debug(f"{label}: {fallback} params={fallback.params}")
        return
    logger.debug(f"{label}: {compiled}")

resolve_table()

Retrieve the SQLAlchemy model class for the given table name.

Returns:

| Type | Description |
|------|-------------|
| | The corresponding SQLAlchemy model class. |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If no mapped model class uses the given table name. |

Source code in opensampl/load/table_factory.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def resolve_table(self):
    """
    Retrieve the SQLAlchemy model class for the given table name.

    Returns:
        The corresponding SQLAlchemy model class.

    Raises:
        ValueError: If no class mapped in ``Base.registry`` uses this table name.

    """
    for mapper in Base.registry.mappers:
        # A mapped class is not guaranteed to define ``__tablename__``; skip such classes
        # instead of raising AttributeError part-way through the search.
        tablename = getattr(mapper.class_, "__tablename__", None)
        if tablename is not None and tablename == self.name:
            return mapper.class_
    raise ValueError(f"Table {self.name} not found in database schema")

write(data, if_exists='update')

Write data to the table that the factory refers to.

Parameters:

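| Name | Type | Description | Default |
|------|------|-------------|---------|
| `data` | `dict[str, Any]` | The data to write to the table. | *required* |
| `if_exists` | `conflict_actions` | How to handle conflicts with existing entries. One of:<br>- `'update'`: only fill in columns whose existing value is None, using non-None values from the provided data (default)<br>- `'error'`: raise an error if an entry exists<br>- `'replace'`: overwrite non-primary-key columns that are present in the data or resolve to a non-None value<br>- `'ignore'`: skip if an entry exists | `'update'` |

Returns:

| Type | Description |
|------|-------------|
| | The created or updated model instance (the unchanged existing instance for `'ignore'`). |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If an entry exists and `if_exists` is `'error'`, or if the data contains no identifiable fields (raised by `find_existing`). |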

Source code in opensampl/load/table_factory.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def write(self, data: dict[str, Any], if_exists: conflict_actions = "update"):
    """
    Write data to the table that the factory refers to.

    If no existing row matches ``data`` (see ``find_existing``), a new row is inserted and
    flushed. Otherwise the conflict is resolved according to ``if_exists``.

    Args:
        data: Column name -> value mapping; passed to the model constructor.
        if_exists: How to handle conflicts with existing entries. One of:
            - 'update': Only fill in columns whose existing value is None, using non-None
              values from the provided data (default)
            - 'error': Raise an error if entry exists
            - 'replace': Overwrite non-primary-key columns that are present in ``data`` or
              resolve to a non-None value
            - 'ignore': Skip if entry exists and return the existing entry unchanged

    Returns:
        The created, updated, or (for 'ignore') unchanged existing model instance.

    Raises:
        ValueError: If ``if_exists`` is not one of the supported actions, if an entry
            exists and if_exists is 'error', or if ``data`` has no identifiable fields.

    """
    valid_actions = ("update", "error", "replace", "ignore")
    if if_exists not in valid_actions:
        # Without this check an unknown action falls through the merge loop and silently
        # leaves the existing row untouched.
        raise ValueError(
            f"Invalid if_exists value {if_exists!r} for {self.name}; expected one of {valid_actions}"
        )

    existing = self.find_existing(data)

    if existing is None:
        new_entry = self.model(**data)
        self.session.add(new_entry)
        logger.debug(f"New entry created in {self.name}")
        self.session.flush()
        return new_entry

    if if_exists == "error":
        logger.error(f"Existing entry: {existing.to_dict()} \nProvided data: {data}")
        raise ValueError(f"Data matched existing entry in {self.name}")

    if if_exists == "ignore":
        logger.debug("Existing entry found, ignoring data")
        return existing

    # Build a transient instance so column values can be read after resolve_references runs.
    # NOTE(review): resolve_references is only called on this merge path, not before the
    # insert above — confirm new entries do not need it.
    new_entry = self.model(**data)
    new_entry.resolve_references(session=self.session)

    for col in self.inspector.columns.values():
        if col.key in self.pk_columns:
            continue
        current_value = getattr(existing, col.key)
        new_value = getattr(new_entry, col.key)
        if if_exists == "replace" and (col.key in data or new_value is not None):
            logger.debug(f"Replacing {col.key}: {current_value} -> {new_value}")
            setattr(existing, col.key, new_value)

        elif if_exists == "update" and current_value is None and new_value is not None:
            logger.debug(f"Updating {col.key} from None to {new_value}")
            setattr(existing, col.key, new_value)

    self.session.flush()
    return existing