Skip to content

opensampl.vendors.base_probe

Abstract probe base class that provides scaffolding for vendor-specific implementations

BaseProbe

Bases: ABC

BaseProbe abstract object

Source code in opensampl/vendors/base_probe.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
class BaseProbe(ABC):
    """BaseProbe abstract object"""

    vendor: ClassVar[VendorType]

    def __init__(
        self,
        input_file: str | Path | None = None,
        probe_key: ProbeKey | None = None,
        chunk_size: int | None = None,
        **kwargs: dict,
    ):
        """
        Store the probe's input file, key, chunk size and any extra metadata.

        Args:
        ----
            input_file: Path (or path string) of the file to read; a falsy value leaves ``input_file`` as None
            probe_key: Key identifying the probe, stored as given
            chunk_size: Stored as given; ``send_data`` uses it as the number of rows per upload
            **kwargs: Extra keyword arguments, copied into ``self.metadata``

        """
        self.metadata_parsed: bool = False
        # New dict holding whatever extra keyword arguments were supplied
        self.metadata: dict = dict(kwargs)
        self.chunk_size: int | None = chunk_size
        self.probe_key: ProbeKey | None = probe_key

        self.input_file: Path | None = None
        if input_file:
            self.input_file = Path(input_file)

    @classmethod
    def help_str(cls) -> str:
        """Return the CLI help text for this probe class, naming the class in the summary line."""
        summary = f"Processes a file or directory to load {cls.__name__} metadata and/or time series data."
        details = (
            "By default, both metadata and time series data are processed. "
            "If you specify either --metadata or --time-data, only the selected operation(s) will be performed."
        )
        # Summary and details are separated by a blank line
        return f"{summary}\n\n{details}"

    @classmethod
    def get_cli_options(cls) -> list[Callable]:
        """
        Build the click decorators used by this probe's CLI command.

        Returns
        -------
            The option/argument decorators, in declaration order, ending with ``click.pass_context``.

        """
        metadata_flag = click.option(
            "--metadata",
            "-m",
            is_flag=True,
            help="Load probe metadata from provided file",
        )
        time_data_flag = click.option(
            "--time-data",
            "-t",
            is_flag=True,
            help="Load time series data from provided file",
        )
        archive_path_option = click.option(
            "--archive-path",
            "-a",
            type=click.Path(exists=False, file_okay=False, dir_okay=True, path_type=Path),
            help="Override default archive directory path for processed files. Default: ./archive",
        )
        no_archive_flag = click.option(
            "--no-archive",
            "-n",
            is_flag=True,
            help="Do not archive processed files when flag provided",
        )
        max_workers_option = click.option(
            "--max-workers",
            "-w",
            type=int,
            default=4,
            help="Maximum number of worker threads when processing directories",
        )
        chunk_size_option = click.option(
            "--chunk-size",
            "-c",
            type=int,
            required=False,
            help="How many records to send at a time. If None, sends all at once. default: None",
        )
        show_progress_flag = click.option(
            "--show-progress",
            "-p",
            is_flag=True,
            help="If flag provided, show the tqdm progress bar when processing directories. For best experience, "
            "set LOG_LEVEL=ERROR when using this option.",
        )
        filepath_argument = click.argument(
            "filepath",
            type=click.Path(exists=True, path_type=Path),
        )

        return [
            metadata_flag,
            time_data_flag,
            archive_path_option,
            no_archive_flag,
            max_workers_option,
            chunk_size_option,
            show_progress_flag,
            filepath_argument,
            click.pass_context,
        ]

    @classmethod
    def process_single_file(  # noqa: PLR0912, C901
        cls,
        filepath: Path,
        metadata: bool,
        time_data: bool,
        archive_dir: Path,
        no_archive: bool,
        chunk_size: int | None = None,
        pbar: tqdm | DummyTqdm | None = None,
        **kwargs: dict,
    ) -> None:
        """
        Process a single file with the given options.

        Builds a probe for ``filepath``, optionally sends its metadata and/or time series data, archives the
        file unless ``no_archive`` is set, and finally advances ``pbar`` by one.

        "Already loaded" conditions are logged as warnings and processing continues, so the file is still
        archived: an HTTP 409 response for either step, or an ``IntegrityError`` wrapping a psycopg2
        ``UniqueViolation`` for time data. Every other error is logged with its traceback and re-raised.

        Args:
        ----
            filepath: File to load
            metadata: If True, call ``send_metadata`` on the probe
            time_data: If True, call ``process_time_data`` on the probe
            archive_dir: Archive root passed to ``archive_file``
            no_archive: If True, leave the file where it is
            chunk_size: Forwarded to the probe constructor
            pbar: Optional progress bar; ``update(1)`` is called after a successful run
            **kwargs: Forwarded to the probe constructor

        Raises:
        ------
            Exception: Any error other than the "already loaded" cases above, after it has been logged

        """
        try:
            probe = cls(input_file=filepath, chunk_size=chunk_size, **kwargs)
            try:
                if metadata:
                    logger.debug(f"Loading {cls.__name__} metadata from {filepath}")
                    probe.send_metadata()
                    logger.debug(f"Metadata loading complete for {filepath}")
            except requests.exceptions.HTTPError as e:
                resp = e.response
                # Only a 409 response is tolerated; no response or any other status is a failure
                if resp is None or resp.status_code != 409:
                    raise
                logger.warning(
                    f"{filepath} violates unique constraint for metadata, implying already loaded.  "
                    f"Will move to archive if archiving is enabled"
                )

            try:
                if time_data:
                    logger.debug(f"Loading {cls.__name__} time series data from {filepath}")
                    probe.process_time_data()
                    logger.debug(f"Time series data loading complete for {filepath}")
            except requests.exceptions.HTTPError as e:
                resp = e.response
                if resp is None or resp.status_code != 409:
                    raise
                logger.warning(
                    f"{filepath} violates unique constraint for time data, implying already loaded. "
                    f"Will move to archive if archiving is enabled."
                )
            except IntegrityError as e:
                # Only a unique violation is tolerated. Any other integrity failure must propagate;
                # otherwise the file would be archived even though its data was never stored.
                if not isinstance(e.orig, psycopg2.errors.UniqueViolation):  # ty: ignore[unresolved-attribute]
                    raise
                logger.warning(
                    f"{filepath} violates unique constraint for time data, implying already loaded. "
                    f"Will move to archive if archiving is enabled."
                )

            if not no_archive:
                probe.archive_file(archive_dir)

            # Compare with None: tqdm defines its own truthiness, which is unrelated to "was a bar supplied"
            if pbar is not None:
                pbar.update(1)

        except Exception as e:
            logger.error(f"Error processing file {filepath}: {e!s}", exc_info=True)
            raise

    def archive_file(self, archive_dir: Path) -> None:
        """
        Move the processed input file into the archive tree.

        The destination is ``archive_dir/<year>/<month>/<vendor name>/<probe_key>/<file name>``, where year
        and month are taken from the current UTC time (when the file is archived).

        Args:
        ----
            archive_dir: Root directory of the archive

        Raises:
        ------
            ValueError: If the probe has no input file to archive

        """
        if self.input_file is None:
            # Fail before touching the filesystem rather than creating empty archive
            # directories and then hitting an AttributeError on ``None.name``.
            raise ValueError("Cannot archive probe file: input_file is not set")

        now = datetime.now(tz=timezone.utc)
        destination_dir = archive_dir / str(now.year) / f"{now.month:02d}" / self.vendor.name / str(self.probe_key)
        destination_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(str(self.input_file), str(destination_dir / self.input_file.name))

    @classmethod
    def get_cli_command(cls) -> Callable:
        """
        Create a click command that handles both single files and directories.

        The command is named after the lower-cased vendor name, uses ``help_str`` as its help text and is
        decorated with everything returned by ``get_cli_options``.

        Returns
        -------
            A click CLI command that loads and processes probe data.

        """

        def load_callback(ctx: click.Context, **kwargs: dict) -> None:
            """Load probe data from file or directory."""
            try:
                # Pops the known options out of kwargs; whatever is left is forwarded as extra kwargs
                config = cls._extract_load_config(ctx, kwargs)
                cls._prepare_archive(config.archive_dir, config.no_archive)

                target = config.filepath
                if target.is_file():
                    cls._process_file(config, kwargs)
                elif target.is_dir():
                    cls._process_directory(config, kwargs)

            except Exception as e:
                message = f"Error: {e!s}"
                logger.error(message)
                raise click.Abort(message) from e

        # Apply the decorators last-to-first so they end up in declaration order
        command = load_callback
        for decorate in reversed(cls.get_cli_options()):
            command = decorate(command)
        return click.command(name=cls.vendor.name.lower(), help=cls.help_str())(command)

    @classmethod
    def _extract_load_config(cls, ctx: click.Context, kwargs: dict) -> LoadConfig:
        """
        Pop the known CLI options out of ``kwargs`` and bundle them into a LoadConfig.

        ``kwargs`` is mutated: every recognised key is removed, so what remains afterwards are the extra
        keyword arguments. When neither ``metadata`` nor ``time_data`` was requested, both are enabled.

        Args:
        ----
            ctx: Click context object; ``ctx.obj["conf"].ARCHIVE_PATH`` is the fallback archive directory
            kwargs: Dictionary of keyword arguments passed to the CLI command

        Returns:
        -------
            A LoadConfig object with all relevant parameters

        """
        filepath = kwargs.pop("filepath")
        archive_dir = kwargs.pop("archive_path", None)
        if not archive_dir:
            # No override given on the command line: use the configured archive path
            archive_dir = ctx.obj["conf"].ARCHIVE_PATH
        want_metadata = kwargs.pop("metadata", False)
        want_time_data = kwargs.pop("time_data", False)
        no_archive = kwargs.pop("no_archive", False)
        max_workers = kwargs.pop("max_workers", 4)
        chunk_size = kwargs.pop("chunk_size", None)
        show_progress = kwargs.pop("show_progress", False)

        config = LoadConfig(
            filepath=filepath,
            archive_dir=archive_dir,
            metadata=want_metadata,
            time_data=want_time_data,
            no_archive=no_archive,
            max_workers=max_workers,
            chunk_size=chunk_size,
            show_progress=show_progress,
        )

        # Neither flag selected means "do both"
        if not (config.metadata or config.time_data):
            config.metadata = True
            config.time_data = True

        return config

    @classmethod
    def _prepare_archive(cls, archive_dir: Path, no_archive: bool) -> None:
        """
        Make sure the archive root exists, unless archiving is turned off.

        Args:
        ----
            archive_dir: Path to the archive output directory
            no_archive: If True, nothing is created

        """
        if no_archive:
            return
        archive_dir.mkdir(parents=True, exist_ok=True)

    @classmethod
    def _process_file(cls, config: LoadConfig, extra_kwargs: dict) -> None:
        """
        Run ``process_single_file`` on ``config.filepath``.

        Args:
        ----
            config: LoadConfig object containing file, archive, and processing flags
            extra_kwargs: Additional keyword arguments forwarded to ``process_single_file``

        """
        # Same positional order as the process_single_file signature
        positional = (
            config.filepath,
            config.metadata,
            config.time_data,
            config.archive_dir,
            config.no_archive,
            config.chunk_size,
        )
        cls.process_single_file(*positional, **extra_kwargs)

    @classmethod
    def filter_files(cls, files: list[Path]) -> list[Path]:
        """
        Filter the files found in the input directory when loading this vendor's data files

        The base implementation returns ``files`` unchanged. ``_process_directory`` calls this on the list
        of regular files it found before submitting them for processing.
        """
        return files

    @classmethod
    def _process_directory(cls, config: LoadConfig, extra_kwargs: dict) -> None:
        """
        Load every file found directly inside ``config.filepath``.

        Regular files in the directory are passed through ``filter_files`` and then submitted to
        ``process_single_file`` on a thread pool with ``config.max_workers`` workers. A tqdm bar is used
        when ``config.show_progress`` is set; otherwise ``dummy_tqdm`` stands in.

        Args:
        ----
            config: LoadConfig object containing directory, archive, and processing flags
            extra_kwargs: Additional keyword arguments forwarded to ``process_single_file``

        A failure in one file is logged here and does not stop the remaining files; nothing is raised
        for it.

        """
        directory = config.filepath
        files = cls.filter_files([entry for entry in directory.iterdir() if entry.is_file()])
        logger.info(f"Found {len(files)} files in directory {config.filepath}")

        bar_factory = tqdm if config.show_progress else dummy_tqdm
        # Positional arguments that are identical for every submitted file
        shared_args = (
            config.metadata,
            config.time_data,
            config.archive_dir,
            config.no_archive,
            config.chunk_size,
        )

        with bar_factory(total=len(files), desc=f"Processing {directory.name}") as pbar:  # noqa: SIM117
            with ThreadPoolExecutor(max_workers=config.max_workers) as executor:
                futures = [
                    executor.submit(cls.process_single_file, path, *shared_args, pbar=pbar, **extra_kwargs)
                    for path in files
                ]

                # Collect results in submission order; one failing file must not abort the rest
                for future in futures:
                    try:
                        future.result()
                    except Exception as e:  # noqa: PERF203
                        logger.error(f"Error in thread: {e!s}")

    @property
    def probe_id(self):
        """Return the ``probe_id`` attribute of this probe's ``probe_key``."""
        return self.probe_key.probe_id

    @property
    def ip_address(self):
        """Return the ``ip_address`` attribute of this probe's ``probe_key``."""
        return self.probe_key.ip_address

    @abstractmethod
    def process_time_data(self) -> None:
        """
        Parse and load time series data from self.input_file.

        Abstract: every concrete probe must implement this. ``process_single_file`` calls it when time
        data loading is requested.

        Use either send_time_data (which prefills METRICS.PHASE_OFFSET)
        or send_data and provide alternative METRICS type.
        Both require a df as follows:
            pd.DataFrame with columns:
                - time (datetime64[ns]): timestamp for each measurement
                - value (float64): measured value at each timestamp
        """

    @dualmethod
    def send_data(
        self,
        data: pd.DataFrame,
        metric: MetricType,
        reference_type: ReferenceType,
        compound_reference: dict[str, Any] | None = None,
        probe_key: ProbeKey | None = None,
    ) -> None:
        """
        Ingest ``data`` into the database through ``load_time_data``.

        Works on an instance or on the class. On an instance, ``probe_key`` defaults to ``self.probe_key``
        and a truthy ``self.chunk_size`` splits the upload into slices of that many rows. On the class,
        ``probe_key`` must be given and the data is sent in one call.

        Args:
        ----
            data: DataFrame to upload
            metric: Metric type passed to ``load_time_data`` as ``metric_type``
            reference_type: Reference type passed through to ``load_time_data``
            compound_reference: Optional mapping passed to ``load_time_data`` as ``compound_key``
            probe_key: Probe to attribute the data to

        Raises:
        ------
            ValueError: If no probe key is available

        """
        called_on_instance = isinstance(self, BaseProbe)
        if probe_key is None and called_on_instance:
            probe_key = self.probe_key

        if probe_key is None:
            raise ValueError("send data must be called with probe_key if used as class method")

        # chunk_size is only set by __init__, so it is looked up defensively
        rows_per_call = getattr(self, "chunk_size", None)
        if rows_per_call:
            batches = (data.iloc[start : start + rows_per_call] for start in range(0, len(data), rows_per_call))
        else:
            batches = (data,)

        for batch in batches:
            load_time_data(
                probe_key=probe_key,
                metric_type=metric,
                reference_type=reference_type,
                data=batch,
                compound_key=compound_reference,
            )

    def send_time_data(
        self, data: pd.DataFrame, reference_type: ReferenceType, compound_reference: dict[str, Any] | None = None
    ):
        """
        Ingests time data into the database

        Thin wrapper around ``send_data`` with the metric fixed to ``METRICS.PHASE_OFFSET``. There is no
        ``chunk_size`` argument here; ``send_data`` reads the chunk size from ``self.chunk_size``.

        :param data: DataFrame to send
        :param reference_type: passed through to ``send_data``
        :param compound_reference: passed through to ``send_data``
        :return: None
        """
        self.send_data(
            data=data, metric=METRICS.PHASE_OFFSET, reference_type=reference_type, compound_reference=compound_reference
        )

    @abstractmethod
    def process_metadata(self) -> dict:
        """
        Process metadata

        Abstract: every concrete probe must implement this. ``send_metadata`` passes the returned dict
        to ``load_probe_metadata`` as ``data``.

        Returns
        -------
            Dict[str, Any] containing some or all of the metadata fields for the specific vendor

        """

    @classmethod
    def _send_metadata_to_db(cls, probe_key: ProbeKey, metadata: dict) -> None:
        """
        Send metadata to the database.

        Args:
        ----
            probe_key: Key of the probe the metadata belongs to
            metadata: Metadata values, passed to ``load_probe_metadata`` as ``data``

        """
        # The vendor is read from the class, so no probe instance is needed
        load_probe_metadata(vendor=cls.vendor, probe_key=probe_key, data=metadata)
        logger.debug(f"Sent metadata for probe {probe_key}")

    def send_metadata(self):
        """
        Process this probe's metadata and send it to the database.

        The write is delegated to ``_send_metadata_to_db`` so instance-level and class-level callers share
        one code path (including its debug log) instead of duplicating the ``load_probe_metadata`` call.
        """
        metadata = self.process_metadata()
        self._send_metadata_to_db(self.probe_key, metadata)

ip_address property

Return ip_address of probe

probe_id property

Return probe_id of probe

__init__(input_file=None, probe_key=None, chunk_size=None, **kwargs)

Initialize probe given input file

Source code in opensampl/vendors/base_probe.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def __init__(
    self,
    input_file: str | Path | None = None,
    probe_key: ProbeKey | None = None,
    chunk_size: int | None = None,
    **kwargs: dict,
):
    """
    Store the probe's input file, key, chunk size and any extra metadata.

    Args:
    ----
        input_file: Path (or path string) of the file to read; a falsy value leaves ``input_file`` as None
        probe_key: Key identifying the probe, stored as given
        chunk_size: Stored as given on ``self.chunk_size``
        **kwargs: Extra keyword arguments, copied into ``self.metadata``

    """
    self.metadata_parsed: bool = False
    # New dict holding whatever extra keyword arguments were supplied
    self.metadata: dict = dict(kwargs)
    self.chunk_size: int | None = chunk_size
    self.probe_key: ProbeKey | None = probe_key

    self.input_file: Path | None = None
    if input_file:
        self.input_file = Path(input_file)

archive_file(archive_dir)

Archive processed probe file

Puts the file in the archive directory, with year/month/vendor/ipaddress_id hierarchy based on date that the file was processed.

Source code in opensampl/vendors/base_probe.py
293
294
295
296
297
298
299
300
301
302
303
def archive_file(self, archive_dir: Path) -> None:
    """
    Move the processed input file into the archive tree.

    The destination is ``archive_dir/<year>/<month>/<vendor name>/<probe_key>/<file name>``, where year
    and month are taken from the current UTC time (when the file is archived).

    Args:
    ----
        archive_dir: Root directory of the archive

    Raises:
    ------
        ValueError: If the probe has no input file to archive

    """
    if self.input_file is None:
        # Fail before touching the filesystem rather than creating empty archive
        # directories and then hitting an AttributeError on ``None.name``.
        raise ValueError("Cannot archive probe file: input_file is not set")

    now = datetime.now(tz=timezone.utc)
    destination_dir = archive_dir / str(now.year) / f"{now.month:02d}" / self.vendor.name / str(self.probe_key)
    destination_dir.mkdir(parents=True, exist_ok=True)
    shutil.move(str(self.input_file), str(destination_dir / self.input_file.name))

filter_files(files) classmethod

Filter the files found in the input directory when loading this vendor's data files

Source code in opensampl/vendors/base_probe.py
405
406
407
408
@classmethod
def filter_files(cls, files: list[Path]) -> list[Path]:
    """
    Filter the files found in the input directory when loading this vendor's data files

    The base implementation returns ``files`` unchanged.
    """
    return files

get_cli_command() classmethod

Create a click command that handles both single files and directories.

Returns
A click CLI command that loads and processes probe data.
Source code in opensampl/vendors/base_probe.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
@classmethod
def get_cli_command(cls) -> Callable:
    """
    Create a click command that handles both single files and directories.

    The command is named after the lower-cased vendor name, uses ``help_str`` as its help text and is
    decorated with everything returned by ``get_cli_options``.

    Returns
    -------
        A click CLI command that loads and processes probe data.

    """

    def load_callback(ctx: click.Context, **kwargs: dict) -> None:
        """Load probe data from file or directory."""
        try:
            # Pops the known options out of kwargs; whatever is left is forwarded as extra kwargs
            config = cls._extract_load_config(ctx, kwargs)
            cls._prepare_archive(config.archive_dir, config.no_archive)

            target = config.filepath
            if target.is_file():
                cls._process_file(config, kwargs)
            elif target.is_dir():
                cls._process_directory(config, kwargs)

        except Exception as e:
            message = f"Error: {e!s}"
            logger.error(message)
            raise click.Abort(message) from e

    # Apply the decorators last-to-first so they end up in declaration order
    command = load_callback
    for decorate in reversed(cls.get_cli_options()):
        command = decorate(command)
    return click.command(name=cls.vendor.name.lower(), help=cls.help_str())(command)

get_cli_options() classmethod

Return the click options/arguments for the probe class.

Source code in opensampl/vendors/base_probe.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
@classmethod
def get_cli_options(cls) -> list[Callable]:
    """
    Build the click decorators used by this probe's CLI command.

    Returns
    -------
        The option/argument decorators, in declaration order, ending with ``click.pass_context``.

    """
    metadata_flag = click.option(
        "--metadata",
        "-m",
        is_flag=True,
        help="Load probe metadata from provided file",
    )
    time_data_flag = click.option(
        "--time-data",
        "-t",
        is_flag=True,
        help="Load time series data from provided file",
    )
    archive_path_option = click.option(
        "--archive-path",
        "-a",
        type=click.Path(exists=False, file_okay=False, dir_okay=True, path_type=Path),
        help="Override default archive directory path for processed files. Default: ./archive",
    )
    no_archive_flag = click.option(
        "--no-archive",
        "-n",
        is_flag=True,
        help="Do not archive processed files when flag provided",
    )
    max_workers_option = click.option(
        "--max-workers",
        "-w",
        type=int,
        default=4,
        help="Maximum number of worker threads when processing directories",
    )
    chunk_size_option = click.option(
        "--chunk-size",
        "-c",
        type=int,
        required=False,
        help="How many records to send at a time. If None, sends all at once. default: None",
    )
    show_progress_flag = click.option(
        "--show-progress",
        "-p",
        is_flag=True,
        help="If flag provided, show the tqdm progress bar when processing directories. For best experience, "
        "set LOG_LEVEL=ERROR when using this option.",
    )
    filepath_argument = click.argument(
        "filepath",
        type=click.Path(exists=True, path_type=Path),
    )

    return [
        metadata_flag,
        time_data_flag,
        archive_path_option,
        no_archive_flag,
        max_workers_option,
        chunk_size_option,
        show_progress_flag,
        filepath_argument,
        click.pass_context,
    ]

help_str() classmethod

Help string for use in the CLI.

Source code in opensampl/vendors/base_probe.py
161
162
163
164
165
166
167
168
@classmethod
def help_str(cls) -> str:
    """Return the CLI help text for this probe class, naming the class in the summary line."""
    summary = f"Processes a file or directory to load {cls.__name__} metadata and/or time series data."
    details = (
        "By default, both metadata and time series data are processed. "
        "If you specify either --metadata or --time-data, only the selected operation(s) will be performed."
    )
    # Summary and details are separated by a blank line
    return f"{summary}\n\n{details}"

process_metadata() abstractmethod

Process metadata

Returns
A Dict[str, Any] containing some or all of the metadata fields for the specific vendor
Source code in opensampl/vendors/base_probe.py
524
525
526
527
528
529
530
531
532
533
@abstractmethod
def process_metadata(self) -> dict:
    """
    Process metadata

    Abstract: every concrete probe must implement this.

    Returns
    -------
        Dict[str, Any] containing some or all of the metadata fields for the specific vendor

    """

process_single_file(filepath, metadata, time_data, archive_dir, no_archive, chunk_size=None, pbar=None, **kwargs) classmethod

Process a single file with the given options.

Source code in opensampl/vendors/base_probe.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
@classmethod
def process_single_file(  # noqa: PLR0912, C901
    cls,
    filepath: Path,
    metadata: bool,
    time_data: bool,
    archive_dir: Path,
    no_archive: bool,
    chunk_size: int | None = None,
    pbar: tqdm | DummyTqdm | None = None,
    **kwargs: dict,
) -> None:
    """
    Process a single file with the given options.

    Builds a probe for ``filepath``, optionally sends its metadata and/or time series data, archives the
    file unless ``no_archive`` is set, and finally advances ``pbar`` by one.

    "Already loaded" conditions are logged as warnings and processing continues, so the file is still
    archived: an HTTP 409 response for either step, or an ``IntegrityError`` wrapping a psycopg2
    ``UniqueViolation`` for time data. Every other error is logged with its traceback and re-raised.

    Args:
    ----
        filepath: File to load
        metadata: If True, call ``send_metadata`` on the probe
        time_data: If True, call ``process_time_data`` on the probe
        archive_dir: Archive root passed to ``archive_file``
        no_archive: If True, leave the file where it is
        chunk_size: Forwarded to the probe constructor
        pbar: Optional progress bar; ``update(1)`` is called after a successful run
        **kwargs: Forwarded to the probe constructor

    Raises:
    ------
        Exception: Any error other than the "already loaded" cases above, after it has been logged

    """
    try:
        probe = cls(input_file=filepath, chunk_size=chunk_size, **kwargs)
        try:
            if metadata:
                logger.debug(f"Loading {cls.__name__} metadata from {filepath}")
                probe.send_metadata()
                logger.debug(f"Metadata loading complete for {filepath}")
        except requests.exceptions.HTTPError as e:
            resp = e.response
            # Only a 409 response is tolerated; no response or any other status is a failure
            if resp is None or resp.status_code != 409:
                raise
            logger.warning(
                f"{filepath} violates unique constraint for metadata, implying already loaded.  "
                f"Will move to archive if archiving is enabled"
            )

        try:
            if time_data:
                logger.debug(f"Loading {cls.__name__} time series data from {filepath}")
                probe.process_time_data()
                logger.debug(f"Time series data loading complete for {filepath}")
        except requests.exceptions.HTTPError as e:
            resp = e.response
            if resp is None or resp.status_code != 409:
                raise
            logger.warning(
                f"{filepath} violates unique constraint for time data, implying already loaded. "
                f"Will move to archive if archiving is enabled."
            )
        except IntegrityError as e:
            # Only a unique violation is tolerated. Any other integrity failure must propagate;
            # otherwise the file would be archived even though its data was never stored.
            if not isinstance(e.orig, psycopg2.errors.UniqueViolation):  # ty: ignore[unresolved-attribute]
                raise
            logger.warning(
                f"{filepath} violates unique constraint for time data, implying already loaded. "
                f"Will move to archive if archiving is enabled."
            )

        if not no_archive:
            probe.archive_file(archive_dir)

        # Compare with None: tqdm defines its own truthiness, which is unrelated to "was a bar supplied"
        if pbar is not None:
            pbar.update(1)

    except Exception as e:
        logger.error(f"Error processing file {filepath}: {e!s}", exc_info=True)
        raise

process_time_data() abstractmethod

Parse and load time series data from self.input_file.

Use either send_time_data (which prefills METRICS.PHASE_OFFSET) or send_data with an alternative METRICS type. Both require a pd.DataFrame with the following columns:

- time (datetime64[ns]): timestamp for each measurement
- value (float64): measured value at each timestamp

Source code in opensampl/vendors/base_probe.py
463
464
465
466
467
468
469
470
471
472
473
474
@abstractmethod
def process_time_data(self) -> None:
    """
    Parse and load time series data from self.input_file.

    Abstract: every concrete probe must implement this.

    Use either send_time_data (which prefills METRICS.PHASE_OFFSET)
    or send_data and provide alternative METRICS type.
    Both require a df as follows:
        pd.DataFrame with columns:
            - time (datetime64[ns]): timestamp for each measurement
            - value (float64): measured value at each timestamp
    """

send_data(data, metric, reference_type, compound_reference=None, probe_key=None)

Ingests data into the database

Source code in opensampl/vendors/base_probe.py
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
@dualmethod
def send_data(
    self,
    data: pd.DataFrame,
    metric: MetricType,
    reference_type: ReferenceType,
    compound_reference: dict[str, Any] | None = None,
    probe_key: ProbeKey | None = None,
) -> None:
    """
    Ingest ``data`` into the database through ``load_time_data``.

    Works on an instance or on the class. On an instance, ``probe_key`` defaults to ``self.probe_key``
    and a truthy ``self.chunk_size`` splits the upload into slices of that many rows. Without a
    ``chunk_size`` attribute the data is sent in one call.

    Args:
    ----
        data: DataFrame to upload
        metric: Metric type passed to ``load_time_data`` as ``metric_type``
        reference_type: Reference type passed through to ``load_time_data``
        compound_reference: Optional mapping passed to ``load_time_data`` as ``compound_key``
        probe_key: Probe to attribute the data to

    Raises:
    ------
        ValueError: If no probe key is available

    """
    called_on_instance = isinstance(self, BaseProbe)
    if probe_key is None and called_on_instance:
        probe_key = self.probe_key

    if probe_key is None:
        raise ValueError("send data must be called with probe_key if used as class method")

    # The attribute may be absent (e.g. when called on the class), so look it up defensively
    rows_per_call = getattr(self, "chunk_size", None)
    if rows_per_call:
        batches = (data.iloc[start : start + rows_per_call] for start in range(0, len(data), rows_per_call))
    else:
        batches = (data,)

    for batch in batches:
        load_time_data(
            probe_key=probe_key,
            metric_type=metric,
            reference_type=reference_type,
            data=batch,
            compound_key=compound_reference,
        )

send_metadata()

Send metadata to database

Source code in opensampl/vendors/base_probe.py
541
542
543
544
def send_metadata(self):
    """
    Process this probe's metadata and send it to the database.

    The write is delegated to ``_send_metadata_to_db`` so instance-level and class-level callers share
    one code path (including its debug log) instead of duplicating the ``load_probe_metadata`` call.
    """
    metadata = self.process_metadata()
    self._send_metadata_to_db(self.probe_key, metadata)

send_time_data(data, reference_type, compound_reference=None)

Ingests time data into the database

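Sends the data through send_data with the metric fixed to METRICS.PHASE_OFFSET. This method takes no chunk_size argument: how many records are sent at a time is controlled by the probe's chunk_size attribute (if None, all records are sent at once).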

Source code in opensampl/vendors/base_probe.py
511
512
513
514
515
516
517
518
519
520
521
522
def send_time_data(
    self, data: pd.DataFrame, reference_type: ReferenceType, compound_reference: dict[str, Any] | None = None
):
    """
    Ingests time data into the database

    Thin wrapper around ``send_data`` with the metric fixed to ``METRICS.PHASE_OFFSET``. There is no
    ``chunk_size`` argument here.

    :param data: DataFrame to send
    :param reference_type: passed through to ``send_data``
    :param compound_reference: passed through to ``send_data``
    :return: None
    """
    self.send_data(
        data=data, metric=METRICS.PHASE_OFFSET, reference_type=reference_type, compound_reference=compound_reference
    )

DummyTqdm

Dummy tqdm object which does not print to terminal

Source code in opensampl/vendors/base_probe.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class DummyTqdm:
    """
    Silent stand-in for a tqdm progress bar.

    It records its constructor arguments and offers ``update``/``close`` methods that do nothing.
    """

    def __init__(self, *args: Any, **kwargs: Any):
        """Remember the positional and keyword arguments without acting on them."""
        self.args, self.kwargs = args, kwargs

    def update(self, n: int = 1) -> None:
        """Accept a progress increment and ignore it."""
        return None

    def close(self) -> None:
        """Do nothing; present so the object can be closed like a tqdm bar."""
        return None

__init__(*args, **kwargs)

Initialize dummy tqdm object

Source code in opensampl/vendors/base_probe.py
108
109
110
111
def __init__(self, *args: Any, **kwargs: Any):
    """Remember the positional and keyword arguments without acting on them."""
    self.args, self.kwargs = args, kwargs

close()

Close an instance of tqdm.

Source code in opensampl/vendors/base_probe.py
117
118
119
def close(self) -> None:
    """Do nothing; present so the object can be closed like a tqdm bar."""
    return None

update(n=1)

Fake an update call to tqdm.

Source code in opensampl/vendors/base_probe.py
113
114
115
def update(self, n: int = 1) -> None:
    """Accept a progress increment and ignore it."""
    return None

LoadConfig

Bases: BaseModel

Model for storing probe loading configurations as provided by CLI

Source code in opensampl/vendors/base_probe.py
128
129
130
131
132
133
134
135
136
137
138
class LoadConfig(BaseModel):
    """
    Model for storing probe loading configurations as provided by CLI

    Built by ``BaseProbe._extract_load_config`` from the click keyword arguments.
    """

    # File or directory to load (the CLI ``filepath`` argument)
    filepath: Path
    # Root directory under which processed files are archived
    archive_dir: Path
    # When True, processed files are not archived
    no_archive: bool = False
    # When True, probe metadata is loaded
    metadata: bool = False
    # When True, time series data is loaded
    time_data: bool = False
    # Worker thread count used when processing a directory
    max_workers: int = 4
    # Records sent per call; None sends everything at once
    chunk_size: int | None = None
    # When True, a tqdm progress bar is shown while processing a directory
    show_progress: bool = False

dualmethod

Allows a method to be called both as an instance method and as a class method with the same function definition.

When called on an instance, the first argument will be the instance. When called on the class, the first argument will be the class itself.

Example
class MyClass:
    @dualmethod
    def greet(self_or_cls, name: str = "World"):
        if isinstance(self_or_cls, MyClass):
            print(f"Instance says: Hello {name}")
        else:
            print(f"Class says: Hello {name}")


# Can be called on class
MyClass.greet("Alice")  # Class says: Hello Alice

# Can be called on instance
obj = MyClass()
obj.greet("Bob")  # Instance says: Hello Bob
Source code in opensampl/vendors/base_probe.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
class dualmethod:  # noqa: N801
    """
    Allows a method to be called both as an instance method and as a class method with the same function definition.

    When called on an instance, the first argument will be the instance.
    When called on the class, the first argument will be the class itself.

    Any callable may be wrapped. Callables that carry no ``__name__`` (for
    example ``functools.partial`` objects) are given the name of their type so
    that the bound wrapper always receives valid string metadata.

    Example:
        ```python
        class MyClass:
            @dualmethod
            def greet(self_or_cls, name: str = "World"):
                if isinstance(self_or_cls, MyClass):
                    print(f"Instance says: Hello {name}")
                else:
                    print(f"Class says: Hello {name}")


        # Can be called on class
        MyClass.greet("Alice")  # Class says: Hello Alice

        # Can be called on instance
        obj = MyClass()
        obj.greet("Bob")  # Instance says: Hello Bob
        ```

    """

    def __init__(self, func: F) -> None:
        """
        Initialize the dualmethod descriptor.

        Args:
            func: The function to be wrapped as a dual method

        """
        self.func: F = func
        self.__doc__: str | None = func.__doc__
        fname = getattr(func, "__name__", None)
        if fname is None:
            # A function's __name__ must be a str; assigning None to the wrapper in
            # __get__ would raise TypeError, so fall back to the callable's type name.
            fname = type(func).__name__
        self.__name__: str = fname
        self.__qualname__: str = getattr(func, "__qualname__", fname)

    def __get__(self, obj: T | None, cls: type[T]) -> Callable[..., Any]:
        """
        Descriptor protocol method that returns the appropriate bound method.

        Args:
            obj: The instance from which the method is being accessed (None for class access)
            cls: The class that owns the method

        Returns:
            A wrapper function that calls the original function with either the
            instance or class as the first argument

        """

        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Instance access binds the instance; class access binds the class.
            return self.func(obj if obj is not None else cls, *args, **kwargs)

        # Preserve function metadata
        wrapper.__name__ = self.__name__
        wrapper.__doc__ = self.__doc__
        wrapper.__qualname__ = self.__qualname__

        return wrapper

__get__(obj, cls)

Descriptor protocol method that returns the appropriate bound method.

Parameters:

Name Type Description Default
obj T | None

The instance from which the method is being accessed (None for class access)

required
cls type[T]

The class that owns the method

required

Returns:

Type Description
Callable[..., Any]

A wrapper function that calls the original function with either the instance or class as the first argument

Source code in opensampl/vendors/base_probe.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __get__(self, obj: T | None, cls: type[T]) -> Callable[..., Any]:
    """
    Produce the callable handed back when the attribute is looked up.

    Args:
        obj: The instance the attribute was accessed through, or None when it
            was accessed through the class
        cls: The owning class

    Returns:
        A function that forwards its arguments to the wrapped function,
        prepending the instance (instance access) or the class (class access)

    """
    # Neither obj nor cls is rebound, so the receiver can be picked up front.
    receiver = cls if obj is None else obj

    def wrapper(*args: Any, **kwargs: Any) -> Any:
        return self.func(receiver, *args, **kwargs)

    # Copy the wrapped function's metadata onto the forwarding function.
    for attr in ("__name__", "__doc__", "__qualname__"):
        setattr(wrapper, attr, getattr(self, attr))

    return wrapper

__init__(func)

Initialize the dualmethod descriptor.

Parameters:

Name Type Description Default
func F

The function to be wrapped as a dual method

required
Source code in opensampl/vendors/base_probe.py
66
67
68
69
70
71
72
73
74
75
76
77
78
def __init__(self, func: F) -> None:
    """
    Initialize the dualmethod descriptor.

    The wrapped callable's docstring, name and qualified name are copied onto
    the descriptor. A callable without ``__name__`` (for example a
    ``functools.partial`` object) is given the name of its type instead.

    Args:
        func: The function to be wrapped as a dual method

    """
    self.func: F = func
    self.__doc__: str | None = func.__doc__
    fname = getattr(func, "__name__", None)
    if fname is None:
        # Function metadata must be a str; a None name would make the later
        # assignment to the wrapper's __name__ raise TypeError.
        fname = type(func).__name__
    self.__name__: str = fname
    self.__qualname__: str = getattr(func, "__qualname__", fname)

dummy_tqdm(*args, **kwargs)

Create a dummy tqdm object which will not print to terminal

Source code in opensampl/vendors/base_probe.py
122
123
124
125
@contextmanager
def dummy_tqdm(*args: list, **kwargs: dict) -> Generator:
    """
    Context manager yielding a silent stand-in for a tqdm progress bar.

    All arguments are forwarded unchanged to `DummyTqdm`, and the resulting
    object is what the `with` statement binds.
    """
    silent_bar = DummyTqdm(*args, **kwargs)
    yield silent_bar