Skip to content

Commit

Permalink
Add support for streaming delimited messages (#529)
Browse files Browse the repository at this point in the history
* Add support for streaming delimited messages

This allows developers to easily dump and load multiple messages
from a stream in a way that is compatible with official
protobuf implementations (such as Java's
`MessageLite#writeDelimitedTo(...)`).

* Add Java compatibility tests for streaming

These tests stream data such as messages to output files, have a
Java binary read them and then write them back using the
`protobuf-java` functions, and then read them back in on the Python
side to check that the returned data is as expected. This checks
that the official Java implementation (and so any other matching
implementations) can properly parse outputs from Betterproto, and
vice-versa, ensuring compatibility in these functions between the
two.

* Replace `xxxxableBuffer` with `SupportsXxxx`
  • Loading branch information
JoshuaLeivers authored Oct 16, 2023
1 parent 6b36b9b commit 4f18ed1
Show file tree
Hide file tree
Showing 10 changed files with 537 additions and 9 deletions.
9 changes: 8 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ repos:
- repo: https://github.com/PyCQA/doc8
rev: 0.10.1
hooks:
- id: doc8
- id: doc8
additional_dependencies:
- toml

- repo: https://github.com/macisamuele/language-formatters-pre-commit-hooks
rev: v2.10.0
hooks:
- id: pretty-format-java
args: [--autofix, --aosp]
files: ^.*\.java$
33 changes: 26 additions & 7 deletions src/betterproto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@


if TYPE_CHECKING:
from _typeshed import ReadableBuffer
from _typeshed import (
SupportsRead,
SupportsWrite,
)


# Proto 3 data types
Expand Down Expand Up @@ -127,6 +130,9 @@
WIRE_FIXED_64_TYPES = [TYPE_DOUBLE, TYPE_FIXED64, TYPE_SFIXED64]
WIRE_LEN_DELIM_TYPES = [TYPE_STRING, TYPE_BYTES, TYPE_MESSAGE, TYPE_MAP]

# Indicator of message delimitation in streams
SIZE_DELIMITED = -1


# Protobuf datetimes start at the Unix Epoch in 1970 in UTC.
def datetime_default_gen() -> datetime:
Expand Down Expand Up @@ -322,7 +328,7 @@ def _pack_fmt(proto_type: str) -> str:
}[proto_type]


def dump_varint(value: int, stream: BinaryIO) -> None:
def dump_varint(value: int, stream: "SupportsWrite[bytes]") -> None:
"""Encodes a single varint and dumps it into the provided stream."""
if value < -(1 << 63):
raise ValueError(
Expand Down Expand Up @@ -531,7 +537,7 @@ def _dump_float(value: float) -> Union[float, str]:
return value


def load_varint(stream: BinaryIO) -> Tuple[int, bytes]:
def load_varint(stream: "SupportsRead[bytes]") -> Tuple[int, bytes]:
"""
Load a single varint value from a stream. Returns the value and the raw bytes read.
"""
Expand Down Expand Up @@ -569,7 +575,7 @@ class ParsedField:
raw: bytes


def load_fields(stream: BinaryIO) -> Generator[ParsedField, None, None]:
def load_fields(stream: "SupportsRead[bytes]") -> Generator[ParsedField, None, None]:
while True:
try:
num_wire, raw = load_varint(stream)
Expand Down Expand Up @@ -881,15 +887,19 @@ def _betterproto(self) -> ProtoClassMetadata:
self.__class__._betterproto_meta = meta # type: ignore
return meta

def dump(self, stream: BinaryIO) -> None:
def dump(self, stream: "SupportsWrite[bytes]", delimit: bool = False) -> None:
"""
Dumps the binary encoded Protobuf message to the stream.
Parameters
-----------
stream: :class:`BinaryIO`
The stream to dump the message to.
delimit:
Whether to prefix the message with a varint declaring its size.
"""
if delimit == SIZE_DELIMITED:
dump_varint(len(self), stream)

for field_name, meta in self._betterproto.meta_by_field_name.items():
try:
Expand Down Expand Up @@ -1207,7 +1217,11 @@ def _include_default_value_for_oneof(
meta.group is not None and self._group_current.get(meta.group) == field_name
)

def load(self: T, stream: BinaryIO, size: Optional[int] = None) -> T:
def load(
self: T,
stream: "SupportsRead[bytes]",
size: Optional[int] = None,
) -> T:
"""
Load the binary encoded Protobuf from a stream into this message instance. This
returns the instance itself and is therefore assignable and chainable.
Expand All @@ -1219,12 +1233,17 @@ def load(self: T, stream: BinaryIO, size: Optional[int] = None) -> T:
size: :class:`Optional[int]`
The size of the message in the stream.
Reads stream until EOF if ``None`` is given.
Reads based on a size delimiter prefix varint if SIZE_DELIMITED is given.
Returns
--------
:class:`Message`
The initialized message.
"""
# If the message is delimited, parse the message delimiter
if size == SIZE_DELIMITED:
size, _ = load_varint(stream)

# Got some data over the wire
self._serialized_on_wire = True
proto_meta = self._betterproto
Expand Down Expand Up @@ -1297,7 +1316,7 @@ def load(self: T, stream: BinaryIO, size: Optional[int] = None) -> T:

return self

def parse(self: T, data: "ReadableBuffer") -> T:
def parse(self: T, data: bytes) -> T:
"""
Parse the binary encoded Protobuf into this message instance. This
returns the instance itself and is therefore assignable and chainable.
Expand Down
2 changes: 2 additions & 0 deletions tests/streams/delimited_messages.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
���:bTesting���:bTesting
 
38 changes: 38 additions & 0 deletions tests/streams/java/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
### Output ###
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
dependency-reduced-pom.xml
MANIFEST.MF

### IntelliJ IDEA ###
.idea/
*.iws
*.iml
*.ipr

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store
94 changes: 94 additions & 0 deletions tests/streams/java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>betterproto</groupId>
<artifactId>compatibility-test</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<protobuf.version>3.23.4</protobuf.version>
</properties>

<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
</dependencies>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>

<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.5.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>betterproto.CompatibilityTest</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>betterproto.CompatibilityTest</mainClass>
</manifest>
</archive>
</configuration>
</plugin>

<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
</configuration>
</plugin>
</plugins>

<finalName>${project.artifactId}</finalName>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package betterproto;

import java.io.IOException;

public class CompatibilityTest {
public static void main(String[] args) throws IOException {
if (args.length < 2)
throw new RuntimeException("Attempted to run without the required arguments.");
else if (args.length > 2)
throw new RuntimeException(
"Attempted to run with more than the expected number of arguments (>1).");

Tests tests = new Tests(args[1]);

switch (args[0]) {
case "single_varint":
tests.testSingleVarint();
break;

case "multiple_varints":
tests.testMultipleVarints();
break;

case "single_message":
tests.testSingleMessage();
break;

case "multiple_messages":
tests.testMultipleMessages();
break;

case "infinite_messages":
tests.testInfiniteMessages();
break;

default:
throw new RuntimeException(
"Attempted to run with unknown argument '" + args[0] + "'.");
}
}
}
Loading

0 comments on commit 4f18ed1

Please sign in to comment.