Skip to content

opensampl.vendors.microchip.twst

Microchip TWST clock Parser implementation

MicrochipTWSTProbe

Bases: BaseProbe

MicrochipTWST Probe Object

Source code in opensampl/vendors/microchip/twst.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
class MicrochipTWSTProbe(BaseProbe):
    """MicrochipTWST Probe Object.

    Parses Microchip TWST modem log files: a ``#``-prefixed YAML header
    describing the local modem and its remote channels, followed by CSV rows
    of per-channel readings (phase offset and Eb/No measurements).
    """

    vendor = VENDORS.MICROCHIP_TWST
    # Maps a reading-name suffix found in the CSV ("chan:<n>:<suffix>") to its metric.
    MEASUREMENTS: ClassVar = {"meas:offset": METRICS.PHASE_OFFSET, "tracking:ebno": METRICS.EB_NO}

    class RandomDataConfig(BaseProbe.RandomDataConfig):
        """Model for storing random data generation configurations as provided by CLI or YAML"""

        # Time series parameters for the phase-offset series (seconds-scale values).
        base_value: Optional[float] = Field(
            default_factory=lambda: random.uniform(-1e-8, 1e-8), description="random.uniform(-1e-8, 1e-8)"
        )
        noise_amplitude: Optional[float] = Field(
            default_factory=lambda: random.uniform(1e-10, 1e-9), description="random.uniform(1e-10, 1e-9)"
        )
        drift_rate: Optional[float] = Field(
            default_factory=lambda: random.uniform(-1e-12, 1e-12), description="random.uniform(-1e-12, 1e-12)"
        )

        # Parameters for the Eb/No series (signal-quality readings).
        ebno_base_value: Optional[float] = Field(
            default_factory=lambda: random.uniform(10.0, 20.0), description="random.uniform(10.0, 20.0)"
        )
        ebno_noise_amplitude: Optional[float] = Field(
            default_factory=lambda: random.uniform(0.5, 2.0), description="random.uniform(0.5, 2.0)"
        )
        ebno_drift_rate: Optional[float] = Field(
            default_factory=lambda: random.uniform(-0.01, 0.01), description="random.uniform(-0.01, 0.01)"
        )

        # Number of remote channels to fabricate metadata and series for.
        num_channels: int = Field(4)

        probe_id: str = "modem"

        def generate_ebno_time_series(self) -> pd.DataFrame:
            """Given the settings of this particular RandomDataConfig, generate random Eb/No Data.

            Returns:
                pd.DataFrame: columns ``time`` and ``value``, one row per
                sample interval over ``duration_hours``.

            """
            total_seconds = self.duration_hours * 3600
            num_samples = int(total_seconds / self.sample_interval)

            time_points = []
            values = []
            for i in range(num_samples):
                sample_time = self.start_time + timedelta(seconds=i * self.sample_interval)
                time_points.append(sample_time)

                # Linear drift plus gaussian noise around the base value.
                time_offset = i * self.sample_interval
                drift_component = self.ebno_drift_rate * time_offset
                noise_component = np.random.normal(0, self.ebno_noise_amplitude)
                value = self.ebno_base_value + drift_component + noise_component

                # Add occasional outliers for realism
                if random.random() < self.outlier_probability:
                    value += np.random.normal(0, self.ebno_noise_amplitude * self.outlier_multiplier)

                values.append(value)

            return pd.DataFrame({"time": time_points, "value": values})

    @classmethod
    def get_random_data_cli_options(cls) -> list:
        """Return vendor-specific random data generation options.

        Bug fix: the ebno option help texts previously displayed the defaults of
        the phase-offset fields (base_value/noise_amplitude/drift_rate) instead
        of the corresponding ebno_* fields.
        """
        base_options = super().get_random_data_cli_options()
        vendor_options = [
            click.option(
                "--num-channels",
                type=int,
                help=(
                    f"Number of remote channels to generate data for "
                    f"(default: {cls.RandomDataConfig.model_fields.get('num_channels').default})"
                ),
            ),
            click.option(
                "--ebno-base-value",
                type=float,
                help=(
                    f"Base value for Eb/No measurements "
                    f"(default = {cls.RandomDataConfig.model_fields.get('ebno_base_value').description!s})"
                ),
            ),
            click.option(
                "--ebno-noise-amplitude",
                type=float,
                help=(
                    f"Noise amplitude/standard deviation for Eb/No measurements "
                    f"(default = {cls.RandomDataConfig.model_fields.get('ebno_noise_amplitude').description!s})"
                ),
            ),
            click.option(
                "--ebno-drift-rate",
                type=float,
                help=(
                    f"Linear drift rate per second for Eb/No measurements "
                    f"(default = {cls.RandomDataConfig.model_fields.get('ebno_drift_rate').description!s})"
                ),
            ),
        ]
        return vendor_options + base_options

    def __init__(self, input_file: Union[str, Path]):
        """Initialize MicrochipTWST object given input_file.

        Probe identity is derived from the file's YAML header: probe_id is
        always "modem" and the IP comes from the header's ``local.ip`` field.
        """
        super().__init__(input_file=input_file)
        self.header = self.get_header()
        self.probe_key = ProbeKey(probe_id="modem", ip_address=self.header["local"]["ip"])

    def process_time_data(self) -> None:
        """Process time series data from the input file and send it per (channel, measurement)."""
        df = pd.read_csv(
            self.input_file,
            comment="#",
        )
        # Keep only readings of the form "chan:<n>:<known measurement>".
        # Bug fix: the alternation must be wrapped in a non-capturing group --
        # without it, "chan:\d+:" bound only to the first alternative and "$"
        # only to the last, matching e.g. a bare "tracking:ebno" suffix.
        measurement_suffix = "|".join(map(re.escape, self.MEASUREMENTS.keys()))
        pattern = rf"chan:\d+:(?:{measurement_suffix})$"
        df_mask = df["reading"].str.contains(pattern)
        included_rows = df_mask.sum()
        excluded_rows = len(df) - included_rows
        logger.info(f"Included {included_rows}/{len(df)} rows, Excluded {excluded_rows}/{len(df)} rows")
        df = df[df_mask].copy()

        df.rename(columns={"timestamp": "time"}, inplace=True)
        df["channel"] = df["reading"].str.extract(r"chan:(\d+)").astype(int)
        df["measurement"] = df["reading"].str.extract(r"chan:\d+:(.*)")

        grouped_dfs = {
            (chan, meas): group.reset_index(drop=True)
            for (chan, meas), group in df.groupby(["channel", "measurement"])  # ty: ignore[not-iterable]
        }

        for key, group_df in grouped_dfs.items():
            logger.debug(f"Loading: {key}")
            channel, measurement = key
            # Each channel is addressed as its own probe sharing the modem's IP.
            compound_key = {"ip_address": self.probe_key.ip_address, "probe_id": f"chan:{channel}"}

            metric = self.MEASUREMENTS.get(measurement)
            if not metric:
                raise ValueError(f"Unknown metrics type {measurement}")

            try:
                self.send_data(
                    data=group_df, metric=metric, reference_type=REF_TYPES.PROBE, compound_reference=compound_key
                )
            except requests.HTTPError as e:
                resp = e.response
                if resp is None:
                    raise
                # 409 Conflict: this (channel, measurement) window already exists upstream.
                if resp.status_code == 409:
                    logger.info(f"(chan, meas)={key} already loaded for time frame, continuing..")
                    continue
                raise
            except IntegrityError as e:
                # Direct-to-DB path: a unique violation means the rows were already loaded.
                if isinstance(e.orig, psycopg2.errors.UniqueViolation):  # ty: ignore[unresolved-attribute]
                    logger.info(f"Chan: meas={key} already loaded for time frame, continuing..")
                else:
                    # Bug fix: other integrity errors were silently swallowed.
                    raise

    def get_header(self) -> dict:
        """Retrieve the yaml formatted header information from the input file loaded into a dict.

        The header is the contiguous run of "#"-prefixed lines at the top of
        the file; the leading "# " marker is stripped before YAML parsing.
        """
        header_lines = []
        with self.input_file.open() as f:
            for line in f:
                if line.startswith("#"):
                    header_lines.append(line[2:])
                else:
                    break

        header_str = "".join(header_lines)
        return yaml.safe_load(header_str)

    def process_metadata(self) -> dict:
        """
        Process metadata from the input file.

        Loads one metadata record per remote channel, then returns the local
        modem's metadata.

        Returns:
            dict: Dictionary mapping table names to ORM objects

        """
        # Robustness: tolerate headers without a "remotes" section.
        for chan, info in (self.header.get("remotes") or {}).items():
            # TODO: we will have to make sure channel 1 is the same probe somehow
            remote_probe_key = ProbeKey(ip_address=self.probe_key.ip_address, probe_id=f"chan:{chan}")
            load_probe_metadata(
                vendor=self.vendor, probe_key=remote_probe_key, data={"additional_metadata": info, "model": "ATS 6502"}
            )
        modem_data = self.header.get("local")
        self.metadata_parsed = True
        return {"additional_metadata": modem_data, "model": "ATS 6502"}

    @classmethod
    def _generate_random_probe_key(cls, gen_config: RandomDataConfig, probe_index: int) -> ProbeKey:
        """Build the ProbeKey for a generated modem.

        Bug fix: the original check was inverted -- it generated a random IP
        when ``probe_ip`` WAS supplied, and formatted ``None`` otherwise.
        """
        if gen_config.probe_ip is None:
            ip_address = cls._generate_random_ip()
        elif probe_index > 0:
            # Distinguish multiple probes that share a configured base IP.
            ip_address = f"{gen_config.probe_ip}.{probe_index}"
        else:
            ip_address = str(gen_config.probe_ip)

        return ProbeKey(probe_id="modem", ip_address=ip_address)

    @classmethod
    def generate_random_data(
        cls,
        config: RandomDataConfig,
        probe_key: ProbeKey,
    ) -> ProbeKey:
        """
        Generate random TWST modem test data and send it directly to the database.

        Args:
            probe_key: Probe key identifying the modem the data is generated for
            config: RandomDataConfig with parameters specifying how to generate data

        Returns:
            ProbeKey: The probe key used for the generated data

        """
        cls._setup_random_seed(config.seed)

        logger.info(f"Generating random TWST data for {probe_key}")

        # Generate and send metadata for main modem
        main_metadata = {
            "additional_metadata": {
                "sid": f"STATION_{random.choice('ABCDEFGH')}",
                "prn": random.randint(100, 999),
                "ip": probe_key.ip_address,
                "test_data": True,
                "random_generation_config": config.model_dump(),
            },
            "model": "ATS 6502",
        }

        cls._send_metadata_to_db(probe_key, main_metadata)

        # Generate and send metadata + both measurement series for each remote channel
        for channel in range(1, config.num_channels + 1):
            remote_probe_key = ProbeKey(ip_address=probe_key.ip_address, probe_id=f"chan:{channel}")
            remote_metadata = {
                "additional_metadata": {
                    "rx_channel": f"ch{channel}",
                    "sid": f"STATION_{random.choice('ABCDEFGH')}",
                    "prn": random.randint(100, 999),
                    "test_data": True,
                    "random_generation_config": config.model_dump(),
                },
                "model": "ATS 6502",
            }
            cls._send_metadata_to_db(remote_probe_key, remote_metadata)

            for measurement, metric in cls.MEASUREMENTS.items():
                logger.debug(f"Generating data for channel {channel}, measurement {measurement}")

                # Phase offset uses the base-class generator; Eb/No uses the vendor one.
                df = (
                    config.generate_time_series()
                    if measurement == "meas:offset"
                    else config.generate_ebno_time_series()
                )

                # Send data with compound reference for the channel
                compound_key = {"ip_address": probe_key.ip_address, "probe_id": f"chan:{channel}"}
                cls.send_data(
                    probe_key=probe_key,
                    data=df,
                    metric=metric,
                    reference_type=REF_TYPES.PROBE,
                    compound_reference=compound_key,
                )

        logger.info(f"Successfully generated {config.duration_hours}h of random TWST data for {probe_key}")
        return probe_key

