diff --git a/src/libstore/remote-store-connection.hh b/src/libstore/remote-store-connection.hh index 405120ee926b..513bd6838bf7 100644 --- a/src/libstore/remote-store-connection.hh +++ b/src/libstore/remote-store-connection.hh @@ -49,7 +49,7 @@ struct RemoteStore::ConnectionHandle RemoteStore::Connection & operator * () { return *handle; } RemoteStore::Connection * operator -> () { return &*handle; } - void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true); + void processStderr(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true); void withFramedSink(std::function fun); }; diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 555936c186f7..69bbc64fca39 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -153,9 +153,9 @@ RemoteStore::ConnectionHandle::~ConnectionHandle() } } -void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush) +void RemoteStore::ConnectionHandle::processStderr(Sink * sink, Source * source, bool flush, bool block) { - handle->processStderr(&daemonException, sink, source, flush); + handle->processStderr(&daemonException, sink, source, flush, block); } @@ -926,43 +926,17 @@ void RemoteStore::ConnectionHandle::withFramedSink(std::functionto.flush(); - std::exception_ptr ex; - - /* Handle log messages / exceptions from the remote on a separate - thread. */ - std::thread stderrThread([&]() - { - try { - ReceiveInterrupts receiveInterrupts; - processStderr(nullptr, nullptr, false); - } catch (...) { - ex = std::current_exception(); - } - }); - - Finally joinStderrThread([&]() { - if (stderrThread.joinable()) { - stderrThread.join(); - if (ex) { - try { - std::rethrow_exception(ex); - } catch (...) { - ignoreException(); - } - } - } - }); - - { - FramedSink sink((*this)->to, ex); + FramedSink sink((*this)->to, [&]() { + /* Periodically process stderr messages and exceptions + from the daemon. */ + processStderr(nullptr, nullptr, false, false); + }); fun(sink); sink.flush(); } - stderrThread.join(); - if (ex) - std::rethrow_exception(ex); + processStderr(nullptr, nullptr, false); } } diff --git a/src/libstore/worker-protocol-connection.cc b/src/libstore/worker-protocol-connection.cc index a47dbb689d8c..cb92ce12cd13 100644 --- a/src/libstore/worker-protocol-connection.cc +++ b/src/libstore/worker-protocol-connection.cc @@ -32,7 +32,7 @@ static Logger::Fields readFields(Source & from) return fields; } -std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush) +std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink * sink, Source * source, bool flush, bool block) { if (flush) to.flush(); @@ -41,6 +41,9 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink while (true) { + if (!block && !from.hasData()) + break; + auto msg = readNum(from); if (msg == STDERR_WRITE) { @@ -95,8 +98,10 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink logger->result(act, type, fields); } - else if (msg == STDERR_LAST) + else if (msg == STDERR_LAST) { + assert(block); break; + } else throw Error("got unknown message type %x from Nix daemon", msg); @@ -130,9 +135,9 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink } } -void WorkerProto::BasicClientConnection::processStderr(bool * daemonException, Sink * sink, Source * source, bool flush) +void WorkerProto::BasicClientConnection::processStderr(bool * daemonException, Sink * sink, Source * source, bool flush, bool block) { - auto ex = processStderrReturn(sink, source, flush); + auto ex = processStderrReturn(sink, source, flush, block); if (ex) { *daemonException = true; std::rethrow_exception(ex); diff --git a/src/libstore/worker-protocol-connection.hh b/src/libstore/worker-protocol-connection.hh index 9c96195b5f4b..6a4488188d86 100644 --- a/src/libstore/worker-protocol-connection.hh +++ b/src/libstore/worker-protocol-connection.hh @@ -70,9 +70,9 @@ struct WorkerProto::BasicClientConnection : WorkerProto::BasicConnection virtual void closeWrite() = 0; - std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true); + std::exception_ptr processStderrReturn(Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true); - void processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true); + void processStderr(bool * daemonException, Sink * sink = 0, Source * source = 0, bool flush = true, bool block = true); /** * Establishes connection, negotiating version. diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index 4899134d7c3c..1e30d27b74a7 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -10,6 +10,8 @@ #ifdef _WIN32 # include # include "windows-error.hh" +#else +# include #endif @@ -158,6 +160,24 @@ bool FdSource::good() } +bool FdSource::hasData() +{ + if (BufferedSource::hasData()) return true; + + while (true) { + struct pollfd fds[1]; + fds[0].fd = fd; + fds[0].events = POLLIN; + auto n = poll(fds, 1, 0); + if (n < 0) { + if (errno == EINTR) continue; + throw SysError("polling file descriptor"); + } + return n == 1 && (fds[0].events & POLLIN); + } +} + + size_t StringSource::read(char * data, size_t len) { if (pos == s.size()) throw EndOfFile("end of string reached"); diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index c7290dcef9d7..964b9a30dd60 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -104,6 +104,9 @@ struct BufferedSource : Source size_t read(char * data, size_t len) override; + /** + * Return true if the buffer is not empty. + */ bool hasData(); protected: @@ -162,6 +165,13 @@ struct FdSource : BufferedSource FdSource & operator=(FdSource && s) = default; bool good() override; + + /** + * Return true if the buffer is not empty after a non-blocking + * read. + */ + bool hasData(); + protected: size_t readUnbuffered(char * data, size_t len) override; private: @@ -522,15 +532,16 @@ struct FramedSource : Source /** * Write as chunks in the format expected by FramedSource. * - * The exception_ptr reference can be used to terminate the stream when you - * detect that an error has occurred on the remote end. + * The `checkError` function can be used to terminate the stream when you + * detect that an error has occurred. */ struct FramedSink : nix::BufferedSink { BufferedSink & to; - std::exception_ptr & ex; + std::function checkError; - FramedSink(BufferedSink & to, std::exception_ptr & ex) : to(to), ex(ex) + FramedSink(BufferedSink & to, std::function && checkError) + : to(to), checkError(checkError) { } ~FramedSink() @@ -545,13 +556,9 @@ struct FramedSink : nix::BufferedSink void writeUnbuffered(std::string_view data) override { - /* Don't send more data if the remote has - encountered an error. */ - if (ex) { - auto ex2 = ex; - ex = nullptr; - std::rethrow_exception(ex2); - } + /* Don't send more data if an error has occured. */ + checkError(); + to << data.size(); to(data); };