Skip to content

opensampl.collect.microchip.twst.context

Context reader for Microchip TWST ATS6502 modems.

This module provides functionality to read context information from ATS6502 modems, including local and remote station information.

ModemContextReader

Bases: ModemReader

Reader for ATS6502 modem context information.

Provides methods to connect to an ATS6502 modem and retrieve context information including local station details and remote station tracking data.

Source code in opensampl/collect/microchip/twst/context.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class ModemContextReader(ModemReader):
    """
    Reader for ATS6502 modem context information.

    Provides methods to connect to an ATS6502 modem and retrieve context information
    including local station details and remote station tracking data.
    """

    def __init__(self, host: str, prompt: str = "ATS 6502>", port: int = 1700):
        """
        Initialize ModemContextReader.

        Args:
            host: IP address or hostname of the ATS6502 modem.
            prompt: Command prompt string expected from the modem.
            port: what port to connect to for commands (default 1700).

        """
        self.result = SimpleNamespace()
        self.prompt = prompt
        super().__init__(host=host, port=port)

    @staticmethod
    def finished_ok(line: str) -> bool:
        """
        Check if a command completed successfully.

        Args:
            line: Response line from the modem.

        Returns:
            True if the line indicates successful completion.

        """
        return re.match(r"^\[OK\]", line) is not None

    @staticmethod
    def finished_error(line: str) -> tuple[bool, Optional[str]]:
        """
        Check if a command completed with an error.

        Args:
            line: Response line from the modem.

        Returns:
            Tuple of (is_error, error_message).

        """
        error = re.match(r"^\[ERROR\]", line) is not None
        if error:
            return error, line
        return error, None

    def bracket_to_dict(self, raw_text: str):
        """
        Convert bracketed text format to dictionary.

        Args:
            raw_text: Raw text with [SECTION] headers.

        Returns:
            Dictionary representation of the structured data.

        """
        yaml_unbrack = re.sub(
            r"^\s*\[(\w+)\]",
            lambda m: m.group(0).replace(f"[{m.group(1)}]", f"{m.group(1)}:"),
            raw_text,
            flags=re.MULTILINE,
        )
        yaml_ready = textwrap.dedent(yaml_unbrack)
        return yaml.safe_load(yaml_ready)

    @require_conn
    async def send_cmd(self, cmd: str):
        """
        Send a command to the modem and return parsed response.

        Args:
            cmd: Command string to send.

        Returns:
            Dictionary containing the parsed response.

        """
        logger.debug(f"Sending command {cmd=}")

        self.writer.write(cmd + "\n")
        await self.writer.drain()
        response = await self.read_until_exit()

        return self.bracket_to_dict(response)

    @require_conn
    async def read_until_exit(self):
        """
        Read response lines until completion or error.

        Returns:
            Accumulated response text.

        Raises:
            RuntimeError: If an error response is received.

        """
        buffer = ""
        while True:
            chunk = await self.reader.readline()  # read one line at a time
            logger.trace(chunk)  # live print

            if self.finished_ok(chunk):
                break
            err, msg = self.finished_error(chunk)
            if err:
                raise RuntimeError(msg)
            if self.prompt not in chunk:
                buffer += chunk

        return buffer

    async def get_context(self):
        """
        Retrieve context information from the modem.

        Connects to the modem and retrieves local station information
        and remote station tracking data.
        """
        async with self.connect():
            self.result.timestamp = datetime.now(tz=timezone.utc).isoformat() + "Z"
            self.result.local = SimpleNamespace()
            self.result.remotes = {}

            show_result = await self.send_cmd("show")

            self.result.local.sid = show_result.get("settings").get("modem").get("sid")
            self.result.local.prn = show_result.get("status").get("modem").get("tx").get("prn")
            self.result.local.ip = show_result.get("network").get("static").get("ip")
            self.result.local.lat = (
                show_result.get("status").get("modem").get("position").get("station").get("latitude")
            )
            self.result.local.lon = (
                show_result.get("status").get("modem").get("position").get("station").get("longitude")
            )

            rx_status = show_result.get("status").get("modem").get("rx").get("chan")
            for chan_num, block in rx_status.items():
                sid = block.get("remote").get("sid")
                prn = block.get("tracking").get("prn")
                lat = block.get("remote").get("position").get("station").get("latitude")
                lon = block.get("remote").get("position").get("station").get("longitude")

                if not sid:
                    continue

                self.result.remotes[chan_num] = {"rx_channel": chan_num, "sid": sid, "prn": prn, "lat": lat, "lon": lon}

    def get_result_as_yaml_comment(self):
        """
        Get results formatted as YAML comments.

        Returns:
            String containing results formatted as commented YAML.

        """
        yaml_text = yaml.dump(self.result_dict(), sort_keys=False)
        return textwrap.indent(yaml_text, prefix="# ")

    def result_dict(self):
        """
        Convert result SimpleNamespace to dictionary.

        Returns:
            Dictionary representation of the results.

        """

        def namespace_to_dict(ns: Any) -> Any:
            if isinstance(ns, SimpleNamespace):
                return {key: namespace_to_dict(value) for key, value in vars(ns).items()}
            if isinstance(ns, list):
                return [namespace_to_dict(item) for item in ns]
            if isinstance(ns, dict):
                return {key: namespace_to_dict(value) for key, value in ns.items()}
            return ns

        return namespace_to_dict(self.result)

