Skip to content

opensampl.collect.microchip.twst.readings

Status and readings collector for Microchip TWST ATS6502 modems.

This module provides functionality to collect status readings and measurements from ATS6502 modems over time.

ModemStatusReader

Bases: ModemReader

Reader for collecting status readings from ATS6502 modems.

Provides functionality to connect to an ATS6502 modem and collect status readings over a specified duration.

Source code in opensampl/collect/microchip/twst/readings.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class ModemStatusReader(ModemReader):
    """
    Reader for collecting status readings from ATS6502 modems.

    Connects to an ATS6502 modem, streams status lines for a fixed duration
    and stores every parsed reading that passes the key filter in
    ``self.readings`` as ``(timestamp, definition, value)`` tuples.
    """

    def __init__(self, host: str, duration: int = 60, keys: Optional[list[str]] = None, port: int = 1900):
        """
        Initialize ModemStatusReader.

        Args:
            host: IP address or hostname of the ATS6502 modem.
            duration: Duration in seconds to collect readings.
            keys: List of key suffixes to filter readings (default: None for all readings).
            port: Port to connect to for status readings (default: 1900).

        """
        self.duration = duration
        self.keys = keys
        # Raw lines are handed from reader_task to processor_task through this queue.
        self.queue = asyncio.Queue()
        # Readings that passed the filter, in arrival order.
        self.readings = []
        super().__init__(host=host, port=port)

    @require_conn
    async def reader_task(self) -> None:
        """
        Task to continuously read lines from the modem.

        Reads lines from the connection and queues them for processing. The
        task returns once the stream yields an empty line, and otherwise runs
        until it is cancelled.
        """
        while True:
            line = await self.reader.readline()
            if not line:
                # Assumes self.reader follows the asyncio stream contract, where an
                # empty result means EOF. Looping on it would spin without ever
                # yielding to the event loop, so stop reading instead.
                break
            await self.queue.put(line)

    def parse_line(self, line: str) -> Optional[tuple[str, str, str]]:
        """
        Parse a line of modem output.

        A line is expected to look like ``<timestamp> <definition>=<value>``.

        Args:
            line: Raw line from modem output.

        Returns:
            Tuple of (timestamp, definition, value) if parsing succeeds, None otherwise.

        """
        try:
            timestamp, reading = line.strip().split(" ", 1)
            # Split on the first "=" only so values that contain "=" are kept intact.
            definition, value = reading.split("=", 1)
        except ValueError:
            # Raised by the unpacking when the space or the "=" separator is missing.
            return None
        return timestamp, definition, value

    def should_keep(self, definition: str) -> bool:
        """
        Determine if a reading should be kept based on its definition.

        Args:
            definition: Reading definition string.

        Returns:
            True if no keys are configured, or if the definition ends with any
            of the configured key suffixes.

        """
        if self.keys is None:
            return True  # Collect all readings when no keys specified
        return any(definition.endswith(suffix) for suffix in self.keys)

    async def processor_task(self) -> None:
        """
        Task to process queued lines and filter readings.

        Takes lines from the queue, parses them, and appends the readings that
        pass ``should_keep`` to ``self.readings``. Runs until cancelled.
        """
        while True:
            line = await self.queue.get()
            try:
                parsed = self.parse_line(line)
                # parsed is (timestamp, definition, value); index 1 is the definition.
                if parsed and self.should_keep(parsed[1]):
                    self.readings.append(parsed)
            finally:
                # Mark the item as handled so queue.join() can be used to wait for a drain.
                self.queue.task_done()

    async def collect_readings(self) -> None:
        """
        Collect readings from the modem for the specified duration.

        Starts reader and processor tasks, collects data for the configured
        duration, then cancels the tasks.
        """
        async with self.connect():
            read_coroutine = asyncio.create_task(self.reader_task())
            process_coroutine = asyncio.create_task(self.processor_task())

            try:
                await asyncio.sleep(self.duration)
            finally:
                # Cancel tasks and wait for them to complete
                read_coroutine.cancel()
                process_coroutine.cancel()

                # return_exceptions=True keeps the resulting CancelledError (or any
                # task failure) from propagating out of the cleanup.
                await asyncio.gather(read_coroutine, process_coroutine, return_exceptions=True)

__init__(host, duration=60, keys=None, port=1900)

Initialize ModemStatusReader.

Parameters:

Name Type Description Default
host str

IP address or hostname of the ATS6502 modem.

required
duration int

Duration in seconds to collect readings.

60
keys Optional[list[str]]

List of key suffixes to filter readings (default: None for all readings).

None
port int

Port to connect to for status readings (default: 1900).

