diff --git a/robot/board.py b/robot/board.py index 2bfa218..d2da98f 100644 --- a/robot/board.py +++ b/robot/board.py @@ -3,7 +3,22 @@ import socket import time from pathlib import Path -from typing import Iterable, Mapping, TypeVar, Union +from typing import ( # noqa: F401 + Any, + Callable, + Iterable, + Iterator, + Mapping, + Optional, + Sequence, + Tuple, + TypeVar, + Union, + overload, +) + +T = TypeVar('T') +_Message = Mapping[str, Any] LOGGER = logging.getLogger(__name__) @@ -16,17 +31,17 @@ class Board: def __init__(self, socket_path: Union[Path, str]) -> None: self.socket_path = Path(socket_path) - self.socket = None + self.socket = None # type: Optional[socket.socket] self.data = b'' self._connect() @property - def serial(self): + def serial(self) -> str: """Serial number for the board.""" return self.socket_path.stem - def _greeting_response(self, data): + def _greeting_response(self, data: _Message) -> None: """ Handle the response to the greeting command. @@ -34,7 +49,7 @@ def _greeting_response(self, data): """ pass - def _connect(self): + def _connect(self) -> None: """ Connect or reconnect to a socket. @@ -63,7 +78,7 @@ def _get_lc_error(self) -> str: path=self.socket_path, ) - def _socket_with_single_retry(self, handler): + def _socket_with_single_retry(self, handler: Callable[[], T]) -> T: retryable_errors = ( socket.timeout, BrokenPipeError, @@ -99,7 +114,7 @@ def _socket_with_single_retry(self, handler): raise original_exception - def _send(self, message, should_retry=True): + def _send(self, message: _Message, should_retry: bool=True) -> None: """ Send a message to robotd. @@ -109,7 +124,7 @@ def _send(self, message, should_retry=True): data = (json.dumps(message) + '\n').encode('utf-8') - def sendall(): + def sendall() -> None: self.socket.sendall(data) if should_retry: @@ -117,13 +132,13 @@ def sendall(): else: return sendall() - def _recv_from_socket(self, size): + def _recv_from_socket(self, size: int) -> bytes: data = self.socket.recv(size) if data == b'': raise BrokenPipeError() return data - def _receive(self, should_retry=True): + def _receive(self, should_retry: bool=True) -> _Message: """ Receive a message from robotd. """ @@ -142,21 +157,25 @@ def _receive(self, should_retry=True): return json.loads(line.decode('utf-8')) - def _send_and_receive(self, message, should_retry=True): + def _send_and_receive( + self, + message: _Message, + should_retry: bool=True, + ) -> _Message: """ Send a message to robotd and wait for a response. """ self._send(message, should_retry) return self._receive(should_retry) - def close(self): + def close(self) -> None: """ Close the the connection to the underlying robotd board. """ self.socket.detach() - def __str__(self): - return "{} - {}".format(self.__name__, self.serial) + def __str__(self) -> str: + return "{} - {}".format(self.__class__.__name__, self.serial) __del__ = close diff --git a/robot/camera.py b/robot/camera.py index bcf3cc5..be5c18d 100644 --- a/robot/camera.py +++ b/robot/camera.py @@ -1,6 +1,6 @@ import socket import time -from typing import List, overload +from typing import Any, List, Mapping, Sequence, overload from robot.board import Board from robot.markers import Marker @@ -41,7 +41,7 @@ class Camera(Board): """ @staticmethod - def _see_to_results(data) -> ResultList: + def _see_to_results(data: Mapping[str, Sequence[Mapping[str, Any]]]) -> ResultList: """ Convert the data from ``robotd`` into a sorted of ``Marker``s. diff --git a/robot/markers.py b/robot/markers.py index 40f5938..a7a3086 100644 --- a/robot/markers.py +++ b/robot/markers.py @@ -1,6 +1,6 @@ import math import warnings -from typing import List, NamedTuple, NewType, Tuple +from typing import Any, List, Mapping, NamedTuple, NewType, Tuple from robot.game_specific import TOKEN, WALL @@ -102,7 +102,7 @@ def distance_metres(self) -> Metres: class Marker: """A marker captured from a webcam image.""" - def __init__(self, data): + def __init__(self, data: Mapping[str, Any]) -> None: self._raw_data = data @property diff --git a/robot/motor.py b/robot/motor.py index 0ef43cd..de8a184 100644 --- a/robot/motor.py +++ b/robot/motor.py @@ -1,5 +1,11 @@ +from typing import Union + from robot.board import Board +Power = Union[float, str] +_VoltageString = Union[float, str] + + # BRAKE is set to 0 so setting the motors to 0 has exactly the same affect as # setting the motors to BRAKE BRAKE = 0 @@ -10,73 +16,79 @@ class MotorBoard(Board): """A motor board with two motor outputs.""" @staticmethod - def _string_to_power(voltage): + def _string_to_power(voltage: _VoltageString) -> Power: """ Converts a string to a Voltage value. :param voltage: :return: """ - if voltage == 'coast': - return COAST - elif voltage == 'brake': - return BRAKE - elif -1 <= voltage <= 1: - return voltage - else: - raise ValueError( - "Incorrect voltage value, valid values: between -1 and 1, " - "'coast', or 'brake'", - ) + if isinstance(voltage, str): + if voltage == 'coast': + return COAST + elif voltage == 'brake': + return BRAKE + + if isinstance(voltage, float): + if -1 <= voltage <= 1: + return voltage + + raise ValueError( + "Incorrect voltage value, valid values: between -1 and 1, " + "'coast', or 'brake'", + ) @staticmethod - def _power_to_string(voltage): + def _power_to_string(voltage: Power) -> _VoltageString: """ Converts a voltage value to the equivalent API value. This is the reverse of inverse of ``MotorBoard.string_to_voltage``. """ - if voltage is COAST: - return 'coast' - # Explicit or implicit stopping - elif voltage is BRAKE or voltage == 0: - return 'brake' - elif -1 <= voltage <= 1: - return voltage - else: - raise ValueError( - "Incorrect voltage value, valid values: between -1 and 1, " - "robot.COAST, or robot.BRAKE", - ) + if isinstance(voltage, str): + if voltage is COAST: + return 'coast' + + if isinstance(voltage, float): + # Explicit or implicit stopping + if voltage is BRAKE or voltage == 0: + return 'brake' + elif -1 <= voltage <= 1: + return voltage + + raise ValueError( + "Incorrect voltage value, valid values: between -1 and 1, " + "robot.COAST, or robot.BRAKE", + ) @property - def m0(self) -> float: + def m0(self) -> Power: """ :return: The value of motor output 0. """ return self._get_status("m0") @m0.setter - def m0(self, power: float): + def m0(self, power: Power) -> None: self._update_motor("m0", power) @property - def m1(self) -> float: + def m1(self) -> Power: """ :return: The value of motor output 1. """ return self._get_status("m1") @m1.setter - def m1(self, power: float): + def m1(self, power: Power) -> None: self._update_motor("m1", power) - def _get_status(self, motor_id: str): + def _get_status(self, motor_id: str) -> Power: return self._string_to_power( self._send_and_receive({})[motor_id], ) - def _update_motor(self, motor_id: str, voltage: float): + def _update_motor(self, motor_id: str, voltage: Power) -> None: """ Set the value of a motor output. diff --git a/robot/power.py b/robot/power.py index 0876be2..fc8c0c5 100644 --- a/robot/power.py +++ b/robot/power.py @@ -20,20 +20,20 @@ class PowerBoard(Board): 'uc': 523, } - def power_on(self): + def power_on(self) -> None: """ Turn on power to all power board outputs. """ self._send_and_receive({'power': True}) - def power_off(self): + def power_off(self) -> None: """ Turn off power to all power board outputs. """ self._send_and_receive({'power': False}) - def set_start_led(self, value: bool): + def set_start_led(self, value: bool) -> None: """Set the state of the start LED.""" self._send_and_receive({'start-led': value}) @@ -45,7 +45,7 @@ def start_button_pressed(self) -> bool: status = self._send_and_receive({}) return status["start-button"] - def wait_start(self): + def wait_start(self) -> None: """ Block until the start button is pressed. """ @@ -60,7 +60,7 @@ def wait_start(self): self.set_start_led(False) LOGGER.info("Starting user code.") - def buzz(self, duration, *, note=None, frequency=None): + def buzz(self, duration: float, *, note: str=None, frequency: int=None) -> None: """Enqueue a note to be played by the buzzer on the power board.""" if note is None and frequency is None: raise ValueError("Either note or frequency must be provided") diff --git a/robot/robot.py b/robot/robot.py index 3238bba..d743c4e 100644 --- a/robot/robot.py +++ b/robot/robot.py @@ -1,6 +1,6 @@ import logging from pathlib import Path -from typing import List, Set, Type, Union # noqa: F401 +from typing import Any, List, Set, Type, Union # noqa: F401 from robot.board import BoardList, TBoard from robot.camera import Camera @@ -59,7 +59,7 @@ def __init__( if wait_for_start_button: self.power_board.wait_start() - def _assert_has_power_board(self): + def _assert_has_power_board(self) -> None: power_boards = self.power_boards if not power_boards: raise RuntimeError('Cannot find Power Board!') @@ -139,7 +139,7 @@ def _games(self) -> BoardList[GameState]: return self._update_boards(self.known_gamestates, GameState, 'game') @staticmethod - def _single_index(name, list_of_boards: BoardList[TBoard]) -> TBoard: + def _single_index(name: str, list_of_boards: BoardList[TBoard]) -> TBoard: if list_of_boards: return list_of_boards[0] else: @@ -211,7 +211,7 @@ def mode(self) -> GameMode: """ return self._game.mode - def close(self): + def close(self) -> None: """ Cleanup robot instance. """ @@ -230,5 +230,5 @@ def close(self): # reanimate the boards (which isn't supported). del board_group[:] - def __del__(self): + def __del__(self) -> None: self.close() diff --git a/robot/servo.py b/robot/servo.py index e915baf..9407d4f 100644 --- a/robot/servo.py +++ b/robot/servo.py @@ -1,5 +1,6 @@ from enum import Enum -from typing import Dict, List +from pathlib import Path +from typing import Dict, List, Union from robot.board import Board @@ -23,7 +24,7 @@ class PinValue(Enum): class Servo: """A servo output on a ``ServoBoard``.""" - def __init__(self, servo_id, set_pos, get_pos): + def __init__(self, servo_id: int, set_pos, get_pos) -> None: self.servo_id = servo_id self._set_pos = set_pos self._get_pos = get_pos @@ -34,7 +35,7 @@ def position(self) -> float: return self._get_pos() @position.setter - def position(self, position): + def position(self, position: float) -> None: if position > 1 or position < -1: raise ValueError("servo position must be between -1 and 1") self._set_pos(position) @@ -43,7 +44,7 @@ def position(self, position): class Gpio: """A general-purpose input-output pin on a ``ServoBoard``.""" - def __init__(self, pin_id, pin_read, pin_mode_get, pin_mode_set): + def __init__(self, pin_id: int, pin_read, pin_mode_get, pin_mode_set) -> None: self._pin_id = pin_id self._pin_read = pin_read self._pin_mode_get = pin_mode_get @@ -55,7 +56,7 @@ def mode(self) -> PinMode: return PinMode(self._pin_mode_get()) @mode.setter - def mode(self, mode: PinMode): + def mode(self, mode: PinMode) -> None: """ Set the mode the pin should be in. @@ -108,7 +109,7 @@ class ServoBoard(Board): This is an arduino with a servo shield attached. """ - def __init__(self, socket_path): + def __init__(self, socket_path: Union[Path, str]) -> None: super().__init__(socket_path) servo_ids = range(0, 16) # servos with a port 0-15 @@ -131,7 +132,7 @@ def __init__(self, socket_path): for x in gpio_pins } # type: Dict[int, Gpio] - def direct_command(self, command_name: str, *args) -> List[str]: + def direct_command(self, command_name: str, *args: str) -> List[str]: """ Issue a command directly to the arduino. @@ -168,7 +169,7 @@ def servos(self) -> Dict[int, Servo]: """List of ``Servo`` outputs for the servo board.""" return self._servos - def _set_servo_pos(self, servo: int, pos: float): + def _set_servo_pos(self, servo: int, pos: float) -> None: self._send_and_receive({'servos': {servo: pos}}) def _get_servo_pos(self, servo: int) -> float: @@ -182,7 +183,7 @@ def gpios(self) -> Dict[int, Gpio]: """List of ``Gpio`` pins for the servo board.""" return self._gpios - def _read_pin(self, pin) -> PinValue: + def _read_pin(self, pin: int) -> PinValue: # request a check for that pin by trying to set it to None data = self._send_and_receive({'read-pins': [pin]}) # example data value: @@ -190,14 +191,14 @@ def _read_pin(self, pin) -> PinValue: values = data['pin-values'] return PinValue(values[str(pin)]) - def _get_pin_mode(self, pin) -> PinMode: + def _get_pin_mode(self, pin: int) -> PinMode: data = self._send_and_receive({}) # example data value: # {'pins':{2:'pullup'}} values = data['pins'] return PinMode(values[str(pin)]) - def _set_pin_mode(self, pin, value: PinMode): + def _set_pin_mode(self, pin: int, value: PinMode) -> None: self._send_and_receive({'pins': {pin: value.value}}) def read_analogue(self) -> Dict[str, str]: @@ -205,7 +206,7 @@ def read_analogue(self) -> Dict[str, str]: command = {'read-analogue': True} return self._send_and_receive(command)['analogue-values'] - def read_ultrasound(self, trigger_pin, echo_pin): + def read_ultrasound(self, trigger_pin: int, echo_pin: int) -> float: """ Read an ultrasound value from an ultrasound sensor. diff --git a/setup.cfg b/setup.cfg index 3c11c78..3142331 100644 --- a/setup.cfg +++ b/setup.cfg @@ -47,4 +47,4 @@ warn_unused_ignores = True strict_optional = True disallow_any_generics = True -check_untyped_defs = True +disallow_untyped_defs = True