RandomDataConfig

Bases: RandomDataConfig

Model for storing random data generation configurations as provided by CLI or YAML

Source code in opensampl/vendors/microchip/twst.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class RandomDataConfig(BaseProbe.RandomDataConfig):
    """Model for storing random data generation configurations as provided by CLI or YAML"""

    # Time series parameters for the phase-offset series (seconds-scale values).
    # Each default_factory draws a fresh random default per model instance; the
    # description strings echo the factory so CLI help can display the default range.
    base_value: Optional[float] = Field(
        default_factory=lambda: random.uniform(-1e-8, 1e-8), description="random.uniform(-1e-8, 1e-8)"
    )
    noise_amplitude: Optional[float] = Field(
        default_factory=lambda: random.uniform(1e-10, 1e-9), description="random.uniform(1e-10, 1e-9)"
    )
    drift_rate: Optional[float] = Field(
        default_factory=lambda: random.uniform(-1e-12, 1e-12), description="random.uniform(-1e-12, 1e-12)"
    )

    # Parameters for the Eb/No series (signal-quality readings).
    ebno_base_value: Optional[float] = Field(
        default_factory=lambda: random.uniform(10.0, 20.0), description="random.uniform(10.0, 20.0)"
    )
    ebno_noise_amplitude: Optional[float] = Field(
        default_factory=lambda: random.uniform(0.5, 2.0), description="random.uniform(0.5, 2.0)"
    )
    ebno_drift_rate: Optional[float] = Field(
        default_factory=lambda: random.uniform(-0.01, 0.01), description="random.uniform(-0.01, 0.01)"
    )

    # Number of remote channels to fabricate data for.
    num_channels: int = Field(4)

    probe_id: str = "modem"

    def generate_ebno_time_series(self):
        """Given the settings of this particular RandomDataConfig, generate random Eb/No Data.

        Returns a DataFrame with ``time`` and ``value`` columns, one row per
        sample interval over ``duration_hours``.
        """
        total_seconds = self.duration_hours * 3600
        num_samples = int(total_seconds / self.sample_interval)

        time_points = []
        values = []
        for i in range(num_samples):
            sample_time = self.start_time + timedelta(seconds=i * self.sample_interval)
            time_points.append(sample_time)

            # Generate value with drift and noise: linear drift over elapsed
            # seconds plus gaussian noise around the base value.
            time_offset = i * self.sample_interval
            drift_component = self.ebno_drift_rate * time_offset
            noise_component = np.random.normal(0, self.ebno_noise_amplitude)
            value = self.ebno_base_value + drift_component + noise_component

            # Add occasional outliers for realism
            if random.random() < self.outlier_probability:
                value += np.random.normal(0, self.ebno_noise_amplitude * self.outlier_multiplier)

            values.append(value)

        return pd.DataFrame({"time": time_points, "value": values})