__init__(host, prompt='ATS 6502>', port=1700)

Initialize ModemContextReader.

Parameters:

Name Type Description Default
host str

IP address or hostname of the ATS6502 modem.

required
prompt str

Command prompt string expected from the modem.

'ATS 6502>'
port int

what port to connect to for commands (default 1700).

1700
Source code in opensampl/collect/microchip/twst/context.py
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, host: str, prompt: str = "ATS 6502>", port: int = 1700):
    """
    Initialize ModemContextReader.

    Args:
        host: IP address or hostname of the ATS6502 modem.
        prompt: Command prompt string expected from the modem.
        port: what port to connect to for commands (default 1700).

    """
    self.result = SimpleNamespace()
    self.prompt = prompt
    super().__init__(host=host, port=port)

bracket_to_dict(raw_text)

Convert bracketed text format to dictionary.

Parameters:

Name Type Description Default
raw_text str

Raw text with [SECTION] headers.

required

Returns:

Type Description

Dictionary representation of the structured data.

Source code in opensampl/collect/microchip/twst/context.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def bracket_to_dict(self, raw_text: str):
    """
    Convert bracketed text format to dictionary.

    Args:
        raw_text: Raw text with [SECTION] headers.

    Returns:
        Dictionary representation of the structured data.

    """
    yaml_unbrack = re.sub(
        r"^\s*\[(\w+)\]",
        lambda m: m.group(0).replace(f"[{m.group(1)}]", f"{m.group(1)}:"),
        raw_text,
        flags=re.MULTILINE,
    )
    yaml_ready = textwrap.dedent(yaml_unbrack)
    return yaml.safe_load(yaml_ready)

finished_error(line) staticmethod

Check if a command completed with an error.

Parameters:

Name Type Description Default
line str

Response line from the modem.

required

Returns:

Type Description
tuple[bool, Optional[str]]

Tuple of (is_error, error_message).

Source code in opensampl/collect/microchip/twst/context.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@staticmethod
def finished_error(line: str) -> tuple[bool, Optional[str]]:
    """
    Check if a command completed with an error.

    Args:
        line: Response line from the modem.

    Returns:
        Tuple of (is_error, error_message).

    """
    error = re.match(r"^\[ERROR\]", line) is not None
    if error:
        return error, line
    return error, None

finished_ok(line) staticmethod

Check if a command completed successfully.

Parameters:

Name Type Description Default
line str

Response line from the modem.

required

Returns:

Type Description
bool

True if the line indicates successful completion.

Source code in opensampl/collect/microchip/twst/context.py
42
43
44
45
46
47
48
49
50
51
52
53
54
@staticmethod
def finished_ok(line: str) -> bool:
    """
    Check if a command completed successfully.

    Args:
        line: Response line from the modem.

    Returns:
        True if the line indicates successful completion.

    """
    return re.match(r"^\[OK\]", line) is not None

get_context() async

Retrieve context information from the modem.

