Skip to content

opensampl.server.backend.main

API Configuration to Indirectly interact with the database

ProbeMetadataPayload

Bases: BaseModel

Probe Metadata Payload Model

Source code in opensampl/server/backend/main.py
47
48
49
50
51
52
class ProbeMetadataPayload(BaseModel):
    """Probe Metadata Payload Model"""

    vendor: VendorType
    probe_key: ProbeKey
    data: dict[str, Any]

TimeDataPoint

Bases: BaseModel

Time Data Model

Source code in opensampl/server/backend/main.py
32
33
34
35
36
class TimeDataPoint(BaseModel):
    """Time Data Model"""

    time: str
    value: float

WriteTablePayload

Bases: BaseModel

Write Table Payload Model

Source code in opensampl/server/backend/main.py
39
40
41
42
43
44
class WriteTablePayload(BaseModel):
    """Write Table Payload Model"""

    table: str
    data: dict[str, Any]
    if_exists: load_data.conflict_actions = "update"

check_log_level(api_key=Depends(require_api_key()))

Check which log levels are visible in backend container

Source code in opensampl/server/backend/main.py
166
167
168
169
170
171
172
173
174
@app.get("/checkloglevel")
def check_log_level(api_key: str = Depends(require_api_key())):
    """Check which log levels are visible in backend container"""
    logger.debug("Debug test")
    logger.info("Info test")
    logger.warning("Warning test")
    logger.error("Error test")
    current_level = next(iter(logger._core.handlers.values()))["level"].name  # noqa: SLF001
    return {"loglevel": current_level}

create_new_tables(create_schema=True, api_key=Depends(require_api_key()), session=Depends(get_db))

Update DB based on ORM Tables

Source code in opensampl/server/backend/main.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
@app.get("/create_new_tables")
def create_new_tables(
    create_schema: bool = True, api_key: str = Depends(require_api_key()), session: Session = Depends(get_db)
):
    """Update DB based on ORM Tables"""
    try:
        load_data.create_new_tables(create_schema=create_schema, session=session)
        return JSONResponse(content={"message": "Succeeded in creating any new tables"}, status_code=200)
    except SQLAlchemyError as e:
        logger.error(f"SQLAlchemy error: {e}")
        return JSONResponse(content={"message": f"Database error: {e}"}, status_code=500)
    except json.JSONDecodeError as e:
        logger.error(f"JSON decode error: {e}")
        return JSONResponse(content={"message": f"Invalid JSON data: {e}"}, status_code=400)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return JSONResponse(content={"message": f"Failed to load JSON into database: {e}"}, status_code=500)

docs_redirect() async

Redirect bare url to docs

Source code in opensampl/server/backend/main.py
152
153
154
155
@app.get("/", include_in_schema=False)
async def docs_redirect():
    """Redirect bare url to docs"""
    return RedirectResponse(url="/docs")

generate_api_key(expire_after=None, api_key=Depends(require_api_key(bootstrap=True)), session=Depends(get_db))

Generate new API key in the database

Source code in opensampl/server/backend/main.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
@app.get("/gen_api_key")
def generate_api_key(
    expire_after: int | None = None,
    api_key: str | None = Depends(require_api_key(bootstrap=True)),
    session: Session = Depends(get_db),
):
    """Generate new API key in the database"""
    try:
        new_key = APIAccessKey()
        new_key.generate_key()
        if expire_after:
            new_key.expires_at = datetime.now(tz=UTC) + timedelta(days=expire_after)

        session.add(new_key)

        session.commit()
        return JSONResponse(content={"message": "Succeeded in creating new access key"}, status_code=200)
    except SQLAlchemyError as e:
        logger.error(f"SQLAlchemy error: {e}")
        return JSONResponse(content={"message": f"Database error: {e}"}, status_code=500)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return JSONResponse(content={"message": f"Failed to create new access key: {e}"}, status_code=500)

get_db()

Get database session

Source code in opensampl/server/backend/main.py
125
126
127
128
129
130
131
132
def get_db():
    """Get database session"""
    Session = sessionmaker(bind=engine)  # noqa: N806
    try:
        session = Session()
        yield session
    finally:
        session.close()

get_keys()

Get active API keys

Source code in opensampl/server/backend/main.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def get_keys():
    """Get active API keys"""
    env_keys = os.getenv("API_KEYS", "").strip()
    keys = [k.strip() for k in env_keys.split(",") if k.strip()]
    if keys:
        logger.debug("api access keys loaded from env")
        return keys
    try:
        Session = sessionmaker(bind=engine)  # noqa: N806
        with Session() as session:
            now = datetime.now(tz=UTC)
            stmt = select(APIAccessKey.key).where(or_(APIAccessKey.expires_at is None, APIAccessKey.expires_at > now))
            result = session.execute(stmt)
            keys = [row[0] for row in result.all()]
            logger.debug("api access keys loaded from db")
            return keys
    except Exception as e:
        logger.debug(f"exception attempting to load api access keys from db: {e}")
        return []

