Skip to content

opensampl.vendors.adva

ADVA clock implementation

AdvaProbe

Bases: BaseProbe

ADVA Probe Object

Source code in opensampl/vendors/adva.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class AdvaProbe(BaseProbe):
    """
    ADVA Probe Object

    Reads ADVA clock probe files (``.txt`` or ``.txt.gz``). The probe identity and a file
    timestamp are taken from the file name, metadata from the leading ``#`` header lines,
    and the time series from the comma separated data rows.
    """

    # Timestamp encoded in the input file name (assigned in __init__)
    timestamp: datetime
    # Measurement start time parsed from the "Start" header (assigned in process_metadata)
    start_time: datetime
    vendor = VENDORS.ADVA

    class RandomDataConfig(BaseProbe.RandomDataConfig):
        """Model for storing random data generation configurations as provided by CLI or YAML"""

        # Time series parameters; each default is drawn at random when no value is supplied,
        # and the description records the expression used for that draw.
        base_value: float = Field(
            default_factory=lambda: random.uniform(-1e-6, 1e-6), description="random.uniform(-1e-6, 1e-6)"
        )
        noise_amplitude: float = Field(
            default_factory=lambda: random.uniform(1e-9, 1e-8), description="random.uniform(1e-9, 1e-8)"
        )
        drift_rate: float = Field(
            default_factory=lambda: random.uniform(-1e-12, 1e-12), description="random.uniform(-1e-12, 1e-12)"
        )

    @classmethod
    def get_random_data_cli_options(cls) -> list:
        """Return the base random data CLI options followed by the ADVA specific ``--probe-id`` option."""
        base_options = super().get_random_data_cli_options()
        vendor_options = [
            click.option(
                "--probe-id",
                type=str,
                help=(
                    "The probe_id you want the random data to show up under. "
                    "Randomly generated for each probe if left empty; incremented if multiple probes"
                ),
            ),
        ]
        return base_options + vendor_options

    def __init__(self, input_file: Union[str, Path]):
        """Initialize AdvaProbe object given input_file and determine probe identity from the file name"""
        super().__init__(input_file=input_file)
        self.probe_key, self.timestamp = self.parse_file_name(self.input_file)

    @classmethod
    def parse_file_name(cls, file_name: Path) -> tuple[ProbeKey, datetime]:
        """
        Parse file name into identifying parts

        Expected format: <ip_address>CLOCK_PROBE-<probe_id>-YYYY-MM-DD-HH-MM-SS.txt.gz
        (``PTP_CLOCK_PROBE`` is accepted in place of ``CLOCK_PROBE`` and the ``.gz`` suffix is optional).

        Args:
            file_name: Path whose ``name`` component is matched against the expected format

        Returns:
            The ProbeKey built from the probe id and ip address, and the timestamp from the file name

        Raises:
            ValueError: If the file name does not match the expected format

        """
        pattern = (
            r"(?P<ip>\d+\.\d+\.\d+\.\d+)(?P<type>CLOCK_PROBE|PTP_CLOCK_PROBE)"
            r"-(?P<identifier>\d+-\d+)-"
            r"(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-"
            r"(?P<hour>\d+)-(?P<minute>\d+)-(?P<second>\d+)\.txt(?:\.gz)?"
        )
        match = re.match(pattern, file_name.name)
        if match:
            ip_address = match.group("ip")
            probe_id = match.group("identifier")
            timestamp = (
                f"{match.group('year')}-{match.group('month')}-{match.group('day')} "
                f"{match.group('hour')}:{match.group('minute')}:{match.group('second')}"
            )

            # Convert timestamp to datetime object
            # NOTE(review): strptime returns a naive datetime, so astimezone() treats it as system
            # local time before converting to UTC -- confirm file names are written in local time.
            timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").astimezone(tz=timezone.utc)

            return ProbeKey(probe_id=probe_id, ip_address=ip_address), timestamp
        raise ValueError(f"Could not parse file name {file_name} into probe key and timestamp for ADVA probe")

    def _open_file(self) -> Union[TextIO, gzip.GzipFile]:
        """Open the input file for reading as text, handling both .txt and .txt.gz formats"""
        if self.input_file.name.endswith(".gz"):
            return gzip.open(self.input_file, "rt")
        return self.input_file.open()

    def process_time_data(self) -> None:
        """
        Process time data from ADVA probe files

        Reads the two column (time, value) rows, converts the time column from seconds since the
        probe start time into absolute timestamps, adds a ``value_str`` column holding each value
        in scientific notation, and hands the frame to ``send_time_data``.
        """
        compression = "gzip" if self.input_file.name.endswith(".gz") else None

        df = pd.read_csv(
            self.input_file,
            compression=compression,
            header=None,
            comment="#",
            names=["time", "value"],
            dtype={"time": "float64", "value": "float64"},
            engine="python",
            sep=r",\s*",
        )
        if not self.metadata_parsed:
            # need to get the probe's start time from the metadata if we do not already have it
            self.process_metadata()

        base_time = pd.Timestamp(self.start_time)
        offsets = pd.to_timedelta(df["time"], unit="s")
        df["time"] = base_time + offsets

        df["value_str"] = df["value"].apply(lambda x: f"{x:.10e}")

        self.send_time_data(data=df, reference_type=REF_TYPES.GNSS)

    def process_metadata(self) -> dict:
        """
        Process metadata from ADVA probe files

        Reads the leading ``#`` lines of the file as ``key: value`` pairs. Known keys are stored
        under their column names; unknown keys are collected under ``additional_metadata``.
        Also sets ``self.start_time`` from the ``Start`` header and marks the metadata as parsed.

        Returns:
            Mapping of metadata column name to the raw header value, plus ``additional_metadata``

        Raises:
            KeyError: If the file has no ``Start`` header
            ValueError: If the ``Start`` header is not formatted as ``YYYY/MM/DD HH:MM:SS``

        """
        header_to_column = {
            "Adva Direction": "adva_direction",
            "Adva MTIE Mask": "adva_mtie_mask",
            "Adva Mask Margin": "adva_mask_margin",
            "Adva Probe": "adva_probe",
            "Adva Reference": "adva_reference",
            "Adva Reference Expected QL": "adva_reference_expected_ql",
            "Adva Source": "adva_source",
            "Adva Status": "adva_status",
            "Adva Version": "adva_version",
            "Frequency": "frequency",
            "Multiplier": "multiplier",
            "Start": "start",
            "TimeMultiplier": "timemultiplier",
            "Title": "title",
            "Type": "type",
        }
        headers = {}
        freeform_header = {}
        with self._open_file() as f:
            for line in f:
                if not line.startswith("#"):
                    # the header block ends at the first non-comment line
                    break
                header = line.lstrip("#").strip()
                # Split on the first ": " only, so values that contain ": " stay intact
                key, separator, value = header.partition(": ")
                if not separator:
                    # bare "#" lines or comments without a "key: value" pair carry no metadata
                    continue
                if key in header_to_column:
                    headers[header_to_column[key]] = value
                else:
                    freeform_header[key] = value
        headers["additional_metadata"] = freeform_header
        # NOTE(review): the parsed value is naive, so astimezone() treats it as system local time
        # before converting to UTC -- confirm the "Start" header is written in local time.
        self.start_time = datetime.strptime(headers["start"], "%Y/%m/%d %H:%M:%S").astimezone(tz=timezone.utc)
        self.metadata_parsed = True
        return headers

    @classmethod
    def generate_random_data(
        cls,
        config: RandomDataConfig,
        probe_key: ProbeKey,
    ) -> ProbeKey:
        """
        Generate random ADVA probe test data and send it directly to the database.

        Args:
            config: RandomDataConfig with parameters specifying how to generate data
            probe_key: Probe key the generated metadata and data are stored under

        Returns:
            ProbeKey: The probe key used for the generated data

        """
        cls._setup_random_seed(config.seed)

        logger.info(f"Generating random ADVA data for {probe_key}")

        # Generate and send metadata
        metadata = {
            "adva_source": "RANDOM GENERATION",
            "title": f"Test ADVA Clock Probe {probe_key.probe_id}",
            "type": "PHASE",
            "additional_metadata": {"test_data": True, "random_generation_config": config.model_dump()},
        }

        cls._send_metadata_to_db(probe_key, metadata)

        # Generate time series using base class helper
        df = config.generate_time_series()

        # Send data to database
        cls.send_data(
            probe_key=probe_key,
            data=df,
            metric=METRICS.PHASE_OFFSET,
            reference_type=REF_TYPES.GNSS,
        )

        logger.info(f"Successfully generated {config.duration_hours}h of random ADVA data for {probe_key}")
        return probe_key

