Skip to content

opensampl.load_data

Main functionality for loading data into the database

create_new_tables(*, _config, create_schema=True, session=None)

Use the ORM definition to create all tables, optionally creating the schema as well

Source code in opensampl/load_data.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
@route("create_new_tables", method="GET")
def create_new_tables(*, _config: BaseConfig, create_schema: bool = True, session: Optional[Session] = None):
    """Use the ORM definition to create all tables, optionally creating the schema as well"""
    if _config.ROUTE_TO_BACKEND:
        return {"create_schema": create_schema}

    if not isinstance(session, Session):
        raise TypeError("Session must be a SQLAlchemy session")

    try:
        if create_schema:
            session.execute(text(f"CREATE SCHEMA IF NOT EXISTS {Base.metadata.schema}"))
            session.commit()
        Base.metadata.create_all(session.bind)
    except Exception as e:
        session.rollback()
        logger.error(f"Error writing to table: {e}")
        raise

load_probe_metadata(*, vendor, probe_key, data, _config, session=None)

Write object to table

Source code in opensampl/load_data.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
@route("load_probe_metadata")
def load_probe_metadata(
    *,
    vendor: VendorType,
    probe_key: ProbeKey,
    data: dict[str, Any],
    _config: BaseConfig,
    session: Optional[Session] = None,
):
    """Write object to table"""
    if _config.ROUTE_TO_BACKEND:
        return {
            "vendor": vendor.model_dump(),
            "probe_key": probe_key.model_dump(),
            "data": data,
        }

    if not isinstance(session, Session):
        raise TypeError("Session must be a SQLAlchemy session")

    try:
        pm_factory = TableFactory(name="probe_metadata", session=session)

        pm_cols = {col.name for col in pm_factory.inspector.columns}
        probe_info = {k: data.pop(k) for k in list(data.keys()) if k in pm_cols}
        probe_info.update({"probe_id": probe_key.probe_id, "ip_address": probe_key.ip_address, "vendor": vendor.name})
        probe = pm_factory.write(data=probe_info, if_exists="update")

        data["probe_uuid"] = probe.uuid

        write_to_table(table=vendor.metadata_table, data=data, session=session, if_exists="update")

        session.commit()
    except Exception as e:
        session.rollback()
        logger.exception(f"Error writing to table: {e}")
        raise

load_time_data(probe_key, metric_type, reference_type, data, _config, compound_key=None, strict=True, session=None)

Write time data to probe_data table

Parameters:

Name Type Description Default
probe_key ProbeKey

ProbeKey object

required
metric_type MetricType

MetricType object

required
reference_type ReferenceType

ReferenceType object

required
data DataFrame

pandas dataframe with time and value columns

required
_config BaseConfig

BaseSettings object, automatically filled by route wrapper

required
compound_key Optional[dict[str, Any]]

UUID for the reference if reference type is compound

None
strict bool

If true, raises error if any of the data parts (reference/metric/etc) not found. If false, creates new probe. Default: True

True
session Optional[Session]

SQLAlchemy session

