Skip to content

opensampl.collect.microchip.twst.readings

Status and readings collector for Microchip TWST ATS6502 modems.

This module provides functionality to collect status readings and measurements from ATS6502 modems over time.

ModemStatusReader

Bases: ModemReader

Reader for collecting status readings from ATS6502 modems.

Provides functionality to connect to an ATS6502 modem and collect status readings over a specified duration.

Source code in opensampl/collect/microchip/twst/readings.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
class ModemStatusReader(ModemReader):
    """
    Reader for collecting status readings from ATS6502 modems.

    Provides functionality to connect to an ATS6502 modem and collect
    status readings over a specified duration. A reader task pulls raw lines
    from the connection into a queue; a processor task parses them and stores
    the ones matching the configured key suffixes in ``self.readings``.
    """

    def __init__(self, host: str, duration: int = 60, keys: Optional[list[str]] = None, port: int = 1900):
        """
        Initialize ModemStatusReader.

        Args:
            host: IP address or hostname of the ATS6502 modem.
            duration: Duration in seconds to collect readings.
            keys: List of key suffixes to filter readings (default: None for all readings).
            port: Port to connect to for status readings (default 1900).

        """
        self.duration = duration
        self.keys = keys
        self.queue = asyncio.Queue()
        self.readings = []
        # Flag polled by reader_task; flipped by collect_readings.
        self.continue_reading = False
        super().__init__(host=host, port=port)

    @require_conn
    async def reader_task(self):
        """
        Task to continuously read lines from the modem.

        Reads lines from the connection and queues them for processing until
        ``continue_reading`` is cleared, EOF is seen, or a single read takes
        longer than 5 seconds. A sentinel is always queued on exit so the
        processor knows no more lines will arrive.
        """
        try:
            while self.continue_reading:
                line = await asyncio.wait_for(self.reader.readline(), timeout=5.0)
                if not line:
                    break  # EOF
                await self.queue.put(line)
        except asyncio.TimeoutError:
            # The handler sits outside the loop, so one timeout ends reading.
            logger.debug(f"Timeout waiting for data from {self.host}:{self.port}")
        finally:
            await self.queue.put(SENTINEL)

    def parse_line(self, line: str):
        """
        Parse a line of modem output.

        The expected shape is ``"<timestamp> <definition>=<value>"``. Only the
        first ``=`` separates definition from value, so values that themselves
        contain ``=`` are preserved instead of being rejected.

        Args:
            line: Raw line from modem output.

        Returns:
            Tuple of (timestamp, definition, value) if parsing succeeds, None otherwise.

        """
        try:
            timestamp, reading = line.strip().split(" ", 1)
            # maxsplit=1: a value containing "=" must not make the unpack fail.
            definition, value = reading.split("=", 1)
        except ValueError:
            return None
        else:
            return timestamp, definition, value

    def should_keep(self, definition: str) -> bool:
        """
        Determine if a reading should be kept based on its definition.

        Args:
            definition: Reading definition string.

        Returns:
            True if no keys are configured, or if the definition ends with any
            of the configured key suffixes.

        """
        if self.keys is None:
            return True  # Collect all readings when no keys specified
        return any(definition.endswith(suffix) for suffix in self.keys)

    async def processor_task(self):
        """
        Task to process queued lines and filter readings.

        Processes lines from the queue, parses them, and stores relevant
        readings in ``self.readings``. Stops when the sentinel is received.
        """
        while True:
            line = await self.queue.get()
            try:
                if line == SENTINEL:
                    break
                parsed = self.parse_line(line)
                if parsed:
                    timestamp, definition, value = parsed
                    if self.should_keep(definition):
                        self.readings.append(parsed)
            finally:
                # Always balance the get() so queue.join() can complete.
                self.queue.task_done()

    async def collect_readings(self):
        """
        Collect readings from the modem for the specified duration.

        Starts reader and processor tasks, collects data for the configured
        duration, then cancels the tasks. Readings are appended to
        ``self.readings``; the list is not cleared between calls.
        """
        async with self.connect():
            # Start every collection with a fresh queue. When the reader is
            # cancelled below it still enqueues a sentinel that nobody consumes;
            # reusing that queue would stop the next processor immediately and
            # leave queue.join() waiting forever on unprocessed lines.
            self.queue = asyncio.Queue()
            self.continue_reading = True
            read_coroutine = asyncio.create_task(self.reader_task())
            process_coroutine = asyncio.create_task(self.processor_task())

            try:
                await asyncio.sleep(self.duration)
                self.continue_reading = False
                # Let the processor drain whatever has already been queued.
                await self.queue.join()

            finally:
                # Cancel tasks and wait for them to complete
                read_coroutine.cancel()
                process_coroutine.cancel()

                # Wait for tasks to handle cancellation
                await asyncio.gather(read_coroutine, process_coroutine, return_exceptions=True)

__init__(host, duration=60, keys=None, port=1900)

Initialize ModemStatusReader.

Parameters:

Name Type Description Default
host str

IP address or hostname of the ATS6502 modem.

required
duration int

Duration in seconds to collect readings.

60
keys Optional[list[str]]

List of key suffixes to filter readings (default: None for all readings).

None
port int

Port to connect to for status readings (default 1900).

