Skip to content

Commit

Permalink
Improve method names and input validation
Browse files Browse the repository at this point in the history
  • Loading branch information
clssn committed Mar 10, 2024
1 parent 81dedfc commit 6a9d560
Showing 1 changed file with 24 additions and 17 deletions.
41 changes: 24 additions & 17 deletions src/numato_gpio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ def id(self):
@id.setter
def id(self, new_id):
"""Re-program the device id to the value in new_id."""
self._query(f"id set {new_id:08x}")
self._read_response("")
self._query_string(f"id set {new_id:08x}")
self._id = new_id

@property
Expand Down Expand Up @@ -291,7 +290,6 @@ def notify(self, enable):
expected_response = f"gpio notify {'enabled' if enable else 'disabled'}"

with self._rw_lock:

self._query(query)
self._read_response(expected_response)

Expand Down Expand Up @@ -391,7 +389,7 @@ def _query(self, query):
with self._rw_lock:
self._write(f"{query}\r".encode())
try:
self._read_string(query)
self._read_expected_string(query)
except NumatoGpioError as err:
raise NumatoGpioError(
f"Query {repr(query)} returned unexpected echo {repr(str(err))}"
Expand All @@ -405,15 +403,22 @@ def _write(self, query):
self._ser.close()
raise NumatoGpioError("Serial communication failure") from err

def _read_string(self, expected):
string = self._read(len(expected.encode()))
def _read_expected_string(self, expected: str) -> None:
"""Consume an exact string from the input buffer.
Reads only the amount of characters and ensures that the string matches
expecations. Otherwise raises an error.
Doesn't return the string as the user's already got it.
"""
string = self._read_from_buf(len(expected.encode()))
# Some devices respond with mixed uppercasing,
# lowering the response should match expected
if string.lower() != expected:
raise NumatoGpioError(string)

def _query_string(self, query: str) -> str:
"""Send a query and returns the answer as a string.
"""Send a query and returns the response up to the prompt as a string.
The answer excludes the end-of-line and prompt characters.
"""
Expand All @@ -424,35 +429,37 @@ def _query_string(self, query: str) -> str:

def _read_int(self, query, bits):
with self._rw_lock:
self._query(query)
response = self._read(bits // 4)
response = self._query_string(query)
try:
if len(response) != bits // 4:
raise NumatoGpioError(
f"Unexpected string {repr(str(response))} "
f"after successful query {repr(query)}"
)
val = int(response, 16)
self._read_string(">")
except ValueError as err:
raise NumatoGpioError(
f"Query '{repr(query)}' returned unexpected result "
f"{repr(response)}. Expected a {bits} bit integer in "
"hexadecimal notation."
) from err
except NumatoGpioError as err:
raise NumatoGpioError(
f"Unexpected string {repr(str(err))} after successful query "
f"{repr(query)}"
) from err
return val

def _read_response(self, expected: Optional[str] = None):
"""Read a response up to the terminating prompt.
Consume the prompt (>) character from the input buffer but do not return it.
"""
response = ""
while (read_byte := self._read(1)) != ">":
while (read_byte := self._read_from_buf(1)) != ">":
response += read_byte
if expected and expected.lower() != response.lower():
raise NumatoGpioError(
f"Expected response {repr(expected)}, got {repr(response)}"
)
return response

def _read(self, num):
def _read_from_buf(self, num):
self._can_read.acquire()
while len(self._buf) < num:
self._can_read.wait()
Expand Down

0 comments on commit 6a9d560

Please sign in to comment.