generate_ebno_time_series()

Given the settings of this particular RandomDataConfig, generate random Eb/No Data

Source code in opensampl/vendors/microchip/twst.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def generate_ebno_time_series(self):
    """Given the settings of this particular RandomDataConfig, generate random Eb/No Data"""
    sample_count = int(self.duration_hours * 3600 / self.sample_interval)

    # Timestamps are evenly spaced from start_time at sample_interval seconds.
    stamps = [self.start_time + timedelta(seconds=idx * self.sample_interval) for idx in range(sample_count)]

    readings = []
    for idx in range(sample_count):
        elapsed = idx * self.sample_interval
        # Base value + linear drift over elapsed seconds + gaussian noise.
        reading = (
            self.ebno_base_value
            + self.ebno_drift_rate * elapsed
            + np.random.normal(0, self.ebno_noise_amplitude)
        )
        # Add occasional outliers for realism
        if random.random() < self.outlier_probability:
            reading += np.random.normal(0, self.ebno_noise_amplitude * self.outlier_multiplier)
        readings.append(reading)

    return pd.DataFrame({"time": stamps, "value": readings})

__init__(input_file)

Initialize MicrochipTWST object given input_file; the probe identity is determined from the file's YAML header

Source code in opensampl/vendors/microchip/twst.py
125
126
127
128
129
def __init__(self, input_file: Union[str, Path]):
    """Initialize MicrochipTWST object given input_file.

    Probe identity is derived from the file's YAML header, not the filename:
    probe_id is always "modem" and the IP comes from the header's ``local.ip``.
    """
    super().__init__(input_file=input_file)
    self.header = self.get_header()
    self.probe_key = ProbeKey(probe_id="modem", ip_address=self.header["local"]["ip"])