healthcheck()

Ensure the api is accepting queries

Source code in opensampl/server/backend/main.py
345
346
347
348
@app.get("/healthcheck")
def healthcheck():
    """Ensure the api is accepting queries"""
    return {"status": "OK"}

healthcheck_db()

Ensure the db is accepting connections

Source code in opensampl/server/backend/main.py
351
352
353
354
355
356
357
358
359
360
@app.get("/healthcheck_database")
def healthcheck_db():
    """Ensure the db is accepting connections"""
    try:
        with engine.connect() as connection:
            connection.execute(text("SELECT 1"))
    except SQLAlchemyError as e:
        return JSONResponse(content={"message": f"Database connection error: {e!s}"}, status_code=503)
    else:
        return {"status": "OK"}

healthcheck_metadata()

Ensure that the database exists AND the expected format is present

Source code in opensampl/server/backend/main.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
@app.get("/healthcheck_metadata")
def healthcheck_metadata():
    """Ensure that the database exists AND the expected format is present"""
    # eventually, we want to make the schema configurable through environment variables
    # for now, we have it hard coded too many places. So this is a small step towards that goal
    SCHEMA = "castdb"  # noqa: N806

    try:
        with engine.connect() as connection:
            result = connection.execute(
                text("SELECT schema_name FROM information_schema.schemata WHERE schema_name = :schema;"),
                {"schema": SCHEMA},
            )
            schema_exists = result.fetchone() is not None
        if schema_exists:
            return {"status": "OK"}
        return JSONResponse(status_code=500, content={"message": f"Expected schema '{SCHEMA}' does not exist"})
    except SQLAlchemyError as e:
        return JSONResponse(content={"message": f"Database connection error: {e!s}"}, status_code=503)

load_probe_metadata(payload, api_key=Depends(require_api_key()), session=Depends(get_db))

Load metadata for given probe

Source code in opensampl/server/backend/main.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
@app.post("/load_probe_metadata")
def load_probe_metadata(
    payload: ProbeMetadataPayload, api_key: str = Depends(require_api_key()), session: Session = Depends(get_db)
):
    """Load metadata for given probe"""
    logger.debug(f"Received payload: {payload.model_dump()}")

    try:
        load_data.load_probe_metadata(
            vendor=payload.vendor, probe_key=payload.probe_key, data=payload.data, session=session
        )
        logger.debug(
            f"Successfully wrote to {ProbeMetadata.__tablename__} and {payload.vendor.metadata_table}: {payload.data}"
        )
        return JSONResponse(content={"message": f"Succeeded loaded metadata for {payload.probe_key}"}, status_code=200)
    except IntegrityError as e:
        session.rollback()
        if isinstance(e.orig, psycopg2.errors.UniqueViolation):
            return JSONResponse(content={"message": f"Unique violation error: {e}"}, status_code=409)
        return JSONResponse(content={"message": f"Integrity error: {e}"}, status_code=500)
    except SQLAlchemyError as e:
        logger.error(f"SQLAlchemy error: {e}")
        return JSONResponse(content={"message": f"Database error: {e}"}, status_code=500)
    except json.JSONDecodeError as e:
        logger.error(f"JSON decode error: {e}")
        return JSONResponse(content={"message": f"Invalid JSON data: {e}"}, status_code=400)
    except Exception as e:
        logger.exception(f"Unexpected error: {e}")
        return JSONResponse(content={"message": f"Failed to load JSON into database: {e}"}, status_code=500)

load_time_data(probe_key_str=Form(...), metric_type_str=Form(None), reference_type_str=Form(None), compound_key_str=Form(None), file=File(...), api_key=Depends(require_api_key()), session=Depends(get_db)) async

Load provided data for given probe

