Skip to content

opensampl.mixins.collect

Tools for adding data collection functionality to probes

CollectMixin

Bases: ABC

Mixin to add data collection capabilities to a probe class

Source code in opensampl/mixins/collect.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class CollectMixin(ABC):
    """
    Mixin to add data collection capabilities to a probe class

    Concrete subclasses implement `collect` and `create_file_content`. The mixin supplies the click command
    wiring and the shared "load into the database and/or write to a file" step (`_collect_and_save`).
    It uses `cls.vendor` and `cls.send_data`, which are not defined here and must come from the host probe class.
    """

    class DataArtifact(BaseModel):
        """Model for a single metric type of collected data"""

        # Collected readings for this metric
        value: pd.DataFrame
        # Metric and reference type fall back to UNKNOWN when the collector does not specify them
        metric: MetricType = METRICS.UNKNOWN
        reference_type: ReferenceType = REF_TYPES.UNKNOWN
        # Optional reference description; must be JSON-serializable because
        # CollectArtifact.single_reference compares the json.dumps() of it
        compound_reference: dict[str, Any] | None = None
        # pd.DataFrame is not a pydantic-native type, so arbitrary types have to be allowed
        model_config = ConfigDict(arbitrary_types_allowed=True)

    class CollectArtifact(BaseModel):
        """Model for a single probe's collected data"""

        # One DataArtifact per collected metric
        data: list["CollectMixin.DataArtifact"]
        # When left as None, _collect_and_save builds a key from the collect config
        probe_key: ProbeKey | None = None
        metadata: dict | None = Field(default_factory=dict)
        model_config = ConfigDict(arbitrary_types_allowed=True)

        @property
        def single_reference(self) -> bool:
            """All individual data artifacts use the same reference"""
            if len(self.data) <= 1:
                return True
            # sort_keys makes the comparison independent of dict insertion order
            return len({json.dumps(x.compound_reference, sort_keys=True) for x in self.data}) == 1

        @property
        def single_reference_type(self) -> bool:
            """All individual data artifacts use the same reference type"""
            if len(self.data) <= 1:
                return True
            return len({x.reference_type.name for x in self.data}) == 1

    class CollectConfig(BaseModel):
        """
        Configuration for collecting data

        Attributes:
            output_dir: When provided, will save collected data as a file to provided directory.
                Filename will be automatically generated as {parser_class}_{probe_key}_{timestamp}.txt
            load: Whether to load collected data directly to the database
            duration: Number of seconds to collect data for
            ip_address: IP address used to build the probe key when the collected data does not provide one
            probe_id: Probe id used to build the probe key when the collected data does not provide one

        """

        output_dir: Path | None = None
        load: bool = False
        duration: int = 300

        ip_address: str = "127.0.0.1"
        probe_id: str = "1-1"

    @classmethod
    def collect_help_str(cls) -> str:
        """Help string for use in the collect CLI."""
        return (
            f"Collect data readings for {cls.__name__}\n\n"
            "Can collect data to a directory (using --output-dir), straight into the database (--load), or both"
        )

    @classmethod
    def get_collect_cli_options(cls) -> list[Callable]:
        """Return the click options/arguments for collecting probe data."""
        return [
            from_pydantic(cls.CollectConfig),
            click.pass_context,
        ]

    @classmethod
    def get_collect_cli_command(cls) -> Callable:
        """
        Create a click command that handles data collection

        The command is named after the lower-cased vendor name and uses `collect_help_str` as its help text.

        Returns
        -------
            A click CLI command that collects probe data

        """

        def make_command(f: Callable) -> Callable:
            # Apply the decorators back-to-front so the first entry of the options list ends up outermost
            for option in reversed(cls.get_collect_cli_options()):
                f = option(f)
            return click.command(name=cls.vendor.name.lower(), help=cls.collect_help_str())(f)

        def collect_callback(
            ctx: click.Context,  # noqa: ARG001
            collect_config: CollectMixin.CollectConfig,
        ) -> None:
            """Collect probe data and load and/or save it as requested by collect_config."""
            try:
                cls._collect_and_save(collect_config)

            except Exception as e:
                # Log the full traceback before turning the failure into a CLI abort
                logger.exception(f"Error: {e!s}")
                raise click.Abort(f"Error: {e!s}") from e

        return make_command(collect_callback)

    @classmethod
    def _collect_and_save(cls, collect_config: CollectConfig) -> None:
        """
        Run `collect` and route the result according to collect_config

        - fills in a ProbeKey from collect_config.ip_address / probe_id when the artifact has none
        - when collect_config.load is set: loads the metadata, then sends every data artifact
        - when collect_config.output_dir is set: writes the `create_file_content` output to
          {parser_class}_{probe_key!r}_{UTC epoch timestamp}.txt inside that directory (created if missing)
        """
        data: CollectMixin.CollectArtifact = cls.collect(collect_config)
        if data.probe_key is None:
            data.probe_key = ProbeKey(ip_address=collect_config.ip_address, probe_id=collect_config.probe_id)
        if collect_config.load:
            # metadata is declared `dict | None`, while load_metadata expects a dict
            cls.load_metadata(probe_key=data.probe_key, metadata=data.metadata or {})

            for art in data.data:
                cls.send_data(
                    data=art.value,
                    metric=art.metric,
                    reference_type=art.reference_type,
                    compound_reference=art.compound_reference,
                    probe_key=data.probe_key,
                )
        if collect_config.output_dir:
            file_content = cls.create_file_content(data)
            collect_config.output_dir.mkdir(parents=True, exist_ok=True)
            now_stamp = datetime.now(tz=timezone.utc).timestamp()
            # The "{parser_class}_" prefix and ".txt" suffix are what filter_files matches on
            output = collect_config.output_dir / f"{cls.vendor.parser_class}_{data.probe_key!r}_{now_stamp}.txt"
            output.write_text(file_content)

    @classmethod
    def filter_files(cls, files: list[Path]) -> list[Path]:
        """Filter the files found in the input directory when loading this vendor's data files"""
        return [f for f in files if f.name.startswith(f"{cls.vendor.parser_class}_") and f.suffix == ".txt"]

    @classmethod
    def load_metadata(cls, probe_key: ProbeKey, metadata: dict) -> None:
        """
        Load provided metadata associated with given probe_key

        Distinct from BaseProbe.parse_metadata because it is a class method without access to self.input_file
        """
        load_probe_metadata(vendor=cls.vendor, probe_key=probe_key, data=metadata)

    @classmethod
    @abstractmethod
    def collect(cls, collect_config: CollectConfig) -> CollectArtifact:
        """Collect data and output CollectArtifact using collect_config"""
        pass

    @classmethod
    @abstractmethod
    def create_file_content(cls, collect_artifact: CollectArtifact) -> str:
        """Given a CollectArtifact, create the str content for a file"""
        pass