None
Source code in opensampl/load_data.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
@route("load_time_data", send_file=True)
def load_time_data(
    probe_key: ProbeKey,
    metric_type: MetricType,
    reference_type: ReferenceType,
    data: pd.DataFrame,
    _config: BaseConfig,
    compound_key: Optional[dict[str, Any]] = None,
    strict: bool = True,
    session: Optional[Session] = None,
):
    """
    Write time data to probe_data table

    Args:
        probe_key: ProbeKey object
        metric_type: MetricType object
        reference_type: ReferenceType object
        data: pandas dataframe with time and value columns
        _config: BaseSettings object, automatically filled by route wrapper
        compound_key: UUID for the reference if reference type is compound
        strict: If true, raises error if any of the data parts (reference/metric/etc) not found.
            If false, creates new probe. Default: True
        session: SQLAlchemy session

    """
    if _config.ROUTE_TO_BACKEND:
        csv_data = data.to_csv(index=False).encode("utf-8")
        return {
            "data": {
                "probe_key_str": json.dumps(probe_key.model_dump()),
                "metric_type_str": json.dumps(metric_type.model_dump()),
                "reference_type_str": json.dumps(reference_type.model_dump()),
                "compound_key_str": json.dumps(compound_key),
            },
            "files": {"file": ("time_data.csv", csv_data, "text/csv")},
        }

    if not isinstance(session, Session):
        raise TypeError("Session must be a SQLAlchemy session")

    try:
        from opensampl.load.data import DataFactory

        data_definition = DataFactory(
            probe_key=probe_key,
            metric_type=metric_type,
            reference_type=reference_type,
            compound_key=compound_key,
            strict=strict,
            session=session,
        )

        if any(x is None for x in [data_definition.probe, data_definition.metric, data_definition.reference]):
            raise RuntimeError(f"Not all required definition fields filled: {data_definition.dump_factory()}")  # noqa: TRY301

        df = data[["time", "value"]].copy()  # Only keep required columns.
        df["probe_uuid"] = data_definition.probe.uuid  # ty: ignore[possibly-unbound-attribute]
        df["reference_uuid"] = data_definition.reference.uuid  # ty: ignore[possibly-unbound-attribute]
        df["metric_type_uuid"] = data_definition.metric.uuid  # ty: ignore[possibly-unbound-attribute]
        logger.debug(df.head())
        # Ensure correct dtypes
        df["time"] = pd.to_datetime(df["time"], utc=True, errors="raise")
        df["value"] = df["value"].apply(json.dumps)

        records = df.to_dict(orient="records")
        insert_stmt = text(f"""
        INSERT INTO {ProbeData.__table__.schema}.{ProbeData.__tablename__}
        (time, probe_uuid, reference_uuid, metric_type_uuid, value)
        VALUES (:time, :probe_uuid, :reference_uuid, :metric_type_uuid, :value)
        ON CONFLICT (time, probe_uuid, reference_uuid, metric_type_uuid)
        DO NOTHING
        """)  # noqa: S608

        try:
            result = session.execute(insert_stmt, records)
            session.commit()
            total_rows = len(records)
            inserted = result.rowcount  # ty: ignore[unresolved-attribute]
            excluded = total_rows - inserted
            logger.warning(f"Inserted {inserted}/{total_rows} rows; {excluded}/{total_rows} rejected due to conflicts")

        except Exception as e:
            # In case of an error, roll back the session
            session.rollback()
            logger.error(f"Error inserting rows: {e}")
            raise

    except Exception as e:
        logger.exception(f"Error writing time data: {e}")
        session.rollback()
        raise

write_to_table(table, data, _config, if_exists='update', session=None)

Write object to table with configurable behavior for handling conflicts.


table: Name of the table to write to
data: Dictionary of column names and values to write
_config: BaseSettings object, automatically filled by route wrapper
if_exists: How to handle conflicts with existing entries. One of:
    - 'update': Only update fields that are provided and non-default (default)
    - 'error': Raise an error if entry exists
    - 'replace': Replace all non-primary-key fields with new values
    - 'ignore': Skip if entry exists
session: Optional SQLAlchemy session

ValueError: If table not found or invalid on_conflict value
SQLAlchemyError: For database errors
Source code in opensampl/load_data.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@route("write_to_table")
def write_to_table(
    table: str,
    data: dict[str, Any],
    _config: BaseConfig,
    if_exists: conflict_actions = "update",
    session: Optional[Session] = None,
):
    """
    Write object to table with configurable behavior for handling conflicts.

    Args:
    ----
        table: Name of the table to write to
        data: Dictionary of column names and values to write
        _config: BaseSettings object, automatically filled by route wrapper
        if_exists: How to handle conflicts with existing entries. One of:
            - 'update': Only update fields that are provided and non-default (default)
            - 'error': Raise an error if entry exists
            - 'replace': Replace all non-primary-key fields with new values
            - 'ignore': Skip if entry exists
        session: Optional SQLAlchemy session

    Raises:
    ------
        ValueError: If table not found or invalid on_conflict value
        SQLAlchemyError: For database errors

    """
    if if_exists not in ["error", "replace", "update", "ignore"]:
        raise ValueError("on_conflict must be one of: 'error', 'replace', 'update', 'ignore'")

    if _config.ROUTE_TO_BACKEND:
        return {"table": table, "data": data, "if_exists": if_exists}

    if not isinstance(session, Session):
        raise TypeError("Session must be a SQLAlchemy session")

    try:
        table_factory = TableFactory(table, session)
        logger.debug(f"{data=}")

        table_factory.write(data=data, if_exists=if_exists)

        session.commit()
        return None  # noqa: TRY300

    except Exception as e:
        session.rollback()
        logger.error(f"Error writing to table: {e}")
        raise