RandomDataConfig

Bases: RandomDataConfig

Model for storing random data generation configurations as provided by CLI or YAML

Source code in opensampl/vendors/adva.py
28
29
30
31
32
33
34
35
36
37
38
39
40
class RandomDataConfig(BaseProbe.RandomDataConfig):
    """Random data generation settings for ADVA probes, as supplied through the CLI or a YAML file"""

    # Time series parameters. Every field is drawn at random when no value is supplied;
    # each description records the expression used for that draw.
    base_value: float = Field(
        description="random.uniform(-1e-6, 1e-6)",
        default_factory=lambda: random.uniform(-1e-6, 1e-6),
    )
    noise_amplitude: float = Field(
        description="random.uniform(1e-9, 1e-8)",
        default_factory=lambda: random.uniform(1e-9, 1e-8),
    )
    drift_rate: float = Field(
        description="random.uniform(-1e-12, 1e-12)",
        default_factory=lambda: random.uniform(-1e-12, 1e-12),
    )

__init__(input_file)

Initialize an AdvaProbe object from the given input_file and determine the probe identity from the file name

Source code in opensampl/vendors/adva.py
58
59
60
61
def __init__(self, input_file: Union[str, Path]):
    """Set up the probe via the base class, then derive its key and timestamp from the file name"""
    super().__init__(input_file=input_file)
    # parse_file_name yields (probe key, file timestamp) for the stored input file
    parsed_key, parsed_timestamp = self.parse_file_name(self.input_file)
    self.probe_key = parsed_key
    self.timestamp = parsed_timestamp

