
opensampl.collect.microchip.tp4100.collect_4100

Collection script for Microchip TimeProvider® 4100 devices.

This tool uses the web interface available at the device's IP address. See the user guide for how to configure access to the web interface.
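
A minimal sketch of invoking the collector from Python. The address is a placeholder, and the device credentials are assumed to be supplied through TP4100Config (the constructor builds a TP4100Config and login() reads USERNAME and PASSWORD from it); how those values are configured is not covered on this page.

# Minimal usage sketch. Host is a placeholder; credentials are assumed to
# be provided via TP4100Config, as used by TP4100Collector.login().
from opensampl.collect.microchip.tp4100.collect_4100 import main

main(
    host="192.0.2.10",           # TP4100 IP address or hostname
    output_dir="./tp4100_data",  # where the CSV/text files are written
    duration=600,                # chart window in seconds
    method="chart_data",         # or "download_file" for the stored 24 h files
)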

TP4100Collector

Collector class for Microchip TimeProvider 4100 device data.

This class provides functionality to collect time and performance data from Microchip TP4100 devices via their web interface.

Source code in opensampl/collect/microchip/tp4100/collect_4100.py
class TP4100Collector:
    """
    Collector class for Microchip TimeProvider 4100 device data.

    This class provides functionality to collect time and performance data from
    Microchip TP4100 devices via their web interface.
    """

    def __init__(
        self,
        host: str,
        port: int = 443,
        output_dir: str = "./output",
        duration: int = 600,
        channels: Optional[list[str]] = None,
        metrics: Optional[list[str]] = None,
        method: Literal["chart_data", "download_file"] = "chart_data",
        save_full_status: bool = False,
    ):
        """
        Initialize TP4100Collector.

        Args:
            host: IP address or hostname of the TP4100 device.
            port: Port number for HTTPS connection (default: 443).
            output_dir: Directory path where collected data will be saved.
            duration: Duration in seconds for data collection.
            channels: List of specific channels to collect data from.
            metrics: List of specific metrics to collect.
            method: Collection method - "chart_data" downloads chart data for
                   specified duration (data showing in chart on Status Page),
                   "download_file" downloads last 24 hours (same as "Save as").
            save_full_status: Whether to save full status information.

        """
        self.config = TP4100Config(HOST=host, PORT=port)
        self.session = requests.Session()
        self.session.verify = False
        self.duration = duration
        self.metrics = metrics
        self.channels = channels
        self.headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "X-Requested-With": "XMLHttpRequest",
        }
        self.save_full_status = save_full_status

        self.login()
        self.method = method

        self.start_time = datetime.now(UTC)
        self.output_dir = Path(output_dir).resolve()
        logger.info(f"Saving to {self.output_dir}")

    def login(self):
        """
        Authenticate with the TP4100 device web interface.

        Raises:
            Exception: If login fails or connection issues occur.

        """
        login_url = f"{self.config.url}/login"
        login_data = {"txtuname": self.config.USERNAME, "txtpasswd": self.config.PASSWORD, "action": "applylogin"}
        try:
            resp = self.session.post(login_url, data=login_data, headers=self.headers)
            resp.raise_for_status()
        except Exception as e:
            logger.exception(f"Error trying to login to {login_url}; {e}")
            raise

    def get_monitored_channels(self):
        """
        Retrieve list of currently monitored channels from the device.

        Returns:
            set: Set of channel names that are currently being monitored.

        Raises:
            Exception: If unable to retrieve channel information.

        """
        channel_url = f"{self.config.url}/channels_thresholdValue"
        data = {"tgrp": -1}
        try:
            resp = self.session.post(channel_url, data=data, headers=self.headers)
            resp.raise_for_status()

            channel_data = resp.json()

            if self.save_full_status:
                filename = self.get_filename(detail="channelStatus", extension=".json")
                new_file = self.output_dir / filename
                self.output_dir.mkdir(parents=True, exist_ok=True)
                new_file.write_text(resp.text)

            return {
                x.get("monitorChannelString")
                for x in channel_data
                if x.get("monitorChStatusString", "").lower() in ("monitoring", "ok")
            }
        except Exception as e:
            logger.exception(f"Error trying to get channel information from {channel_url}; {e}")
            raise

    def collect_readings(self):
        """
        Collect readings from configured channels and metrics.

        Determines which channels to monitor (either specified or all monitored),
        then collects data for each requested metric using the configured method.
        """
        monitored_channels = self.get_monitored_channels()

        channels = monitored_channels if self.channels is None else self.channels

        channels = {x.lower() for x in channels}

        readings_to_collect = []
        for mon_con in DEFAULT_MONITOR_CONFIG.all():
            for ch_id in mon_con.ids:
                if (
                    any(x.startswith(f"{mon_con.channel_name}-{ch_id}".lower()) for x in channels)
                    or mon_con.channel_name.lower() in channels
                ):
                    readings_to_collect.extend([(mon_con, ch_id, metric) for metric in mon_con.metrics])

        for request_tpl in readings_to_collect:
            mon_ch, ch_id, metr = request_tpl
            ch_name = mon_ch.channel_name
            if self.metrics is not None and metr.short_name not in self.metrics:
                logger.trace(f"Skipping metric: {ch_name}; {ch_id}; {metr.full_name}")
                continue

            logger.debug(f"Requesting metric: {ch_name}; {ch_id}; {metr.full_name}")

            if self.method == "chart_data":
                self.collect_chart_data(request_tpl)
            elif self.method == "download_file":
                self.download_files(request_tpl)

    def get_filename(self, detail: Optional[str] = None, extension: str = ".txt"):
        """
        Generate timestamped filename for probe connection

        Format (no detail): {host}_TP4100_{timestamp}.{extension}
        Format (with detail): {host}_TP4100_{detail}_{timestamp}.{extension}

        Args:
            detail: Optional string to include in filename.
            extension: File extension to use (default: '.txt').

        Returns:
            str: Generated filename with timestamp and optional metric info.

        """
        filename = f"{self.config.HOST}_TP4100"
        if detail is not None:
            filename += f"_{detail}"
        cleaned_ext = f".{extension.lstrip('.')}"
        filename += f"_{datetime.now(UTC).replace(tzinfo=None).isoformat()}{cleaned_ext}"
        return filename

    def collect_chart_data(
        self, request_key: tuple[MonitoringConfig, int, MetricInfo], download_dict: Optional[dict[str, Any]] = None
    ):
        """
        Collect chart data for a specific metric and channel.

        Requests chart data from the device's web interface for the specified
        duration and saves it as a CSV file with YAML metadata headers.

        Args:
            request_key: Tuple of (monitor_config, channel_id, metric).
            download_dict: Optional additional request data to further configure the API call.

        Raises:
            Exception: If data collection or file writing fails.

        """

        def format_utc_second(tai_sec: str, utc_offset: str) -> datetime:
            return datetime.fromtimestamp(int(tai_sec) - int(utc_offset), UTC)

        mon_ch, ch_id, metr = request_key
        ch_name = mon_ch.channel_name

        request_data = {
            "metric": metr.short_name.lower(),
            "xRange": self.duration,
            "tStart": -1,
            "channelName": ch_name.lower(),
            "channelId": ch_id,
        }

        if download_dict is not None:
            request_data.update(download_dict)

        chart_data_url = f"{self.config.url}/get_chart_data"
        chart_resp = self.session.post(chart_data_url, data=request_data, headers=self.headers)
        try:
            chart_resp.raise_for_status()
        except Exception:
            logger.error(pformat(request_data))
            logger.exception(chart_resp.text)
            raise
        data = chart_resp.json()

        df = pd.DataFrame(data["chartData"])
        if len(df) > 0:
            df["timestamp"] = df.apply(lambda r: format_utc_second(r["X"], r["OFFSET"]), axis=1)
            df = df[["timestamp", "Y"]].rename(columns={"Y": "value"})
            data_start = df["timestamp"].min().isoformat()
        else:
            data_start = None

        headers = {
            "Title": "TP4100 Performance Monitor",
            "metric": metr.full_name,
            "host": self.config.HOST,
            "input": f"{ch_name}-{ch_id}",
            "start_time": data_start,
            "method": "chart_data",
        }
        headers.update({k: v for k, v in data.items() if k in ("alarm_thresh", "channelStatus", "reference")})

        logger.debug(f"Collected {len(df)} values starting at {data_start}")
        header_str = yaml.safe_dump(headers, sort_keys=False)
        header_str = textwrap.indent(header_str, prefix="# ")
        file_detail = f"{ch_name.lower()}-{ch_id}_{metr.short_name.lower()}"
        filename = self.get_filename(detail=file_detail, extension=".csv")

        new_file = self.output_dir / filename
        self.output_dir.mkdir(parents=True, exist_ok=True)

        with new_file.open("w") as f:
            f.write(header_str)

        df.to_csv(new_file, mode="a", index=False)

    def download_files(
        self, request_key: tuple[MonitoringConfig, int, MetricInfo], download_dict: Optional[dict[str, Any]] = None
    ):
        """
        Download data files directly from the device.

        Downloads data files (typically last 24 hours) directly from the device,
        similar to using "Save as" on the Status Page.

        Args:
            request_key: Tuple of (monitor_config, channel_id, metric).
            download_dict: Optional download configuration parameters.

        Raises:
            Exception: If download or file saving fails.

        """
        mon_ch, ch_id, metr = request_key
        payload = mon_ch.download_payload(which_id=ch_id, down_metric=metr, download=download_dict)
        url = f"{self.config.url}/{mon_ch.download_path}"
        logger.debug(yaml.safe_dump(payload, sort_keys=False))

        resp = self.session.post(url, data=payload, headers=self.headers)
        resp.raise_for_status()
        try:
            filename = resp.headers.get("content-disposition").split("attachment; filename=", maxsplit=1)
            filename = next((x for x in filename if x != ""), None)
            new_file = self.output_dir / filename
            timestamp = datetime.fromtimestamp(int(new_file.stem[-10:]), UTC)

            self.output_dir.mkdir(parents=True, exist_ok=True)

            headers = {
                "Start": timestamp.isoformat(),
                "host": self.config.HOST,
                "metric": metr.full_name,
                "method": "download_file",
            }

            file_content = resp.text.splitlines()
            while len(file_content) > 0 and file_content[0].startswith("#"):
                curline = file_content.pop(0).lstrip("#")
                key, val = curline.split(": ", maxsplit=1)
                if key.strip() == "Title":
                    title_val, rest = val.split("(", maxsplit=1)
                    headers[key.strip()] = title_val
                    metr_str, inner_info = rest.split("):", maxsplit=1)
                    headers.update(
                        {k.strip(): v for k, v in (x.split(" = ", maxsplit=1) for x in inner_info.split(", "))}
                    )
                else:
                    headers[key.strip()] = val
            header_str = yaml.safe_dump(headers, sort_keys=False)
            header_str = textwrap.indent(header_str, prefix="# ")
            with new_file.open("w") as f:
                f.write(header_str)
                f.writelines("\n".join(file_content))

        except Exception:
            file_detail = f"{mon_ch.channel_name.lower()}-{ch_id}_{metr.short_name.lower()}"
            filename = self.get_filename(detail=file_detail)
            new_file = self.output_dir / filename
            self.output_dir.mkdir(parents=True, exist_ok=True)
            new_file.write_bytes(resp.content)

__init__(host, port=443, output_dir='./output', duration=600, channels=None, metrics=None, method='chart_data', save_full_status=False)

Initialize TP4100Collector.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| host | str | IP address or hostname of the TP4100 device. | required |
| port | int | Port number for HTTPS connection. | 443 |
| output_dir | str | Directory path where collected data will be saved. | './output' |
| duration | int | Duration in seconds for data collection. | 600 |
| channels | Optional[list[str]] | List of specific channels to collect data from. | None |
| metrics | Optional[list[str]] | List of specific metrics to collect. | None |
| method | Literal['chart_data', 'download_file'] | Collection method: "chart_data" downloads chart data for the specified duration (the data shown in the chart on the Status Page); "download_file" downloads the last 24 hours (same as "Save as"). | 'chart_data' |
| save_full_status | bool | Whether to save full status information. | False |
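
For finer control than main() offers, the collector can also be constructed directly. The snippet below is a sketch only: "gnss-1" and "TE" are placeholder names, channels are lower-cased and prefix-matched against the monitoring configuration, and metric names must match a metric's short_name exactly (see collect_readings below).

# Hedged sketch of constructing the collector with explicit filters.
# Channel and metric names are placeholders, not a confirmed list.
from opensampl.collect.microchip.tp4100.collect_4100 import TP4100Collector

collector = TP4100Collector(
    host="192.0.2.10",
    duration=3600,            # last hour of chart data
    channels=["gnss-1"],      # restrict to one monitored input
    metrics=["TE"],           # must equal a metric short_name exactly
    save_full_status=True,    # also dump the raw channel-status JSON
)
try:
    collector.collect_readings()
finally:
    collector.session.close()
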
Source code in opensampl/collect/microchip/tp4100/collect_4100.py
def __init__(
    self,
    host: str,
    port: int = 443,
    output_dir: str = "./output",
    duration: int = 600,
    channels: Optional[list[str]] = None,
    metrics: Optional[list[str]] = None,
    method: Literal["chart_data", "download_file"] = "chart_data",
    save_full_status: bool = False,
):
    """
    Initialize TP4100Collector.

    Args:
        host: IP address or hostname of the TP4100 device.
        port: Port number for HTTPS connection (default: 443).
        output_dir: Directory path where collected data will be saved.
        duration: Duration in seconds for data collection.
        channels: List of specific channels to collect data from.
        metrics: List of specific metrics to collect.
        method: Collection method - "chart_data" downloads chart data for
               specified duration (data showing in chart on Status Page),
               "download_file" downloads last 24 hours (same as "Save as").
        save_full_status: Whether to save full status information.

    """
    self.config = TP4100Config(HOST=host, PORT=port)
    self.session = requests.Session()
    self.session.verify = False
    self.duration = duration
    self.metrics = metrics
    self.channels = channels
    self.headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "X-Requested-With": "XMLHttpRequest",
    }
    self.save_full_status = save_full_status

    self.login()
    self.method = method

    self.start_time = datetime.now(UTC)
    self.output_dir = Path(output_dir).resolve()
    logger.info(f"Saving to {self.output_dir}")

collect_chart_data(request_key, download_dict=None)

Collect chart data for a specific metric and channel.

Requests chart data from the device's web interface for the specified duration and saves it as a CSV file with YAML metadata headers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request_key | tuple[MonitoringConfig, int, MetricInfo] | Tuple of (monitor_config, channel_id, metric). | required |
| download_dict | Optional[dict[str, Any]] | Optional additional request data to further configure the API call. | None |

Raises:

| Type | Description |
| --- | --- |
| Exception | If data collection or file writing fails. |
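
The files written by this method begin with YAML metadata lines prefixed by "# ", followed by a two-column CSV (timestamp, value). A hedged sketch of reading one back; the path is a placeholder:

# Sketch: read back a chart-data CSV produced by collect_chart_data().
# The layout ("# "-prefixed YAML header, then timestamp,value rows)
# comes from the source below; the path is a placeholder.
import pandas as pd
import yaml

path = "path/to/collected_chart_data.csv"  # placeholder

with open(path) as f:
    header_lines = []
    pos = f.tell()
    line = f.readline()
    while line.startswith("# "):
        header_lines.append(line[2:])
        pos = f.tell()
        line = f.readline()
    f.seek(pos)                                      # rewind to the CSV header row
    metadata = yaml.safe_load("".join(header_lines)) or {}
    frame = pd.read_csv(f, parse_dates=["timestamp"])

print(metadata.get("metric"), len(frame), "samples")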

Source code in opensampl/collect/microchip/tp4100/collect_4100.py
def collect_chart_data(
    self, request_key: tuple[MonitoringConfig, int, MetricInfo], download_dict: Optional[dict[str, Any]] = None
):
    """
    Collect chart data for a specific metric and channel.

    Requests chart data from the device's web interface for the specified
    duration and saves it as a CSV file with YAML metadata headers.

    Args:
        request_key: Tuple of (monitor_config, channel_id, metric).
        download_dict: Optional additional request data to further configure the API call.

    Raises:
        Exception: If data collection or file writing fails.

    """

    def format_utc_second(tai_sec: str, utc_offset: str) -> datetime:
        return datetime.fromtimestamp(int(tai_sec) - int(utc_offset), UTC)

    mon_ch, ch_id, metr = request_key
    ch_name = mon_ch.channel_name

    request_data = {
        "metric": metr.short_name.lower(),
        "xRange": self.duration,
        "tStart": -1,
        "channelName": ch_name.lower(),
        "channelId": ch_id,
    }

    if download_dict is not None:
        request_data.update(download_dict)

    chart_data_url = f"{self.config.url}/get_chart_data"
    chart_resp = self.session.post(chart_data_url, data=request_data, headers=self.headers)
    try:
        chart_resp.raise_for_status()
    except Exception:
        logger.error(pformat(request_data))
        logger.exception(chart_resp.text)
        raise
    data = chart_resp.json()

    df = pd.DataFrame(data["chartData"])
    if len(df) > 0:
        df["timestamp"] = df.apply(lambda r: format_utc_second(r["X"], r["OFFSET"]), axis=1)
        df = df[["timestamp", "Y"]].rename(columns={"Y": "value"})
        data_start = df["timestamp"].min().isoformat()
    else:
        data_start = None

    headers = {
        "Title": "TP4100 Performance Monitor",
        "metric": metr.full_name,
        "host": self.config.HOST,
        "input": f"{ch_name}-{ch_id}",
        "start_time": data_start,
        "method": "chart_data",
    }
    headers.update({k: v for k, v in data.items() if k in ("alarm_thresh", "channelStatus", "reference")})

    logger.debug(f"Collected {len(df)} values starting at {data_start}")
    header_str = yaml.safe_dump(headers, sort_keys=False)
    header_str = textwrap.indent(header_str, prefix="# ")
    file_detail = f"{ch_name.lower()}-{ch_id}_{metr.short_name.lower()}"
    filename = self.get_filename(detail=file_detail, extension=".csv")

    new_file = self.output_dir / filename
    self.output_dir.mkdir(parents=True, exist_ok=True)

    with new_file.open("w") as f:
        f.write(header_str)

    df.to_csv(new_file, mode="a", index=False)

collect_readings()

Collect readings from configured channels and metrics.

Determines which channels to monitor (either specified or all monitored), then collects data for each requested metric using the configured method.
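
The selection rule is worth spelling out: a requested channel entry selects a (channel, id) pair when the entry starts with the lowercase "<channel_name>-<id>" string, or when it equals the bare channel name. A toy illustration with placeholder names:

# Toy illustration of the channel selection used by collect_readings().
# Names are placeholders, not a confirmed channel list for any device.
requested = {"gnss-1", "ptp"}        # lowered copy of the channels argument
channel_name, ch_id = "GNSS", 1      # candidate from the monitoring config

selected = (
    any(x.startswith(f"{channel_name}-{ch_id}".lower()) for x in requested)
    or channel_name.lower() in requested
)
print(selected)  # True: "gnss-1" starts with "gnss-1"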

Source code in opensampl/collect/microchip/tp4100/collect_4100.py
def collect_readings(self):
    """
    Collect readings from configured channels and metrics.

    Determines which channels to monitor (either specified or all monitored),
    then collects data for each requested metric using the configured method.
    """
    monitored_channels = self.get_monitored_channels()

    channels = monitored_channels if self.channels is None else self.channels

    channels = {x.lower() for x in channels}

    readings_to_collect = []
    for mon_con in DEFAULT_MONITOR_CONFIG.all():
        for ch_id in mon_con.ids:
            if (
                any(x.startswith(f"{mon_con.channel_name}-{ch_id}".lower()) for x in channels)
                or mon_con.channel_name.lower() in channels
            ):
                readings_to_collect.extend([(mon_con, ch_id, metric) for metric in mon_con.metrics])

    for request_tpl in readings_to_collect:
        mon_ch, ch_id, metr = request_tpl
        ch_name = mon_ch.channel_name
        if self.metrics is not None and metr.short_name not in self.metrics:
            logger.trace(f"Skipping metric: {ch_name}; {ch_id}; {metr.full_name}")
            continue

        logger.debug(f"Requesting metric: {ch_name}; {ch_id}; {metr.full_name}")

        if self.method == "chart_data":
            self.collect_chart_data(request_tpl)
        elif self.method == "download_file":
            self.download_files(request_tpl)

download_files(request_key, download_dict=None)

Download data files directly from the device.

Downloads data files (typically last 24 hours) directly from the device, similar to using "Save as" on the Status Page.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request_key | tuple[MonitoringConfig, int, MetricInfo] | Tuple of (monitor_config, channel_id, metric). | required |
| download_dict | Optional[dict[str, Any]] | Optional download configuration parameters. | None |

Raises:

| Type | Description |
| --- | --- |
| Exception | If download or file saving fails. |
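
A short sketch of selecting this mode, under the same assumptions as the earlier example (placeholder host, credentials supplied via TP4100Config):

# Sketch: collect the device's stored files (roughly the last 24 hours)
# instead of the live chart window. Host is a placeholder.
from opensampl.collect.microchip.tp4100.collect_4100 import main

main(host="192.0.2.10", method="download_file", output_dir="./tp4100_24h")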

Source code in opensampl/collect/microchip/tp4100/collect_4100.py
def download_files(
    self, request_key: tuple[MonitoringConfig, int, MetricInfo], download_dict: Optional[dict[str, Any]] = None
):
    """
    Download data files directly from the device.

    Downloads data files (typically last 24 hours) directly from the device,
    similar to using "Save as" on the Status Page.

    Args:
        request_key: Tuple of (monitor_config, channel_id, metric).
        download_dict: Optional download configuration parameters.

    Raises:
        Exception: If download or file saving fails.

    """
    mon_ch, ch_id, metr = request_key
    payload = mon_ch.download_payload(which_id=ch_id, down_metric=metr, download=download_dict)
    url = f"{self.config.url}/{mon_ch.download_path}"
    logger.debug(yaml.safe_dump(payload, sort_keys=False))

    resp = self.session.post(url, data=payload, headers=self.headers)
    resp.raise_for_status()
    try:
        filename = resp.headers.get("content-disposition").split("attachment; filename=", maxsplit=1)
        filename = next((x for x in filename if x != ""), None)
        new_file = self.output_dir / filename
        timestamp = datetime.fromtimestamp(int(new_file.stem[-10:]), UTC)

        self.output_dir.mkdir(parents=True, exist_ok=True)

        headers = {
            "Start": timestamp.isoformat(),
            "host": self.config.HOST,
            "metric": metr.full_name,
            "method": "download_file",
        }

        file_content = resp.text.splitlines()
        while len(file_content) > 0 and file_content[0].startswith("#"):
            curline = file_content.pop(0).lstrip("#")
            key, val = curline.split(": ", maxsplit=1)
            if key.strip() == "Title":
                title_val, rest = val.split("(", maxsplit=1)
                headers[key.strip()] = title_val
                metr_str, inner_info = rest.split("):", maxsplit=1)
                headers.update(
                    {k.strip(): v for k, v in (x.split(" = ", maxsplit=1) for x in inner_info.split(", "))}
                )
            else:
                headers[key.strip()] = val
        header_str = yaml.safe_dump(headers, sort_keys=False)
        header_str = textwrap.indent(header_str, prefix="# ")
        with new_file.open("w") as f:
            f.write(header_str)
            f.writelines("\n".join(file_content))

    except Exception:
        file_detail = f"{mon_ch.channel_name.lower()}-{ch_id}_{metr.short_name.lower()}"
        filename = self.get_filename(detail=file_detail)
        new_file = self.output_dir / filename
        self.output_dir.mkdir(parents=True, exist_ok=True)
        new_file.write_bytes(resp.content)

get_filename(detail=None, extension='.txt')

Generate timestamped filename for probe connection

Format (no detail): {host}_TP4100_{timestamp}.{extension}
Format (with detail): {host}_TP4100_{detail}_{timestamp}.{extension}

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| detail | Optional[str] | Optional string to include in filename. | None |
| extension | str | File extension to use. | '.txt' |

Returns:

| Type | Description |
| --- | --- |
| str | Generated filename with timestamp and optional metric info. |
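
An illustrative instance of the pattern (placeholder values; the timestamp is the current UTC time with timezone info stripped):

# Illustrative only: the pieces get_filename() joins together.
host = "192.0.2.10"
detail = "gnss-1_te"
timestamp = "2025-01-01T12:34:56.789012"   # naive UTC ISO timestamp
print(f"{host}_TP4100_{detail}_{timestamp}.csv")
# -> 192.0.2.10_TP4100_gnss-1_te_2025-01-01T12:34:56.789012.csv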

Source code in opensampl/collect/microchip/tp4100/collect_4100.py
def get_filename(self, detail: Optional[str] = None, extension: str = ".txt"):
    """
    Generate timestamped filename for probe connection

    Format (no detail): {host}_TP4100_{timestamp}.{extension}
    Format (with detail): {host}_TP4100_{detail}_{timestamp}.{extension}

    Args:
        detail: Optional string to include in filename.
        extension: File extension to use (default: '.txt').

    Returns:
        str: Generated filename with timestamp and optional metric info.

    """
    filename = f"{self.config.HOST}_TP4100"
    if detail is not None:
        filename += f"_{detail}"
    cleaned_ext = f".{extension.lstrip('.')}"
    filename += f"_{datetime.now(UTC).replace(tzinfo=None).isoformat()}{cleaned_ext}"
    return filename

get_monitored_channels()

Retrieve list of currently monitored channels from the device.

Returns:

| Type | Description |
| --- | --- |
| set | Set of channel names that are currently being monitored. |

Raises:

| Type | Description |
| --- | --- |
| Exception | If unable to retrieve channel information. |
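
A sketch of using this to see what the device reports before filtering a collection (placeholder host; credentials as above):

# Sketch: list the channels the device reports as being monitored.
# Host is a placeholder; login happens in the constructor.
from opensampl.collect.microchip.tp4100.collect_4100 import TP4100Collector

collector = TP4100Collector(host="192.0.2.10")
try:
    print(sorted(collector.get_monitored_channels()))
finally:
    collector.session.close()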

Source code in opensampl/collect/microchip/tp4100/collect_4100.py
def get_monitored_channels(self):
    """
    Retrieve list of currently monitored channels from the device.

    Returns:
        set: Set of channel names that are currently being monitored.

    Raises:
        Exception: If unable to retrieve channel information.

    """
    channel_url = f"{self.config.url}/channels_thresholdValue"
    data = {"tgrp": -1}
    try:
        resp = self.session.post(channel_url, data=data, headers=self.headers)
        resp.raise_for_status()

        channel_data = resp.json()

        if self.save_full_status:
            filename = self.get_filename(detail="channelStatus", extension=".json")
            new_file = self.output_dir / filename
            self.output_dir.mkdir(parents=True, exist_ok=True)
            new_file.write_text(resp.text)

        return {
            x.get("monitorChannelString")
            for x in channel_data
            if x.get("monitorChStatusString", "").lower() in ("monitoring", "ok")
        }
    except Exception as e:
        logger.exception(f"Error trying to get channel information from {channel_url}; {e}")
        raise

login()

Authenticate with the TP4100 device web interface.

Raises:

| Type | Description |
| --- | --- |
| Exception | If login fails or connection issues occur. |
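
Note that the session is created with certificate verification disabled (session.verify = False), so urllib3 emits an InsecureRequestWarning for every request, starting with this login. Silencing it is plain urllib3 usage, not something this module does itself:

# Optional: suppress the InsecureRequestWarning caused by verify=False.
# Standard urllib3 call, independent of collect_4100.
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)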

Source code in opensampl/collect/microchip/tp4100/collect_4100.py
def login(self):
    """
    Authenticate with the TP4100 device web interface.

    Raises:
        Exception: If login fails or connection issues occur.

    """
    login_url = f"{self.config.url}/login"
    login_data = {"txtuname": self.config.USERNAME, "txtpasswd": self.config.PASSWORD, "action": "applylogin"}
    try:
        resp = self.session.post(login_url, data=login_data, headers=self.headers)
        resp.raise_for_status()
    except Exception as e:
        logger.exception(f"Error trying to login to {login_url}; {e}")
        raise

main(host, port=443, output_dir='./output', duration=600, channels=None, metrics=None, method='chart_data', save_full_status=False)

Collect time data from Microchip TimeProvider 4100 devices.

This tool connects to TP4100 devices via their web interface and collects performance metrics and time data.

Source code in opensampl/collect/microchip/tp4100/collect_4100.py
def main(
    host: str,
    port: int = 443,
    output_dir: str = "./output",
    duration: int = 600,
    channels: Optional[list[str]] = None,
    metrics: Optional[list[str]] = None,
    method: Literal["chart_data", "download_file"] = "chart_data",
    save_full_status: bool = False,
):
    """
    Collect time data from Microchip TimeProvider 4100 devices.

    This tool connects to TP4100 devices via their web interface and collects
    performance metrics and time data.
    """
    collector = TP4100Collector(
        host=host,
        port=port,
        output_dir=output_dir,
        duration=duration,
        channels=channels,
        metrics=metrics,
        method=method,
        save_full_status=save_full_status,
    )

    try:
        collector.collect_readings()
        logger.info("Data collection completed successfully")
    except Exception as e:
        logger.debug(f"{e}", exc_info=True)
        logger.error(f"Collection failed: {e}")
        raise
    finally:
        collector.session.close()