generate_random_data(config, probe_key) classmethod

Generate random TWST modem test data and send it directly to the database.

Parameters:

Name Type Description Default
probe_key ProbeKey

Probe key to use (generated if None)

required
config RandomDataConfig

RandomDataConfig with parameters specifying how to generate data

required

Returns:

Name Type Description
ProbeKey ProbeKey

The probe key used for the generated data

Source code in opensampl/vendors/microchip/twst.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
@classmethod
def generate_random_data(
    cls,
    config: RandomDataConfig,
    probe_key: ProbeKey,
) -> ProbeKey:
    """
    Generate random TWST modem test data and send it directly to the database.

    Args:
        probe_key: Probe key to use (generated if None)
        config: RandomDataConfig with parameters specifying how to generate data

    Returns:
        ProbeKey: The probe key used for the generated data

    """
    # NOTE(review): the docstring says probe_key is "generated if None", but this
    # method dereferences probe_key.ip_address directly -- confirm callers always
    # pass a non-None key (likely built via _generate_random_probe_key upstream).
    cls._setup_random_seed(config.seed)

    logger.info(f"Generating random TWST data for {probe_key}")

    # Generate and send metadata for main modem
    main_metadata = {
        "additional_metadata": {
            "sid": f"STATION_{random.choice('ABCDEFGH')}",
            "prn": random.randint(100, 999),
            "ip": probe_key.ip_address,
            "test_data": True,
            "random_generation_config": config.model_dump(),
        },
        "model": "ATS 6502",
    }

    cls._send_metadata_to_db(probe_key, main_metadata)

    # Generate and send metadata for remote channels; each channel is addressed
    # as its own probe sharing the modem's IP ("chan:<n>").
    for channel in range(1, config.num_channels + 1):
        remote_probe_key = ProbeKey(ip_address=probe_key.ip_address, probe_id=f"chan:{channel}")
        remote_metadata = {
            "additional_metadata": {
                "rx_channel": f"ch{channel}",
                "sid": f"STATION_{random.choice('ABCDEFGH')}",
                "prn": random.randint(100, 999),
                "test_data": True,
                "random_generation_config": config.model_dump(),
            },
            "model": "ATS 6502",
        }
        cls._send_metadata_to_db(remote_probe_key, remote_metadata)

        for measurement, metric in cls.MEASUREMENTS.items():
            logger.debug(f"Generating data for channel {channel}, measurement {measurement}")

            # Generate time series using base class helper for phase offset;
            # the vendor-specific generator produces the Eb/No series.
            df = (
                config.generate_time_series()
                if measurement == "meas:offset"
                else config.generate_ebno_time_series()
            )

            # Send data with compound reference for the channel
            compound_key = {"ip_address": probe_key.ip_address, "probe_id": f"chan:{channel}"}
            cls.send_data(
                probe_key=probe_key,
                data=df,
                metric=metric,
                reference_type=REF_TYPES.PROBE,
                compound_reference=compound_key,
            )

    logger.info(f"Successfully generated {config.duration_hours}h of random TWST data for {probe_key}")
    return probe_key