Source code in opensampl/server/backend/main.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
@app.post("/load_time_data")
async def load_time_data(  # noqa: PLR0912, C901
    probe_key_str: str = Form(...),
    metric_type_str: str | None = Form(None),
    reference_type_str: str | None = Form(None),
    compound_key_str: str | None = Form(None),
    file: UploadFile = File(...),
    api_key: str = Depends(require_api_key()),
    session: Session = Depends(get_db),
):
    """Load provided data for given probe"""
    try:
        probe_key = ProbeKey(**json.loads(probe_key_str))

        if metric_type_str is not None:
            metric_type_dict = json.loads(metric_type_str)
            metric_type = MetricType(**metric_type_dict)
        else:
            metric_type = METRICS.UNKNOWN

        if reference_type_str is not None:
            reference_type_dict = json.loads(reference_type_str)
            if "reference_table" in reference_type_dict:
                reference_type = CompoundReferenceType(**reference_type_dict)
            else:
                reference_type = ReferenceType(**reference_type_dict)
        else:
            reference_type = REF_TYPES.UNKNOWN

        compound_key = None if compound_key_str is None else json.loads(compound_key_str)

        content = await file.read()
        df = pd.read_csv(io.BytesIO(content))
        logger.info(df.head())
        # Convert time strings back to datetime
        df["time"] = pd.to_datetime(df["time"])

        # Use the same load_time_data function as before
        load_data.load_time_data(
            probe_key=probe_key,
            metric_type=metric_type,
            reference_type=reference_type,
            compound_key=compound_key,
            data=df,
            session=session,
        )

        return JSONResponse(content={"message": f"Successfully loaded {len(df)} data points"}, status_code=200)
    except IntegrityError as e:
        if session:
            session.rollback()
            session.close()
        if isinstance(e.orig, psycopg2.errors.UniqueViolation):
            return JSONResponse(content={"message": f"Unique violation error: {e}"}, status_code=409)
        return JSONResponse(content={"message": f"Integrity error: {e}"}, status_code=500)
    except SQLAlchemyError as e:
        logger.error(f"Database error: {e}")
        if session:
            session.rollback()
            session.close()
        raise HTTPException(status_code=500, detail=f"Database error: {e!s}") from e
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        if session:
            session.rollback()
            session.close()
        raise HTTPException(status_code=500, detail=f"Error processing time series data: {e!s}") from e

metrics()

Expose Prometheus metrics.

Source code in opensampl/server/backend/main.py
384
385
386
387
@app.get("/metrics", include_in_schema=False)
def metrics():
    """Expose Prometheus metrics."""
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

metrics_middleware(request, call_next) async

Middleware to track request metrics.

Source code in opensampl/server/backend/main.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@app.middleware("http")
async def metrics_middleware(request: Request, call_next: Callable) -> Response:
    """Middleware to track request metrics."""
    if request.url.path in EXCLUDED_PATHS:
        return await call_next(request)
    start_time = time.time()
    response: Response = await call_next(request)
    duration = time.time() - start_time

    REQUEST_COUNT.labels(method=request.method, endpoint=request.url.path, http_status=response.status_code).inc()

    REQUEST_LATENCY.labels(method=request.method, endpoint=request.url.path).observe(duration)

    return response

require_api_key(bootstrap=False)

Return function to validate api key with or without bootstrap.

Source code in opensampl/server/backend/main.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def require_api_key(bootstrap: bool = False):
    """Return function to validate api key with or without bootstrap."""

    def validate_api_key(api_key: str = Security(api_key_header)) -> str | None:
        """Validate provided API key"""
        if not USE_API_KEY:
            return None  # Security is disabled

        keys = get_keys()
        if not keys and bootstrap:
            logger.warning("No API keys configured; allowing bootstrap API key generation")
            return None
        if api_key not in keys:
            raise HTTPException(status_code=403, detail="Invalid or missing API key")
        return api_key

    return validate_api_key

set_log_level(newloglevel, api_key=Depends(require_api_key()))

Change visible log level in backend container

Source code in opensampl/server/backend/main.py
158
159
160
161
162
163
@app.get("/setloglevel")
def set_log_level(newloglevel: str, api_key: str = Depends(require_api_key())):
    """Change visible log level in backend container"""
    newloglevel = newloglevel.upper()
    logger.configure(handlers=[{"sink": sys.stderr, "level": newloglevel}])
    return {"loglevel": newloglevel}

write_to_table(payload, api_key=Depends(require_api_key()), session=Depends(get_db))

Write given data to specified table

Source code in opensampl/server/backend/main.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
@app.post("/write_to_table")
def write_to_table(
    payload: WriteTablePayload, api_key: str = Depends(require_api_key()), session: Session = Depends(get_db)
):
    """Write given data to specified table"""
    try:
        load_data.write_to_table(table=payload.table, data=payload.data, if_exists=payload.if_exists, session=session)
        logger.debug(f"Successfully wrote to {payload.table} using: {payload.data}")
        return JSONResponse(content={"message": f"Succeeded loading data into {payload.table}"}, status_code=200)
    except IntegrityError as e:
        if isinstance(e.orig, psycopg2.errors.UniqueViolation):
            return JSONResponse(content={"message": f"Unique violation error: {e}"}, status_code=409)
        return JSONResponse(content={"message": f"Integrity error: {e}"}, status_code=500)
    except SQLAlchemyError as e:
        logger.error(f"SQLAlchemy error: {e}")
        return JSONResponse(content={"message": f"Database error: {e}"}, status_code=500)
    except json.JSONDecodeError as e:
        logger.error(f"JSON decode error: {e}")
        return JSONResponse(content={"message": f"Invalid JSON data: {e}"}, status_code=400)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return JSONResponse(content={"message": f"Failed to load JSON into database: {e}"}, status_code=500)