CollectArtifact

Bases: BaseModel

Model for a single probe's collected data

Source code in opensampl/mixins/collect.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class CollectArtifact(BaseModel):
    """Model for a single probe's collected data"""

    # One DataArtifact per collected metric
    data: list["CollectMixin.DataArtifact"]
    # Optional; defaults to None when the collector does not identify the probe
    probe_key: ProbeKey | None = None
    metadata: dict | None = Field(default_factory=dict)
    model_config = ConfigDict(arbitrary_types_allowed=True)

    @property
    def single_reference(self) -> bool:
        """All individual data artifacts use the same reference"""
        if len(self.data) <= 1:
            return True
        # sort_keys makes the comparison independent of dict insertion order
        return len({json.dumps(x.compound_reference, sort_keys=True) for x in self.data}) == 1

    @property
    def single_reference_type(self) -> bool:
        """All individual data artifacts use the same reference type"""
        if len(self.data) <= 1:
            return True
        return len({x.reference_type.name for x in self.data}) == 1

single_reference property

All individual data artifacts use the same reference

single_reference_type property

All individual data artifacts use the same reference type

CollectConfig

Bases: BaseModel

Configuration for collecting data

Attributes:

Name Type Description
output_dir Path | None

When provided, will save collected data as a file to the provided directory. The filename is generated automatically as {parser_class}_{probe_key}_{timestamp}.txt

load bool

Whether to load collected data directly to the database

duration int

Number of seconds to collect data for

Source code in opensampl/mixins/collect.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class CollectConfig(BaseModel):
    """
    Configuration for collecting data

    Attributes:
        output_dir: When provided, will save collected data as a file to provided directory.
            Filename will be automatically generated as {parser_class}_{probe_key}_{timestamp}.txt
        load: Whether to load collected data directly to the database
        duration: Number of seconds to collect data for
        ip_address: IP address used to build the probe key when the collected data does not provide one
        probe_id: Probe id used to build the probe key when the collected data does not provide one

    """

    output_dir: Path | None = None
    load: bool = False
    duration: int = 300

    ip_address: str = "127.0.0.1"
    probe_id: str = "1-1"

DataArtifact

Bases: BaseModel

Model for a single metric type of collected data

Source code in opensampl/mixins/collect.py
25
26
27
28
29
30
31
32
class DataArtifact(BaseModel):
    """
    Collected data for one metric type

    Pairs a DataFrame of readings with the metric and reference information that describes it.
    """

    # pd.DataFrame is not a pydantic-native type, so arbitrary types have to be allowed
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # The readings themselves
    value: pd.DataFrame
    # Both descriptors fall back to UNKNOWN when not supplied
    metric: MetricType = METRICS.UNKNOWN
    reference_type: ReferenceType = REF_TYPES.UNKNOWN
    # Optional extra description of the reference
    compound_reference: dict[str, Any] | None = None

