Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support streaming echo messages #435

Merged
merged 40 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d58bf61
First step streaming echo
richardapeters Sep 28, 2023
a438d63
Step two implementing streaming Echo, adjust tests
richardapeters Oct 12, 2023
38a6ca2
protobuf/echo/Echo: Release proxy after writing all data
richardapeters Oct 12, 2023
1c20294
protobuf/echo/ProtoMessageSender: Use the error policy of output in P…
richardapeters Oct 12, 2023
cc021b4
services/util/EchoOnMessageCommunicationSymmetricKey: Delay setting k…
richardapeters Oct 13, 2023
06ea841
Implement protobuf/echo/ServiceForwarder
richardapeters Oct 13, 2023
28837ad
Apply clang-format
richardapeters Oct 13, 2023
0e25cec
ProtoCEchoPlugin: Make serviceId, id, and maxMessageSize constexpr
richardapeters Oct 13, 2023
d12766d
Implement TracngEchoOnConnection
richardapeters Oct 14, 2023
789452f
Update services/network/TracingEchoOnConnection.hpp
richardapeters Oct 14, 2023
797ed46
Apply Sonar review comments
richardapeters Oct 15, 2023
53ad3c8
Merge branch 'feature/streaming-echo' of https://github.com/philips-s…
richardapeters Oct 15, 2023
eaf6211
protobuf/echo/Echo: Introduce MethodDeserializerFactory
richardapeters Oct 16, 2023
f8e60ce
protobuf/echo/Echo.hpp: Extract EchoErrorPolicy and Serialization
richardapeters Oct 16, 2023
2a6d1e6
Add MethodDeserializerFactory::ForServices
richardapeters Oct 17, 2023
c84dc2d
protobuf/echo/Serialization: Extract structs with partial specializat…
richardapeters Oct 17, 2023
2505c23
protobuf/echo/Serialization: Fix compilation for GCC
richardapeters Oct 17, 2023
f1c65c9
protobuf/echo/test_doubles/ServiceStub: Fix compilation for GCC
richardapeters Oct 17, 2023
f43050b
protobuf/protoc_echo_plugin/ProtoCEchoPlugin: Fix compilation for GCC
richardapeters Oct 17, 2023
aa91ca8
protobuf/echo/Serialization: Fix compilation for GCC
richardapeters Oct 17, 2023
f7c6f44
protobuf/echo/Serialization: Replace MakeSharedOnHeap<MethodDeseriali…
richardapeters Oct 25, 2023
b31c3db
protobuf/echo/Echo: Remove ServiceProxyResponseQueue
richardapeters Oct 25, 2023
13768b7
protobuf/echo/Serialization: Replace MakeSharedOnHeap<MethodSerialize…
richardapeters Oct 25, 2023
1942bbc
protobuf/echo/Serialization: Fix declaration order
richardapeters Oct 25, 2023
36f773a
Merge remote-tracking branch 'origin/main' into feature/streaming-echo
richardapeters Oct 26, 2023
3408e24
build: only build and install protobuf echo compilers when EMIL_BUILD…
richardapeters Oct 26, 2023
29ac96d
Merge branch 'feature/install-protobuf-echo' into feature/streaming-echo
richardapeters Oct 26, 2023
ee10828
protobuf/protoc_echo_plugin*/CMakeLists: Fix emil_build_for statement
richardapeters Oct 26, 2023
9a652d5
Merge remote-tracking branch 'origin/feature/install-protobuf-echo' i…
richardapeters Oct 26, 2023
e44104c
Install export file, but only whene EMIL_BUILD_ECHO_COMPILERS
richardapeters Oct 26, 2023
da69a14
Merge remote-tracking branch 'origin/feature/install-protobuf-echo' i…
richardapeters Oct 26, 2023
723aa92
Resolve Sonar warnings
richardapeters Oct 26, 2023
b75707b
Stream empty parameters
richardapeters Oct 26, 2023
2f5b814
protobuf/echo/Echo: Move MethodSerializerFactory to Echo
richardapeters Oct 26, 2023
a21fb6a
Update protobuf/echo/Serialization.hpp
richardapeters Oct 26, 2023
b929074
services/util/EchoInstantiation: Add MethodSerializationFactory
richardapeters Oct 26, 2023
81625ec
Merge remote-tracking branch 'origin/main' into feature/streaming-echo
richardapeters Nov 9, 2023
514aaea
protobuf/echo/Echo: Reserve size for service and method id
richardapeters Nov 9, 2023
920c7d6
protobuf/echo/EchoErrorPolicy: Make destructor virtual
richardapeters Nov 9, 2023
1da6d3b
Apply review comments
richardapeters Nov 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions examples/rpc/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ class Console
: public examples::Console
{
public:
Console(services::Echo& echo)
: examples::Console(echo)
Console(services::Echo& echo, services::MethodSerializerFactory& serializerFactory)
: examples::Console(echo, serializerFactory)
{}

void Write(infra::BoundedConstString message) override
Expand All @@ -23,8 +23,8 @@ class EchoConnection
: public services::EchoOnConnection
{
public:
EchoConnection()
: console(*this)
EchoConnection(services::MethodSerializerFactory& serializerFactory)
: console(*this, serializerFactory)
{}

private:
Expand All @@ -45,12 +45,13 @@ class RpcServer
connection->::services::ConnectionObserver::Subject().AbortAndDestroy();

if (connection.Allocatable())
createdObserver(connection.Emplace());
createdObserver(connection.Emplace(serializerFactory));
}

private:
infra::SharedPtr<void> listener;
infra::SharedOptional<EchoConnection> connection;
services::MethodSerializerFactory::OnHeap serializerFactory;
};