Connects to the modem and retrieves local station information and remote station tracking data.

Source code in opensampl/collect/microchip/twst/context.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
async def get_context(self):
    """
    Retrieve context information from the modem.

    Connects to the modem and retrieves local station information
    and remote station tracking data.
    """
    async with self.connect():
        self.result.timestamp = datetime.now(tz=timezone.utc).isoformat() + "Z"
        self.result.local = SimpleNamespace()
        self.result.remotes = {}

        show_result = await self.send_cmd("show")

        self.result.local.sid = show_result.get("settings").get("modem").get("sid")
        self.result.local.prn = show_result.get("status").get("modem").get("tx").get("prn")
        self.result.local.ip = show_result.get("network").get("static").get("ip")
        self.result.local.lat = (
            show_result.get("status").get("modem").get("position").get("station").get("latitude")
        )
        self.result.local.lon = (
            show_result.get("status").get("modem").get("position").get("station").get("longitude")
        )

        rx_status = show_result.get("status").get("modem").get("rx").get("chan")
        for chan_num, block in rx_status.items():
            sid = block.get("remote").get("sid")
            prn = block.get("tracking").get("prn")
            lat = block.get("remote").get("position").get("station").get("latitude")
            lon = block.get("remote").get("position").get("station").get("longitude")

            if not sid:
                continue

            self.result.remotes[chan_num] = {"rx_channel": chan_num, "sid": sid, "prn": prn, "lat": lat, "lon": lon}

get_result_as_yaml_comment()

Get results formatted as YAML comments.

Returns:

Type Description

String containing results formatted as commented YAML.

Source code in opensampl/collect/microchip/twst/context.py
176
177
178
179
180
181
182
183
184
185
def get_result_as_yaml_comment(self):
    """
    Get results formatted as YAML comments.

    Returns:
        String containing results formatted as commented YAML.

    """
    yaml_text = yaml.dump(self.result_dict(), sort_keys=False)
    return textwrap.indent(yaml_text, prefix="# ")

read_until_exit() async

Read response lines until completion or error.

Returns:

Type Description

Accumulated response text.

Raises:

Type Description
RuntimeError

If an error response is received.

Source code in opensampl/collect/microchip/twst/context.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@require_conn
async def read_until_exit(self):
    """
    Read response lines until completion or error.

    Returns:
        Accumulated response text.

    Raises:
        RuntimeError: If an error response is received.

    """
    buffer = ""
    while True:
        chunk = await self.reader.readline()  # read one line at a time
        logger.trace(chunk)  # live print

        if self.finished_ok(chunk):
            break
        err, msg = self.finished_error(chunk)
        if err:
            raise RuntimeError(msg)
        if self.prompt not in chunk:
            buffer += chunk

    return buffer

result_dict()

Convert result SimpleNamespace to dictionary.

Returns:

Type Description

Dictionary representation of the results.

Source code in opensampl/collect/microchip/twst/context.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def result_dict(self):
    """
    Convert result SimpleNamespace to dictionary.

    Returns:
        Dictionary representation of the results.

    """

    def namespace_to_dict(ns: Any) -> Any:
        if isinstance(ns, SimpleNamespace):
            return {key: namespace_to_dict(value) for key, value in vars(ns).items()}
        if isinstance(ns, list):
            return [namespace_to_dict(item) for item in ns]
        if isinstance(ns, dict):
            return {key: namespace_to_dict(value) for key, value in ns.items()}
        return ns

    return namespace_to_dict(self.result)

send_cmd(cmd) async

Send a command to the modem and return parsed response.

Parameters:

Name Type Description Default
cmd str

Command string to send.

required

Returns:

Type Description

Dictionary containing the parsed response.

Source code in opensampl/collect/microchip/twst/context.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@require_conn
async def send_cmd(self, cmd: str):
    """
    Send a command to the modem and return parsed response.

    Args:
        cmd: Command string to send.

    Returns:
        Dictionary containing the parsed response.

    """
    logger.debug(f"Sending command {cmd=}")

    self.writer.write(cmd + "\n")
    await self.writer.drain()
    response = await self.read_until_exit()

    return self.bracket_to_dict(response)