collect(collect_config) abstractmethod classmethod

Collect data and output CollectArtifact using collect_config

Source code in opensampl/mixins/collect.py
158
159
160
161
162
@classmethod
@abstractmethod
def collect(cls, collect_config: CollectConfig) -> CollectArtifact:
    """
    Gather readings as directed by collect_config and return them as a CollectArtifact

    Abstract: there is no shared implementation, so each concrete probe class provides its own.
    """

collect_help_str() classmethod

Help string for use in the collect CLI.

Source code in opensampl/mixins/collect.py
75
76
77
78
79
80
81
@classmethod
def collect_help_str(cls) -> str:
    """Build the help text for the collect CLI: a headline naming this class, then a usage note."""
    headline = f"Collect data readings for {cls.__name__}\n\n"
    usage = "Can collect data to a directory (using --output-dir), straight into the database (--load), or both"
    return headline + usage

create_file_content(collect_artifact) abstractmethod classmethod

Given a CollectArtifact, create the str content for a file

Source code in opensampl/mixins/collect.py
164
165
166
167
168
@classmethod
@abstractmethod
def create_file_content(cls, collect_artifact: CollectArtifact) -> str:
    """
    Render a CollectArtifact as the text to be written to a file

    Abstract: there is no shared implementation, so each concrete probe class provides its own.
    """

filter_files(files) classmethod

Filter the files found in the input directory when loading this vendor's data files

Source code in opensampl/mixins/collect.py
144
145
146
147
@classmethod
def filter_files(cls, files: list[Path]) -> list[Path]:
    """Keep only the files whose name starts with "<parser_class>_" and whose suffix is ".txt"."""

    def belongs_to_vendor(candidate: Path) -> bool:
        # Prefix check first, then the extension, in the same order as a single `and` expression
        if not candidate.name.startswith(f"{cls.vendor.parser_class}_"):
            return False
        return candidate.suffix == ".txt"

    return list(filter(belongs_to_vendor, files))

get_collect_cli_command() classmethod

Create a click command that handles data collection

Returns
A click CLI command that collects probe data
Source code in opensampl/mixins/collect.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@classmethod
def get_collect_cli_command(cls) -> Callable:
    """
    Create a click command that handles data collection

    The command is named after the lower-cased vendor name and uses `collect_help_str` as its help text.

    Returns
    -------
        A click CLI command that collects probe data

    """

    def make_command(f: Callable) -> Callable:
        # Apply the decorators back-to-front so the first entry of the options list ends up outermost
        for option in reversed(cls.get_collect_cli_options()):
            f = option(f)
        return click.command(name=cls.vendor.name.lower(), help=cls.collect_help_str())(f)

    def collect_callback(
        ctx: click.Context,  # noqa: ARG001
        collect_config: CollectMixin.CollectConfig,
    ) -> None:
        """Collect probe data and load and/or save it as requested by collect_config."""
        try:
            cls._collect_and_save(collect_config)

        except Exception as e:
            # Log the full traceback before turning the failure into a CLI abort
            # NOTE(review): click usually reports Abort as just "Aborted!", so this message may only reach the log
            logger.exception(f"Error: {e!s}")
            raise click.Abort(f"Error: {e!s}") from e

    return make_command(collect_callback)

get_collect_cli_options() classmethod

Return the click options/arguments for collecting probe data.

Source code in opensampl/mixins/collect.py
83
84
85
86
87
88
89
@classmethod
def get_collect_cli_options(cls) -> list[Callable]:
    """
    List the decorators applied to the collect CLI callback

    The first is built from this class's CollectConfig model via `from_pydantic`; the second is click's pass_context.
    """
    config_options = from_pydantic(cls.CollectConfig)
    return [config_options, click.pass_context]

load_metadata(probe_key, metadata) classmethod

Load provided metadata associated with given probe_key

Distinct from BaseProbe.parse_metadata because it is a class method without access to self.input_file

Source code in opensampl/mixins/collect.py
149
150
151
152
153
154
155
156
@classmethod
def load_metadata(cls, probe_key: ProbeKey, metadata: dict) -> None:
    """
    Hand the given metadata for probe_key to `load_probe_metadata`, tagged with this class's vendor

    Kept separate from BaseProbe.parse_metadata: this is a class method, so it has no self.input_file to read
    """
    load_probe_metadata(
        vendor=cls.vendor,
        probe_key=probe_key,
        data=metadata,
    )