diff --git a/lang/py/avro/datafile.py b/lang/py/avro/datafile.py index d39a91131e3..0f002cee1b5 100644 --- a/lang/py/avro/datafile.py +++ b/lang/py/avro/datafile.py @@ -26,7 +26,7 @@ import json import warnings from types import TracebackType -from typing import IO, AnyStr, BinaryIO, MutableMapping, Optional, Type, cast +from typing import IO, AnyStr, MutableMapping, Optional, Type, cast import avro.codecs import avro.errors diff --git a/lang/py/avro/io.py b/lang/py/avro/io.py index 7b5576697eb..a57a353447f 100644 --- a/lang/py/avro/io.py +++ b/lang/py/avro/io.py @@ -89,17 +89,7 @@ import decimal import struct import warnings -from typing import ( - IO, - Deque, - Generator, - Iterable, - List, - Mapping, - Optional, - Sequence, - Union, -) +from typing import IO, Generator, Iterable, List, Mapping, Optional, Sequence, Union import avro.constants import avro.errors @@ -435,7 +425,6 @@ def write_null(self, datum: None) -> None: """ null is written as zero bytes """ - pass def write_boolean(self, datum: bool) -> None: """ @@ -810,7 +799,7 @@ def read_array(self, writers_schema: avro.schema.ArraySchema, readers_schema: av while block_count != 0: if block_count < 0: block_count = -block_count - block_size = decoder.read_long() + decoder.read_long() for i in range(block_count): read_items.append(self.read_data(writers_schema.items, readers_schema.items, decoder)) block_count = decoder.read_long() @@ -847,7 +836,7 @@ def read_map(self, writers_schema: avro.schema.MapSchema, readers_schema: avro.s while block_count != 0: if block_count < 0: block_count = -block_count - block_size = decoder.read_long() + decoder.read_long() for i in range(block_count): key = decoder.read_utf8() read_items[key] = self.read_data(writers_schema.values, readers_schema.values, decoder) diff --git a/lang/py/avro/ipc.py b/lang/py/avro/ipc.py index 7ab6ea497ae..7a5a5831f08 100644 --- a/lang/py/avro/ipc.py +++ b/lang/py/avro/ipc.py @@ -199,7 +199,7 @@ def read_call_response(self, message_name, decoder): the error, serialized per the message's error union schema. """ # response metadata - response_metadata = META_READER.read(decoder) + META_READER.read(decoder) # remote response schema remote_message_schema = self.remote_protocol.messages.get(message_name) @@ -288,7 +288,7 @@ def respond(self, call_request): return buffer_writer.getvalue() # read request using remote protocol - request_metadata = META_READER.read(buffer_decoder) + META_READER.read(buffer_decoder) remote_message_name = buffer_decoder.read_utf8() # get remote and local request schemas so we can do @@ -364,9 +364,8 @@ def process_handshake(self, decoder, encoder): def invoke(self, local_message, request): """ - Aactual work done by server: cf. handler in thrift. + Actual work done by server: cf. handler in thrift. """ - pass def read_request(self, writers_schema, readers_schema, decoder): datum_reader = avro.io.DatumReader(writers_schema, readers_schema) diff --git a/lang/py/avro/test/test_schema.py b/lang/py/avro/test/test_schema.py index 668ca8258f2..056244cdacc 100644 --- a/lang/py/avro/test/test_schema.py +++ b/lang/py/avro/test/test_schema.py @@ -58,7 +58,7 @@ class InvalidTestSchema(TestSchema): valid = False -PRIMITIVE_EXAMPLES = [InvalidTestSchema('"True"')] # type: List[TestSchema] +PRIMITIVE_EXAMPLES: List[TestSchema] = [InvalidTestSchema('"True"')] PRIMITIVE_EXAMPLES.append(InvalidTestSchema("True")) PRIMITIVE_EXAMPLES.append(InvalidTestSchema('{"no_type": "test"}')) PRIMITIVE_EXAMPLES.append(InvalidTestSchema('{"type": "panther"}')) diff --git a/lang/py/avro/test/test_tether_task.py b/lang/py/avro/test/test_tether_task.py index 5a4e2b26dbe..35be22437dd 100644 --- a/lang/py/avro/test/test_tether_task.py +++ b/lang/py/avro/test/test_tether_task.py @@ -18,7 +18,6 @@ # limitations under the License. import io -import os import subprocess import sys import time diff --git a/lang/py/avro/test/test_tether_task_runner.py b/lang/py/avro/test/test_tether_task_runner.py index 7696161d3c9..a8726526851 100644 --- a/lang/py/avro/test/test_tether_task_runner.py +++ b/lang/py/avro/test/test_tether_task_runner.py @@ -19,7 +19,6 @@ import io import logging -import os import subprocess import sys import time @@ -47,7 +46,7 @@ def test1(self): pyfile = avro.test.mock_tether_parent.__file__ proc = subprocess.Popen([sys.executable, pyfile, "start_server", f"{parent_port}"]) - input_port = avro.tether.util.find_port() + avro.tether.util.find_port() print(f"Mock server started process pid={proc.pid}") # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started diff --git a/lang/py/avro/tether/__init__.py b/lang/py/avro/tether/__init__.py index 4875581f292..68df39ec4d4 100644 --- a/lang/py/avro/tether/__init__.py +++ b/lang/py/avro/tether/__init__.py @@ -27,3 +27,13 @@ ) from avro.tether.tether_task_runner import TaskRunner from avro.tether.util import find_port + +__all__ = ( + "HTTPRequestor", + "TaskRunner", + "TaskType", + "TetherTask", + "find_port", + "inputProtocol", + "outputProtocol", +) diff --git a/lang/py/avro/tether/tether_task.py b/lang/py/avro/tether/tether_task.py index c521fa56b4c..6caac6abe90 100644 --- a/lang/py/avro/tether/tether_task.py +++ b/lang/py/avro/tether/tether_task.py @@ -285,7 +285,7 @@ def configure(self, taskType, inSchemaText, outSchemaText): try: inSchema = avro.schema.parse(inSchemaText) - outSchema = avro.schema.parse(outSchemaText) + avro.schema.parse(outSchemaText) if taskType == TaskType.MAP: self.inReader = avro.io.DatumReader(writers_schema=inSchema, readers_schema=self.inschema) @@ -299,7 +299,7 @@ def configure(self, taskType, inSchemaText, outSchemaText): # determine which fields in the input record are they keys for the reducer self._red_fkeys = [f.name for f in self.midschema.fields if not (f.order == "ignore")] - except Exception as e: + except Exception: estr = traceback.format_exc() self.fail(estr) @@ -345,7 +345,7 @@ def input(self, data, count): self.reduceFlush(prev, self.outCollector) self.reduce(self.midRecord, self.outCollector) - except Exception as e: + except Exception: estr = traceback.format_exc() self.log.warning("failing: %s", estr) self.fail(estr) @@ -357,7 +357,7 @@ def complete(self): if (self.taskType == TaskType.REDUCE) and not (self.midRecord is None): try: self.reduceFlush(self.midRecord, self.outCollector) - except Exception as e: + except Exception: estr = traceback.format_exc() self.log.warning("failing: %s", estr) self.fail(estr) @@ -430,7 +430,7 @@ def fail(self, message): try: self.outputClient.request("fail", {"message": message}) - except Exception as e: + except Exception: self.log.exception("TetherTask.fail: an exception occured while trying to send the fail message to the output server.") self.close() @@ -441,7 +441,7 @@ def close(self): try: self.clienTransciever.close() - except Exception as e: + except Exception: # ignore exceptions pass diff --git a/lang/py/avro/tether/tether_task_runner.py b/lang/py/avro/tether/tether_task_runner.py index c1533b33353..410f6c00e4c 100644 --- a/lang/py/avro/tether/tether_task_runner.py +++ b/lang/py/avro/tether/tether_task_runner.py @@ -66,7 +66,7 @@ def invoke(self, message, request): self.log.info("TetherTaskRunner: Received partitions") try: self.task.partitions = request["partitions"] - except Exception as e: + except Exception: self.log.error("Exception occured while processing the partitions message: Message:\n%s", traceback.format_exc()) raise elif message.name == "input": diff --git a/lang/py/avro/utils.py b/lang/py/avro/utils.py index 76d8f6ec293..32e7f5aa36a 100644 --- a/lang/py/avro/utils.py +++ b/lang/py/avro/utils.py @@ -36,3 +36,5 @@ def _randbytes(n: int) -> bytes: randbytes = getattr(random, "randbytes", _randbytes) + +__all__ = ("randbytes", "TypedDict") diff --git a/lang/py/setup.py b/lang/py/setup.py index dee0e7f2ee9..d68073a7b6d 100755 --- a/lang/py/setup.py +++ b/lang/py/setup.py @@ -19,9 +19,7 @@ import distutils.errors -import glob import os -import subprocess import setuptools # type: ignore