get_header()

Retrieve the yaml formatted header information from the input file loaded into a dict

Source code in opensampl/vendors/microchip/twst.py
178
179
180
181
182
183
184
185
186
187
188
189
def get_header(self) -> dict:
    """Retrieve the yaml formatted header information from the input file loaded into a dict"""
    collected = []
    with self.input_file.open() as fh:
        for raw_line in fh:
            if not raw_line.startswith("#"):
                # Header is the contiguous run of comment lines at the top.
                break
            # Drop the leading '# ' comment marker before YAML parsing.
            collected.append(raw_line[2:])
    return yaml.safe_load("".join(collected))

get_random_data_cli_options() classmethod

Return vendor-specific random data generation options.

Source code in opensampl/vendors/microchip/twst.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
@classmethod
def get_random_data_cli_options(cls) -> list:
    """Return vendor-specific random data generation options.

    Bug fix: the ebno option help texts previously displayed the defaults of
    the phase-offset fields (base_value/noise_amplitude/drift_rate) instead
    of the corresponding ebno_* fields.
    """
    base_options = super().get_random_data_cli_options()
    vendor_options = [
        click.option(
            "--num-channels",
            type=int,
            help=(
                f"Number of remote channels to generate data for "
                f"(default: {cls.RandomDataConfig.model_fields.get('num_channels').default})"
            ),
        ),
        click.option(
            "--ebno-base-value",
            type=float,
            help=(
                f"Base value for Eb/No measurements "
                f"(default = {cls.RandomDataConfig.model_fields.get('ebno_base_value').description!s})"
            ),
        ),
        click.option(
            "--ebno-noise-amplitude",
            type=float,
            help=(
                f"Noise amplitude/standard deviation for Eb/No measurements "
                f"(default = {cls.RandomDataConfig.model_fields.get('ebno_noise_amplitude').description!s})"
            ),
        ),
        click.option(
            "--ebno-drift-rate",
            type=float,
            help=(
                f"Linear drift rate per second for Eb/No measurements "
                f"(default = {cls.RandomDataConfig.model_fields.get('ebno_drift_rate').description!s})"
            ),
        ),
    ]
    # Vendor options first so they appear above the generic ones in --help.
    return vendor_options + base_options