generate_random_data(config, probe_key) classmethod

Generate random ADVA probe test data and send it directly to the database.

Parameters:

Name Type Description Default
probe_key ProbeKey

Probe key under which the generated metadata and data are stored

required
config RandomDataConfig

RandomDataConfig with parameters specifying how to generate data

required

Returns:

Name Type Description
ProbeKey ProbeKey

The probe key used for the generated data

Source code in opensampl/vendors/adva.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
@classmethod
def generate_random_data(
    cls,
    config: RandomDataConfig,
    probe_key: ProbeKey,
) -> ProbeKey:
    """
    Produce synthetic ADVA probe data and push it straight to the database.

    Args:
        config: RandomDataConfig describing how the time series should be generated
        probe_key: Probe key the generated metadata and data are stored under

    Returns:
        ProbeKey: The probe key that was passed in

    """
    cls._setup_random_seed(config.seed)
    logger.info(f"Generating random ADVA data for {probe_key}")

    # Describe the synthetic probe; the metadata is sent before the time series
    probe_metadata = {
        "adva_source": "RANDOM GENERATION",
        "title": f"Test ADVA Clock Probe {probe_key.probe_id}",
        "type": "PHASE",
    }
    probe_metadata["additional_metadata"] = {
        "test_data": True,
        "random_generation_config": config.model_dump(),
    }
    cls._send_metadata_to_db(probe_key, probe_metadata)

    # The config object builds the time series itself
    series = config.generate_time_series()
    cls.send_data(
        probe_key=probe_key,
        data=series,
        metric=METRICS.PHASE_OFFSET,
        reference_type=REF_TYPES.GNSS,
    )

    logger.info(f"Successfully generated {config.duration_hours}h of random ADVA data for {probe_key}")
    return probe_key

get_random_data_cli_options() classmethod

Return vendor-specific random data generation options.

Source code in opensampl/vendors/adva.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@classmethod
def get_random_data_cli_options(cls) -> list:
    """Extend the inherited random data CLI options with the ADVA ``--probe-id`` option."""
    inherited_options = super().get_random_data_cli_options()

    probe_id_help = (
        "The probe_id you want the random data to show up under. "
        "Randomly generated for each probe if left empty; incremented if multiple probes"
    )
    probe_id_option = click.option("--probe-id", type=str, help=probe_id_help)

    # Vendor option goes after the inherited ones
    return inherited_options + [probe_id_option]

parse_file_name(file_name) classmethod

Parse file name into identifying parts

Expected format: <ip_address>CLOCK_PROBE-<probe_id>-YYYY-MM-DD-HH-MM-SS.txt.gz

