Skip to content

Commit

Permalink
Factor out commonality between WorkerProto::Basic{Client,Server}Conne…
Browse files Browse the repository at this point in the history
…ction

This also renames clientVersion and daemonVersion to the more correct
protoVersion (since it's the version agreed to by both sides).
  • Loading branch information
edolstra committed Jul 17, 2024
1 parent 464e592 commit ff96f7d
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 129 deletions.
29 changes: 14 additions & 15 deletions src/libstore/daemon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1020,8 +1020,8 @@ static void performOp(TunnelLogger * logger, ref<Store> store,

void processConnection(
ref<Store> store,
FdSource & from,
FdSink & to,
FdSource && from,
FdSink && to,
TrustedFlag trusted,
RecursiveFlag recursive)
{
Expand All @@ -1037,7 +1037,12 @@ void processConnection(
if (clientVersion < 0x10a)
throw Error("the Nix client version is too old");

auto tunnelLogger = new TunnelLogger(to, clientVersion);
WorkerProto::BasicServerConnection conn;
conn.to = std::move(to);
conn.from = std::move(from);
conn.protoVersion = clientVersion;

auto tunnelLogger = new TunnelLogger(conn.to, clientVersion);
auto prevLogger = nix::logger;
// FIXME
if (!recursive)
Expand All @@ -1050,12 +1055,6 @@ void processConnection(
printMsgUsing(prevLogger, lvlDebug, "%d operations", opCount);
});

WorkerProto::BasicServerConnection conn {
.to = to,
.from = from,
.clientVersion = clientVersion,
};

conn.postHandshake(*store, {
.daemonNixVersion = nixVersion,
// We and the underlying store both need to trust the client for
Expand All @@ -1071,13 +1070,13 @@ void processConnection(
try {

tunnelLogger->stopWork();
to.flush();
conn.to.flush();

/* Process client requests. */
while (true) {
WorkerProto::Op op;
try {
op = (enum WorkerProto::Op) readInt(from);
op = (enum WorkerProto::Op) readInt(conn.from);
} catch (Interrupted & e) {
break;
} catch (EndOfFile & e) {
Expand All @@ -1091,7 +1090,7 @@ void processConnection(
debug("performing daemon worker op: %d", op);

try {
performOp(tunnelLogger, store, trusted, recursive, clientVersion, from, to, op);
performOp(tunnelLogger, store, trusted, recursive, clientVersion, conn.from, conn.to, op);
} catch (Error & e) {
/* If we're not in a state where we can send replies, then
something went wrong processing the input of the
Expand All @@ -1107,19 +1106,19 @@ void processConnection(
throw;
}

to.flush();
conn.to.flush();

assert(!tunnelLogger->state_.lock()->canSendStderr);
};

} catch (Error & e) {
tunnelLogger->stopWork(&e);
to.flush();
conn.to.flush();
return;
} catch (std::exception & e) {
auto ex = Error(e.what());
tunnelLogger->stopWork(&ex);
to.flush();
conn.to.flush();
return;
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/libstore/daemon.hh
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ enum RecursiveFlag : bool { NotRecursive = false, Recursive = true };

void processConnection(
ref<Store> store,
FdSource & from,
FdSink & to,
FdSource && from,
FdSink && to,
TrustedFlag trusted,
RecursiveFlag recursive);

Expand Down
38 changes: 19 additions & 19 deletions src/libstore/remote-store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void RemoteStore::initConnection(Connection & conn)
StringSink saved;
TeeSource tee(conn.from, saved);
try {
conn.daemonVersion = WorkerProto::BasicClientConnection::handshake(
conn.protoVersion = WorkerProto::BasicClientConnection::handshake(
conn.to, tee, PROTOCOL_VERSION);
} catch (SerialisationError & e) {
/* In case the other side is waiting for our input, close
Expand Down Expand Up @@ -115,7 +115,7 @@ void RemoteStore::setOptions(Connection & conn)
<< settings.buildCores
<< settings.useSubstitutes;

if (GET_PROTOCOL_MINOR(conn.daemonVersion) >= 12) {
if (GET_PROTOCOL_MINOR(conn.protoVersion) >= 12) {
std::map<std::string, Config::SettingInfo> overrides;
settings.getSettings(overrides, true); // libstore settings
fileTransferSettings.getSettings(overrides, true);
Expand Down Expand Up @@ -175,7 +175,7 @@ bool RemoteStore::isValidPathUncached(const StorePath & path)
StorePathSet RemoteStore::queryValidPaths(const StorePathSet & paths, SubstituteFlag maybeSubstitute)
{
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) {
StorePathSet res;
for (auto & i : paths)
if (isValidPath(i)) res.insert(i);
Expand All @@ -198,7 +198,7 @@ StorePathSet RemoteStore::queryAllValidPaths()
StorePathSet RemoteStore::querySubstitutablePaths(const StorePathSet & paths)
{
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) {
StorePathSet res;
for (auto & i : paths) {
conn->to << WorkerProto::Op::HasSubstitutes << printStorePath(i);
Expand All @@ -221,7 +221,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S

auto conn(getConnection());

if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 12) {
if (GET_PROTOCOL_MINOR(conn->protoVersion) < 12) {

for (auto & i : pathsMap) {
SubstitutablePathInfo info;
Expand All @@ -241,7 +241,7 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
} else {

conn->to << WorkerProto::Op::QuerySubstitutablePathInfos;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 22) {
if (GET_PROTOCOL_MINOR(conn->protoVersion) < 22) {
StorePathSet paths;
for (auto & path : pathsMap)
paths.insert(path.first);
Expand Down Expand Up @@ -368,7 +368,7 @@ ref<const ValidPathInfo> RemoteStore::addCAToStore(
std::optional<ConnectionHandle> conn_(getConnection());
auto & conn = *conn_;

if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) {
if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 25) {

conn->to
<< WorkerProto::Op::AddToStore
Expand Down Expand Up @@ -485,7 +485,7 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
{
auto conn(getConnection());

if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 18) {
if (GET_PROTOCOL_MINOR(conn->protoVersion) < 18) {
auto source2 = sinkToSource([&](Sink & sink) {
sink << 1 // == path follows
;
Expand Down Expand Up @@ -513,11 +513,11 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
<< info.ultimate << info.sigs << renderContentAddress(info.ca)
<< repair << !checkSigs;

if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 23) {
conn.withFramedSink([&](Sink & sink) {
copyNAR(source, sink);
});
} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
} else if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 21) {
conn.processStderr(0, &source);
} else {
copyNAR(source, conn->to);
Expand Down Expand Up @@ -554,7 +554,7 @@ void RemoteStore::addMultipleToStore(
RepairFlag repair,
CheckSigsFlag checkSigs)
{
if (GET_PROTOCOL_MINOR(getConnection()->daemonVersion) >= 32) {
if (GET_PROTOCOL_MINOR(getConnection()->protoVersion) >= 32) {
auto conn(getConnection());
conn->to
<< WorkerProto::Op::AddMultipleToStore
Expand All @@ -572,7 +572,7 @@ void RemoteStore::registerDrvOutput(const Realisation & info)
{
auto conn(getConnection());
conn->to << WorkerProto::Op::RegisterDrvOutput;
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) {
if (GET_PROTOCOL_MINOR(conn->protoVersion) < 31) {
conn->to << info.id.to_string();
conn->to << std::string(info.outPath.to_string());
} else {
Expand All @@ -587,7 +587,7 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id,
try {
auto conn(getConnection());

if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 27) {
if (GET_PROTOCOL_MINOR(conn->protoVersion) < 27) {
warn("the daemon is too old to support content-addressed derivations, please upgrade it to 2.4");
return callback(nullptr);
}
Expand All @@ -597,7 +597,7 @@ void RemoteStore::queryRealisationUncached(const DrvOutput & id,
conn.processStderr();

auto real = [&]() -> std::shared_ptr<const Realisation> {
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) {
if (GET_PROTOCOL_MINOR(conn->protoVersion) < 31) {
auto outPaths = WorkerProto::Serialise<std::set<StorePath>>::read(
*this, *conn);
if (outPaths.empty())
Expand Down Expand Up @@ -644,9 +644,9 @@ void RemoteStore::buildPaths(const std::vector<DerivedPath> & drvPaths, BuildMod

auto conn(getConnection());
conn->to << WorkerProto::Op::BuildPaths;
assert(GET_PROTOCOL_MINOR(conn->daemonVersion) >= 13);
assert(GET_PROTOCOL_MINOR(conn->protoVersion) >= 13);
WorkerProto::write(*this, *conn, drvPaths);
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 15)
if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 15)
conn->to << buildMode;
else
/* Old daemons did not take a 'buildMode' parameter, so we
Expand All @@ -667,7 +667,7 @@ std::vector<KeyedBuildResult> RemoteStore::buildPathsWithResults(
std::optional<ConnectionHandle> conn_(getConnection());
auto & conn = *conn_;

if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 34) {
if (GET_PROTOCOL_MINOR(conn->protoVersion) >= 34) {
conn->to << WorkerProto::Op::BuildPathsWithResults;
WorkerProto::write(*this, *conn, paths);
conn->to << buildMode;
Expand Down Expand Up @@ -841,7 +841,7 @@ void RemoteStore::queryMissing(const std::vector<DerivedPath> & targets,
{
{
auto conn(getConnection());
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 19)
if (GET_PROTOCOL_MINOR(conn->protoVersion) < 19)
// Don't hold the connection handle in the fallback case
// to prevent a deadlock.
goto fallback;
Expand Down Expand Up @@ -889,7 +889,7 @@ void RemoteStore::connect()
unsigned int RemoteStore::getProtocol()
{
auto conn(connections->get());
return conn->daemonVersion;
return conn->protoVersion;
}

std::optional<TrustedFlag> RemoteStore::isTrustedClient()
Expand Down
7 changes: 4 additions & 3 deletions src/libstore/unix/build/local-derivation-goal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1526,10 +1526,11 @@ void LocalDerivationGoal::startDaemon()
debug("received daemon connection");

auto workerThread = std::thread([store, remote{std::move(remote)}]() {
FdSource from(remote.get());
FdSink to(remote.get());
try {
daemon::processConnection(store, from, to,
daemon::processConnection(
store,
FdSource(remote.get()),
FdSink(remote.get()),
NotTrusted, daemon::Recursive);
debug("terminated daemon connection");
} catch (SystemError &) {
Expand Down
20 changes: 10 additions & 10 deletions src/libstore/worker-protocol-connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
}

else if (msg == STDERR_ERROR) {
if (GET_PROTOCOL_MINOR(daemonVersion) >= 26) {
if (GET_PROTOCOL_MINOR(protoVersion) >= 26) {
ex = std::make_exception_ptr(readError(from));
} else {
auto error = readString(from);
Expand Down Expand Up @@ -114,7 +114,7 @@ std::exception_ptr WorkerProto::BasicClientConnection::processStderrReturn(Sink
// explain to users what's going on when their daemon is
// older than #4628 (2023).
if (experimentalFeatureSettings.isEnabled(Xp::DynamicDerivations)
&& GET_PROTOCOL_MINOR(daemonVersion) <= 35) {
&& GET_PROTOCOL_MINOR(protoVersion) <= 35) {
auto m = e.msg();
if (m.find("parsing derivation") != std::string::npos && m.find("expected string") != std::string::npos
&& m.find("Derive([") != std::string::npos)
Expand Down Expand Up @@ -173,28 +173,28 @@ WorkerProto::ClientHandshakeInfo WorkerProto::BasicClientConnection::postHandsha
{
WorkerProto::ClientHandshakeInfo res;

if (GET_PROTOCOL_MINOR(daemonVersion) >= 14) {
if (GET_PROTOCOL_MINOR(protoVersion) >= 14) {
// Obsolete CPU affinity.
to << 0;
}

if (GET_PROTOCOL_MINOR(daemonVersion) >= 11)
if (GET_PROTOCOL_MINOR(protoVersion) >= 11)
to << false; // obsolete reserveSpace

if (GET_PROTOCOL_MINOR(daemonVersion) >= 33)
if (GET_PROTOCOL_MINOR(protoVersion) >= 33)
to.flush();

return WorkerProto::Serialise<ClientHandshakeInfo>::read(store, *this);
}

void WorkerProto::BasicServerConnection::postHandshake(const StoreDirConfig & store, const ClientHandshakeInfo & info)
{
if (GET_PROTOCOL_MINOR(clientVersion) >= 14 && readInt(from)) {
if (GET_PROTOCOL_MINOR(protoVersion) >= 14 && readInt(from)) {
// Obsolete CPU affinity.
readInt(from);
}

if (GET_PROTOCOL_MINOR(clientVersion) >= 11)
if (GET_PROTOCOL_MINOR(protoVersion) >= 11)
readInt(from); // obsolete reserveSpace

WorkerProto::write(store, *this, info);
Expand All @@ -212,7 +212,7 @@ UnkeyedValidPathInfo WorkerProto::BasicClientConnection::queryPathInfo(
throw InvalidPath(std::move(e.info()));
throw;
}
if (GET_PROTOCOL_MINOR(daemonVersion) >= 17) {
if (GET_PROTOCOL_MINOR(protoVersion) >= 17) {
bool valid;
from >> valid;
if (!valid)
Expand All @@ -224,10 +224,10 @@ UnkeyedValidPathInfo WorkerProto::BasicClientConnection::queryPathInfo(
StorePathSet WorkerProto::BasicClientConnection::queryValidPaths(
const StoreDirConfig & store, bool * daemonException, const StorePathSet & paths, SubstituteFlag maybeSubstitute)
{
assert(GET_PROTOCOL_MINOR(daemonVersion) >= 12);
assert(GET_PROTOCOL_MINOR(protoVersion) >= 12);
to << WorkerProto::Op::QueryValidPaths;
WorkerProto::write(store, *this, paths);
if (GET_PROTOCOL_MINOR(daemonVersion) >= 27) {
if (GET_PROTOCOL_MINOR(protoVersion) >= 27) {
to << maybeSubstitute;
}
processStderr(daemonException);
Expand Down
Loading

0 comments on commit ff96f7d

Please sign in to comment.