int main(int argc, const char* argv[], const char* env[])
Expand Down
4 changes: 3 additions & 1 deletion infra/stream/BufferingStreamReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ namespace infra
return from;
}

return input.ExtractContiguousRange(max);
auto result = input.ExtractContiguousRange(max);
index += result.size();
return result;
}

infra::ConstByteRange BufferingStreamReader::PeekContiguousRange(std::size_t start)
Expand Down
30 changes: 20 additions & 10 deletions infra/stream/LimitedInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace infra
}

LimitedStreamReaderWithRewinding::LimitedStreamReaderWithRewinding(StreamReaderWithRewinding& input, uint32_t length)
: input(input)
: input(&input)
, length(length)
{}

Expand All @@ -72,55 +72,65 @@ namespace infra
length = newLength;
}

bool LimitedStreamReaderWithRewinding::LimitReached() const
{
return length == 0;
}

void LimitedStreamReaderWithRewinding::SwitchInput(StreamReaderWithRewinding& newInput)
{
this->input = &newInput;
}

void LimitedStreamReaderWithRewinding::Extract(ByteRange range, StreamErrorPolicy& errorPolicy)
{
errorPolicy.ReportResult(length >= range.size());
range.shrink_from_back_to(length);
length -= range.size();
input.Extract(range, errorPolicy);
input->Extract(range, errorPolicy);
}

uint8_t LimitedStreamReaderWithRewinding::Peek(StreamErrorPolicy& errorPolicy)
{
errorPolicy.ReportResult(length != 0);

if (length != 0)
return input.Peek(errorPolicy);
return input->Peek(errorPolicy);
else
return 0;
}

ConstByteRange LimitedStreamReaderWithRewinding::ExtractContiguousRange(std::size_t max)
{
ConstByteRange result = input.ExtractContiguousRange(std::min<std::size_t>(length, max));
ConstByteRange result = input->ExtractContiguousRange(std::min<std::size_t>(length, max));
length -= result.size();
return result;
}

ConstByteRange infra::LimitedStreamReaderWithRewinding::PeekContiguousRange(std::size_t start)
{
return input.PeekContiguousRange(start);
return input->PeekContiguousRange(start);
}

bool LimitedStreamReaderWithRewinding::Empty() const
{
return length == 0 || input.Empty();
return length == 0 || input->Empty();
}

std::size_t LimitedStreamReaderWithRewinding::Available() const
{
return std::min<uint32_t>(length, input.Available());
return std::min<std::size_t>(length, input->Available());
}

std::size_t LimitedStreamReaderWithRewinding::ConstructSaveMarker() const
{
return input.ConstructSaveMarker();
return input->ConstructSaveMarker();
}

void LimitedStreamReaderWithRewinding::Rewind(std::size_t marker)
{
auto now = input.ConstructSaveMarker();
input.Rewind(marker);
auto now = input->ConstructSaveMarker();
input->Rewind(marker);

length += now - marker;
}
Expand Down
4 changes: 3 additions & 1 deletion infra/stream/LimitedInputStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ namespace infra
~LimitedStreamReaderWithRewinding() = default;

void ResetLength(uint32_t newLength);
bool LimitReached() const;
void SwitchInput(StreamReaderWithRewinding& newInput);

public:
void Extract(ByteRange range, StreamErrorPolicy& errorPolicy) override;
Expand All @@ -56,7 +58,7 @@ namespace infra
void Rewind(std::size_t marker) override;

private:
StreamReaderWithRewinding& input;
StreamReaderWithRewinding* input;
uint32_t length;
};

Expand Down
5 changes: 5 additions & 0 deletions infra/stream/OutputStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ namespace infra
virtual void Insert(ConstByteRange range, StreamErrorPolicy& errorPolicy) = 0;
virtual std::size_t Available() const = 0;

bool Empty() const
{
return Available() == 0;
}

virtual std::size_t ConstructSaveMarker() const;
virtual std::size_t GetProcessedBytesSince(std::size_t marker) const;
virtual infra::ByteRange SaveState(std::size_t marker);
Expand Down
7 changes: 6 additions & 1 deletion infra/syntax/ProtoParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,19 @@ namespace infra
uint64_t result = 0;
uint8_t byte = 0;
uint8_t shift = 0;
uint8_t index = 0;

do
{
input >> byte;

result += static_cast<uint64_t>(byte & 0x7f) << shift;
shift += 7;
} while (!input.Failed() && (byte & 0x80) != 0);
++index;
} while (!input.Failed() && (byte & 0x80) != 0 && index != 10);

if (!input.Failed() && (byte & 0x80) != 0 && index == 10)
formatErrorPolicy.Failed();

return result;
}
Expand Down
4 changes: 4 additions & 0 deletions protobuf/echo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ target_link_libraries(protobuf.echo PUBLIC
target_sources(protobuf.echo PRIVATE
Echo.cpp
Echo.hpp
EchoErrorPolicy.cpp
EchoErrorPolicy.hpp
Proto.hpp
ProtoMessageReceiver.cpp
ProtoMessageReceiver.hpp
ProtoMessageSender.cpp
ProtoMessageSender.hpp
Serialization.cpp
Serialization.hpp
ServiceForwarder.cpp
ServiceForwarder.hpp
TracingEcho.cpp
Expand Down
Loading