Source code in opensampl/vendors/adva.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@classmethod
def parse_file_name(cls, file_name: Path) -> tuple[ProbeKey, datetime]:
    """
    Split an ADVA file name into the probe key and the timestamp it encodes

    Expected format: <ip_address>CLOCK_PROBE-<probe_id>-YYYY-MM-DD-HH-MM-SS.txt.gz

    Raises:
        ValueError: If the file name does not match the expected format

    """
    pattern = (
        r"(?P<ip>\d+\.\d+\.\d+\.\d+)(?P<type>CLOCK_PROBE|PTP_CLOCK_PROBE)"
        r"-(?P<identifier>\d+-\d+)-"
        r"(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-"
        r"(?P<hour>\d+)-(?P<minute>\d+)-(?P<second>\d+)\.txt(?:\.gz)?"
    )
    found = re.match(pattern, file_name.name)
    if not found:
        raise ValueError(f"Could not parse file name {file_name} into probe key and timestamp for ADVA probe")

    parts = found.groupdict()
    stamp_text = (
        f"{parts['year']}-{parts['month']}-{parts['day']} "
        f"{parts['hour']}:{parts['minute']}:{parts['second']}"
    )
    # NOTE(review): the parsed value is naive, so astimezone() reads it as system local time
    # before converting to UTC -- confirm that is the intended interpretation.
    parsed = datetime.strptime(stamp_text, "%Y-%m-%d %H:%M:%S")
    stamp = parsed.astimezone(tz=timezone.utc)

    key = ProbeKey(probe_id=parts["identifier"], ip_address=parts["ip"])
    return key, stamp

process_metadata()

Process metadata from ADVA probe files

Source code in opensampl/vendors/adva.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def process_metadata(self) -> dict:
    """
    Process metadata from ADVA probe files

    Reads the leading ``#`` lines of the file as ``key: value`` pairs. Known keys are stored
    under their column names; unknown keys are collected under ``additional_metadata``.
    Also sets ``self.start_time`` from the ``Start`` header and marks the metadata as parsed.

    Returns:
        Mapping of metadata column name to the raw header value, plus ``additional_metadata``

    Raises:
        KeyError: If the file has no ``Start`` header
        ValueError: If the ``Start`` header is not formatted as ``YYYY/MM/DD HH:MM:SS``

    """
    header_to_column = {
        "Adva Direction": "adva_direction",
        "Adva MTIE Mask": "adva_mtie_mask",
        "Adva Mask Margin": "adva_mask_margin",
        "Adva Probe": "adva_probe",
        "Adva Reference": "adva_reference",
        "Adva Reference Expected QL": "adva_reference_expected_ql",
        "Adva Source": "adva_source",
        "Adva Status": "adva_status",
        "Adva Version": "adva_version",
        "Frequency": "frequency",
        "Multiplier": "multiplier",
        "Start": "start",
        "TimeMultiplier": "timemultiplier",
        "Title": "title",
        "Type": "type",
    }
    headers = {}
    freeform_header = {}
    with self._open_file() as f:
        for line in f:
            if not line.startswith("#"):
                # the header block ends at the first non-comment line
                break
            header = line.lstrip("#").strip()
            # Split on the first ": " only, so values that contain ": " stay intact
            key, separator, value = header.partition(": ")
            if not separator:
                # bare "#" lines or comments without a "key: value" pair carry no metadata
                continue
            if key in header_to_column:
                headers[header_to_column[key]] = value
            else:
                freeform_header[key] = value
    headers["additional_metadata"] = freeform_header
    # NOTE(review): the parsed value is naive, so astimezone() treats it as system local time
    # before converting to UTC -- confirm the "Start" header is written in local time.
    self.start_time = datetime.strptime(headers["start"], "%Y/%m/%d %H:%M:%S").astimezone(tz=timezone.utc)
    self.metadata_parsed = True
    return headers

process_time_data()

Process time data from ADVA probe files

Source code in opensampl/vendors/adva.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def process_time_data(self) -> None:
    """
    Load the time series rows of an ADVA probe file and send them on

    The time column is read as seconds relative to the probe start time and converted to
    absolute timestamps; a ``value_str`` column holds each value in scientific notation.
    """

    def _as_scientific(number):
        # same text form for every value: ten digits after the decimal point
        return f"{number:.10e}"

    if self.input_file.name.endswith(".gz"):
        compression = "gzip"
    else:
        compression = None

    # "#" lines are the metadata header, so the CSV reader treats them as comments
    frame = pd.read_csv(
        self.input_file,
        sep=r",\s*",
        engine="python",
        header=None,
        names=["time", "value"],
        dtype={"time": "float64", "value": "float64"},
        comment="#",
        compression=compression,
    )

    # The start time lives in the metadata header; parse it now if that has not happened yet
    if not self.metadata_parsed:
        self.process_metadata()

    start = pd.Timestamp(self.start_time)
    frame["time"] = start + pd.to_timedelta(frame["time"], unit="s")
    frame["value_str"] = frame["value"].apply(_as_scientific)

    self.send_time_data(data=frame, reference_type=REF_TYPES.GNSS)