1900
Source code in opensampl/collect/microchip/twst/readings.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, host: str, duration: int = 60, keys: Optional[list[str]] = None, port: int = 1900):
    """
    Set up a status reader for one ATS6502 modem.

    Args:
        host: IP address or hostname of the ATS6502 modem.
        duration: How long, in seconds, readings are collected.
        keys: Key suffixes used to filter readings; None keeps every reading.
        port: Port used for the status-reading connection (default 1900).

    """
    # Collection state: kept readings and the queue of raw lines awaiting parsing.
    self.readings = []
    self.queue = asyncio.Queue()

    # Caller-supplied collection settings.
    self.keys = keys
    self.duration = duration

    # Connection details are handled by the base reader.
    super().__init__(host=host, port=port)

collect_readings() async

Collect readings from the modem for the specified duration.

Starts reader and processor tasks, collects data for the configured duration, then cancels the tasks.

Source code in opensampl/collect/microchip/twst/readings.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
async def collect_readings(self):
    """
    Collect readings from the modem for the specified duration.

    Starts reader and processor tasks, collects data for the configured
    duration, then cancels the tasks.
    """
    async with self.connect():
        read_coroutine = asyncio.create_task(self.reader_task())
        process_coroutine = asyncio.create_task(self.processor_task())

        try:
            await asyncio.sleep(self.duration)
        finally:
            # Cancel tasks and wait for them to complete
            read_coroutine.cancel()
            process_coroutine.cancel()

            # Wait for tasks to handle cancellation
            await asyncio.gather(read_coroutine, process_coroutine, return_exceptions=True)

parse_line(line)

Parse a line of modem output.

Parameters:

Name Type Description Default
line str

Raw line from modem output.

required

Returns:

Type Description

Tuple of (timestamp, definition, value) if parsing succeeds, None otherwise.

Source code in opensampl/collect/microchip/twst/readings.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def parse_line(self, line: str) -> Optional[tuple[str, str, str]]:
    """
    Parse a line of modem output.

    A line is expected to look like ``<timestamp> <definition>=<value>``.

    Args:
        line: Raw line from modem output.

    Returns:
        Tuple of (timestamp, definition, value) if parsing succeeds, None otherwise.

    """
    try:
        timestamp, reading = line.strip().split(" ", 1)
        # Split on the first "=" only so values that contain "=" are kept intact.
        definition, value = reading.split("=", 1)
    except ValueError:
        # Raised by the unpacking when the space or the "=" separator is missing.
        return None
    return timestamp, definition, value

processor_task() async

Task to process queued lines and filter readings.

Processes lines from the queue, parses them, and stores relevant readings.

Source code in opensampl/collect/microchip/twst/readings.py
85
86
87
88
89
90
91
92
93
94
95
96
97
async def processor_task(self) -> None:
    """
    Task to process queued lines and filter readings.

    Takes lines from ``self.queue``, parses them with ``parse_line`` and
    appends the readings accepted by ``should_keep`` to ``self.readings``.
    Runs until cancelled.
    """
    while True:
        line = await self.queue.get()
        try:
            parsed = self.parse_line(line)
            # parsed is (timestamp, definition, value); index 1 is the definition.
            if parsed and self.should_keep(parsed[1]):
                self.readings.append(parsed)
        finally:
            # Mark the item as handled so queue.join() can be used to wait for a drain.
            self.queue.task_done()

reader_task() async

Task to continuously read lines from the modem.

Reads lines from the telnet connection and queues them for processing.

Source code in opensampl/collect/microchip/twst/readings.py
39
40
41
42
43
44
45
46
47
48
49
@require_conn
async def reader_task(self) -> None:
    """
    Task to continuously read lines from the modem.

    Reads lines from the connection and queues them for processing. The
    task returns once the stream yields an empty line, and otherwise runs
    until it is cancelled.
    """
    while True:
        line = await self.reader.readline()
        if not line:
            # Assumes self.reader follows the asyncio stream contract, where an
            # empty result means EOF. Looping on it would spin without ever
            # yielding to the event loop, so stop reading instead.
            break
        await self.queue.put(line)

should_keep(definition)

Determine if a reading should be kept based on its definition.

Parameters:

Name Type Description Default
definition str

Reading definition string.

required

Returns:

Type Description
bool

True if no keys are configured, or if the definition ends with any of the configured key suffixes.

Source code in opensampl/collect/microchip/twst/readings.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def should_keep(self, definition: str) -> bool:
    """
    Decide whether a reading passes the configured key filter.

    Args:
        definition: Reading definition string.

    Returns:
        True when no keys are configured, or when the definition ends with
        at least one of the configured key suffixes.

    """
    suffixes = self.keys
    # With no filter configured, every reading is collected.
    if suffixes is None:
        return True
    # str.endswith accepts a tuple and matches if any entry is a suffix.
    return definition.endswith(tuple(suffixes))