process_metadata()

Process metadata from the input file.

Returns:

Name Type Description
dict dict

Dictionary mapping table names to ORM objects

Source code in opensampl/vendors/microchip/twst.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def process_metadata(self) -> dict:
    """
    Process metadata from the input file.

    Loads one metadata record per remote channel listed in the header's
    "remotes" section, then returns the local modem's metadata.

    Returns:
        dict: Dictionary mapping table names to ORM objects

    """
    # NOTE(review): header.get("remotes") raises AttributeError on .items() if
    # the header lacks a "remotes" key -- confirm all input files include one.
    for chan, info in self.header.get("remotes").items():
        # TODO: we will have to make sure channel 1 is the same probe somehow
        remote_probe_key = ProbeKey(ip_address=self.probe_key.ip_address, probe_id=f"chan:{chan}")
        load_probe_metadata(
            vendor=self.vendor, probe_key=remote_probe_key, data={"additional_metadata": info, "model": "ATS 6502"}
        )
    modem_data = self.header.get("local")
    # Flag consumed by the base class to mark metadata as processed.
    self.metadata_parsed = True
    return {"additional_metadata": modem_data, "model": "ATS 6502"}

process_time_data()

Process time series data from the input file.

Source code in opensampl/vendors/microchip/twst.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def process_time_data(self) -> None:
    """Process time series data from the input file and send it per (channel, measurement)."""
    df = pd.read_csv(
        self.input_file,
        comment="#",
    )
    # Keep only readings of the form "chan:<n>:<known measurement>".
    # Bug fix: the alternation must be wrapped in a non-capturing group --
    # without it, "chan:\d+:" bound only to the first alternative and "$"
    # only to the last, matching e.g. a bare "tracking:ebno" suffix.
    measurement_suffix = "|".join(map(re.escape, self.MEASUREMENTS.keys()))
    pattern = rf"chan:\d+:(?:{measurement_suffix})$"
    df_mask = df["reading"].str.contains(pattern)
    included_rows = df_mask.sum()
    excluded_rows = len(df) - included_rows
    logger.info(f"Included {included_rows}/{len(df)} rows, Excluded {excluded_rows}/{len(df)} rows")
    df = df[df_mask].copy()

    df.rename(columns={"timestamp": "time"}, inplace=True)
    df["channel"] = df["reading"].str.extract(r"chan:(\d+)").astype(int)
    df["measurement"] = df["reading"].str.extract(r"chan:\d+:(.*)")

    grouped_dfs = {
        (chan, meas): group.reset_index(drop=True)
        for (chan, meas), group in df.groupby(["channel", "measurement"])  # ty: ignore[not-iterable]
    }

    for key, group_df in grouped_dfs.items():
        logger.debug(f"Loading: {key}")
        channel, measurement = key
        # Each channel is addressed as its own probe sharing the modem's IP.
        compound_key = {"ip_address": self.probe_key.ip_address, "probe_id": f"chan:{channel}"}

        metric = self.MEASUREMENTS.get(measurement)
        if not metric:
            raise ValueError(f"Unknown metrics type {measurement}")

        try:
            self.send_data(
                data=group_df, metric=metric, reference_type=REF_TYPES.PROBE, compound_reference=compound_key
            )
        except requests.HTTPError as e:
            resp = e.response
            if resp is None:
                raise
            # 409 Conflict: this (channel, measurement) window already exists upstream.
            if resp.status_code == 409:
                logger.info(f"(chan, meas)={key} already loaded for time frame, continuing..")
                continue
            raise
        except IntegrityError as e:
            # Direct-to-DB path: a unique violation means the rows were already loaded.
            if isinstance(e.orig, psycopg2.errors.UniqueViolation):  # ty: ignore[unresolved-attribute]
                logger.info(f"Chan: meas={key} already loaded for time frame, continuing..")
            else:
                # Bug fix: other integrity errors were silently swallowed.
                raise