1900
Source code in opensampl/collect/microchip/twst/readings.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(self, host: str, duration: int = 60, keys: Optional[list[str]] = None, port: int = 1900):
    """
    Set up collection state, then initialize the base reader.

    Args:
        host: IP address or hostname of the ATS6502 modem.
        duration: How long to collect readings, in seconds.
        keys: Key suffixes used to filter readings; None keeps every reading.
        port: Port to connect to for status readings (default 1900).

    """
    # Collection state: parsed readings, the line queue, and the reader's run flag.
    self.readings = []
    self.queue = asyncio.Queue()
    self.continue_reading = False

    # Caller-supplied configuration.
    self.keys = keys
    self.duration = duration

    super().__init__(host=host, port=port)

collect_readings() async

Collect readings from the modem for the specified duration.

Starts reader and processor tasks, collects data for the configured duration, then cancels the tasks.

Source code in opensampl/collect/microchip/twst/readings.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
async def collect_readings(self):
    """
    Collect readings from the modem for the specified duration.

    Starts reader and processor tasks, collects data for the configured
    duration, then cancels the tasks. Each call starts with a fresh queue so
    that leftovers from a previous collection cannot affect this one.
    """
    async with self.connect():
        # When the reader is cancelled below it still enqueues a sentinel that
        # nobody consumes. Reusing that queue would stop the next processor
        # immediately and leave queue.join() waiting forever.
        self.queue = asyncio.Queue()
        self.continue_reading = True
        read_coroutine = asyncio.create_task(self.reader_task())
        process_coroutine = asyncio.create_task(self.processor_task())

        try:
            await asyncio.sleep(self.duration)
            self.continue_reading = False
            # Let the processor drain whatever has already been queued.
            await self.queue.join()

        finally:
            # Cancel tasks and wait for them to complete
            read_coroutine.cancel()
            process_coroutine.cancel()

            # Wait for tasks to handle cancellation
            await asyncio.gather(read_coroutine, process_coroutine, return_exceptions=True)

parse_line(line)

Parse a line of modem output.

Parameters:

Name Type Description Default
line str

Raw line from modem output.

required

Returns:

Type Description

Tuple of (timestamp, definition, value) if parsing succeeds, None otherwise.

Source code in opensampl/collect/microchip/twst/readings.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def parse_line(self, line: str):
    """
    Parse a line of modem output.

    The expected shape is ``"<timestamp> <definition>=<value>"``. Only the
    first ``=`` separates definition from value, so values that themselves
    contain ``=`` are preserved instead of being rejected.

    Args:
        line: Raw line from modem output.

    Returns:
        Tuple of (timestamp, definition, value) if parsing succeeds, None otherwise.

    """
    try:
        timestamp, reading = line.strip().split(" ", 1)
        # maxsplit=1: a value containing "=" must not make the unpack fail.
        definition, value = reading.split("=", 1)
    except ValueError:
        return None
    else:
        return timestamp, definition, value

processor_task() async

Task to process queued lines and filter readings.

Processes lines from the queue, parses them, and stores relevant readings.

Source code in opensampl/collect/microchip/twst/readings.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
async def processor_task(self):
    """
    Consume queued lines until the sentinel arrives.

    Each line is parsed; readings whose definition passes ``should_keep`` are
    appended to ``self.readings``. Every dequeued item is marked done.
    """
    while True:
        item = await self.queue.get()
        try:
            if item == SENTINEL:
                return  # no more lines will arrive
            fields = self.parse_line(item)
            if not fields:
                continue  # unparseable line, skip it
            _timestamp, definition, _value = fields
            if self.should_keep(definition):
                self.readings.append(fields)
        finally:
            # Runs on return/continue as well, balancing the get() above.
            self.queue.task_done()

reader_task() async

Task to continuously read lines from the modem.

Reads lines from the telnet connection and queues them for processing.

Source code in opensampl/collect/microchip/twst/readings.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@require_conn
async def reader_task(self):
    """
    Pump lines from the modem connection into the queue.

    Keeps reading while ``continue_reading`` is set and the connection yields
    non-empty lines. Stops on EOF or when a single read exceeds 5 seconds, and
    always queues the sentinel on the way out.
    """
    try:
        # An empty read means EOF and ends the loop, as does clearing the flag.
        while self.continue_reading and (
            received := await asyncio.wait_for(self.reader.readline(), timeout=5.0)
        ):
            await self.queue.put(received)
    except asyncio.TimeoutError:
        logger.debug(f"Timeout waiting for data from {self.host}:{self.port}")
    finally:
        await self.queue.put(SENTINEL)

should_keep(definition)

Determine if a reading should be kept based on its definition.

Parameters:

Name Type Description Default
definition str

Reading definition string.

required

Returns:

Type Description
bool

True if no keys are configured, or if the reading's definition ends with any of the configured key suffixes

Source code in opensampl/collect/microchip/twst/readings.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def should_keep(self, definition: str) -> bool:
    """
    Decide whether a reading passes the configured key filter.

    Args:
        definition: Reading definition string.

    Returns:
        True when no keys are configured, or when the definition ends with
        at least one of the configured key suffixes.

    """
    suffixes = self.keys
    if suffixes is None:
        # No filter configured: every reading is kept.
        return True
    # str.endswith accepts a tuple and checks each suffix in order.
    return definition.endswith(tuple(suffixes))