From df07670a21a48993c8c3392446300bb915cebf0e Mon Sep 17 00:00:00 2001 From: John Ericson Date: Sun, 16 Feb 2025 18:44:26 -0500 Subject: [PATCH 1/3] Bump to newer 2.26.* Nix version MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Flake lock file updates: • Updated input 'nix': 'github:NixOS/nix/970942f45836172fda410a638853382952189eb9?narHash=sha256-jGFuyYKJjJZsBRoi7ZcaVKt1OYxusz/ld1HA7VD2w/0%3D' (2025-02-12) → 'github:NixOS/nix/674a87462cb93f605d4fbeef607d3453e7e5a7d8?narHash=sha256-TBoHqnIdVWhsBcL05vO2B1hSl9m//5Mz2NU%2BPMk3h3Y%3D' (2025-02-16) --- flake.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flake.lock b/flake.lock index ec9a00e6..8b1cbf2e 100644 --- a/flake.lock +++ b/flake.lock @@ -12,11 +12,11 @@ "nixpkgs-regression": [] }, "locked": { - "lastModified": 1739393420, - "narHash": "sha256-jGFuyYKJjJZsBRoi7ZcaVKt1OYxusz/ld1HA7VD2w/0=", + "lastModified": 1739746614, + "narHash": "sha256-TBoHqnIdVWhsBcL05vO2B1hSl9m//5Mz2NU+PMk3h3Y=", "owner": "NixOS", "repo": "nix", - "rev": "970942f45836172fda410a638853382952189eb9", + "rev": "674a87462cb93f605d4fbeef607d3453e7e5a7d8", "type": "github" }, "original": { From af72b694d8f0fcca90f464e2d716bca65d100b6d Mon Sep 17 00:00:00 2001 From: John Ericson Date: Tue, 18 Feb 2025 11:51:51 -0500 Subject: [PATCH 2/3] Bump to newer 2.26.* Nix version MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Needed one more thing before trying out using `LegacySSHStore` directly. Flake lock file updates: • Updated input 'nix': 'github:NixOS/nix/674a87462cb93f605d4fbeef607d3453e7e5a7d8?narHash=sha256-TBoHqnIdVWhsBcL05vO2B1hSl9m//5Mz2NU%2BPMk3h3Y%3D' (2025-02-16) → 'github:NixOS/nix/e310c19a1aeb1ce1ed4d41d5ab2d02db596e0918?narHash=sha256-q/RgA4bB7zWai4oPySq9mch7qH14IEeom2P64SXdqHs%3D' (2025-02-18) --- flake.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flake.lock b/flake.lock index 8b1cbf2e..c47a3b88 100644 --- a/flake.lock +++ b/flake.lock @@ -12,11 +12,11 @@ "nixpkgs-regression": [] }, "locked": { - "lastModified": 1739746614, - "narHash": "sha256-TBoHqnIdVWhsBcL05vO2B1hSl9m//5Mz2NU+PMk3h3Y=", + "lastModified": 1739899400, + "narHash": "sha256-q/RgA4bB7zWai4oPySq9mch7qH14IEeom2P64SXdqHs=", "owner": "NixOS", "repo": "nix", - "rev": "674a87462cb93f605d4fbeef607d3453e7e5a7d8", + "rev": "e310c19a1aeb1ce1ed4d41d5ab2d02db596e0918", "type": "github" }, "original": { From 4a4a0f901c70676ee47f830d2ff6a72789ba1baf Mon Sep 17 00:00:00 2001 From: John Ericson Date: Mon, 20 May 2024 16:22:19 -0400 Subject: [PATCH 3/3] Use `LegacySSHStore` In https://github.com/NixOS/nix/pull/10748 it is extended with everything we need. --- src/hydra-queue-runner/build-remote.cc | 178 +++++++++---------------- src/hydra-queue-runner/state.hh | 8 +- 2 files changed, 68 insertions(+), 118 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 77bde2c4..39970bd3 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -9,13 +9,10 @@ #include "path.hh" #include "legacy-ssh-store.hh" #include "serve-protocol.hh" -#include "serve-protocol-impl.hh" #include "state.hh" #include "current-process.hh" #include "processes.hh" #include "util.hh" -#include "serve-protocol.hh" -#include "serve-protocol-impl.hh" #include "ssh.hh" #include "finally.hh" #include "url.hh" @@ -39,38 +36,6 @@ bool ::Machine::isLocalhost() const namespace nix::build_remote { -static std::unique_ptr openConnection( - ::Machine::ptr machine, SSHMaster & master) -{ - Strings command = {"nix-store", "--serve", "--write"}; - if (machine->isLocalhost()) { - command.push_back("--builders"); - command.push_back(""); - } else { - auto remoteStore = machine->storeUri.params.find("remote-store"); - if (remoteStore != machine->storeUri.params.end()) { - command.push_back("--store"); - command.push_back(shellEscape(remoteStore->second)); - } - } - - auto ret = master.startCommand(std::move(command), { - "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" - }); - - // XXX: determine the actual max value we can use from /proc. - - // FIXME: Should this be upstreamed into `startCommand` in Nix? - - int pipesize = 1024 * 1024; - - fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize); - fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize); - - return ret; -} - - static void copyClosureTo( ::Machine::Connection & conn, Store & destStore, @@ -87,8 +52,8 @@ static void copyClosureTo( // FIXME: substitute output pollutes our build log /* Get back the set of paths that are already valid on the remote host. */ - auto present = conn.queryValidPaths( - destStore, true, closure, useSubstitutes); + auto present = conn.store->queryValidPaths( + closure, true, useSubstitutes); if (present.size() == closure.size()) return; @@ -103,12 +68,7 @@ static void copyClosureTo( std::unique_lock sendLock(conn.machine->state->sendLock, std::chrono::seconds(600)); - conn.to << ServeProto::Command::ImportPaths; - destStore.exportPaths(missing, conn.to); - conn.to.flush(); - - if (readInt(conn.from) != 1) - throw Error("remote machine failed to import closure"); + conn.store->addMultipleToStoreLegacy(destStore, missing); } @@ -228,7 +188,7 @@ static BuildResult performBuild( counter & nrStepsBuilding ) { - conn.putBuildDerivationRequest(localStore, drvPath, drv, options); + auto kont = conn.store->buildDerivationAsync(drvPath, drv, options); BuildResult result; @@ -237,7 +197,10 @@ static BuildResult performBuild( startTime = time(0); { MaintainCount mc(nrStepsBuilding); - result = ServeProto::Serialise::read(localStore, conn); + result = kont(); + // Without proper call-once functions, we need to manually + // delete after calling. + kont = {}; } stopTime = time(0); @@ -253,7 +216,7 @@ static BuildResult performBuild( // If the protocol was too old to give us `builtOutputs`, initialize // it manually by introspecting the derivation. - if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6) + if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6) { // If the remote is too old to handle CA derivations, we can’t get this // far anyways @@ -286,26 +249,25 @@ static void copyPathFromRemote( const ValidPathInfo & info ) { - /* Receive the NAR from the remote and add it to the - destination store. Meanwhile, extract all the info from the - NAR that getBuildOutput() needs. */ - auto source2 = sinkToSource([&](Sink & sink) - { - /* Note: we should only send the command to dump the store - path to the remote if the NAR is actually going to get read - by the destination store, which won't happen if this path - is already valid on the destination store. Since this - lambda function only gets executed if someone tries to read - from source2, we will send the command from here rather - than outside the lambda. */ - conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path); - conn.to.flush(); + /* Receive the NAR from the remote and add it to the + destination store. Meanwhile, extract all the info from the + NAR that getBuildOutput() needs. */ + auto source2 = sinkToSource([&](Sink & sink) + { + /* Note: we should only send the command to dump the store + path to the remote if the NAR is actually going to get read + by the destination store, which won't happen if this path + is already valid on the destination store. Since this + lambda function only gets executed if someone tries to read + from source2, we will send the command from here rather + than outside the lambda. */ + conn.store->narFromPath(info.path, [&](Source & source) { + TeeSource tee{source, sink}; + extractNarData(tee, conn.store->printStorePath(info.path), narMembers); + }); + }); - TeeSource tee(conn.from, sink); - extractNarData(tee, localStore.printStorePath(info.path), narMembers); - }); - - destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); + destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); } static void copyPathsFromRemote( @@ -404,30 +366,39 @@ void State::buildRemote(ref destStore, updateStep(ssConnecting); - auto storeRef = machine->completeStoreReference(); - - auto * pSpecified = std::get_if(&storeRef.variant); - if (!pSpecified || pSpecified->scheme != "ssh") { - throw Error("Currently, only (legacy-)ssh stores are supported!"); - } - - LegacySSHStoreConfig storeConfig { - pSpecified->scheme, - pSpecified->authority, - storeRef.params - }; - - auto master = storeConfig.createSSHMaster( - false, // no SSH master yet - logFD.get()); - // FIXME: rewrite to use Store. - auto child = build_remote::openConnection(machine, master); + ::Machine::Connection conn { + .machine = machine, + .store = [&]{ + auto * pSpecified = std::get_if(&machine->storeUri.variant); + if (!pSpecified || pSpecified->scheme != "ssh") { + throw Error("Currently, only (legacy-)ssh stores are supported!"); + } + + auto remoteStore = machine->openStore().dynamic_pointer_cast(); + assert(remoteStore); + + remoteStore->connPipeSize = 1024 * 1024; + + if (machine->isLocalhost()) { + auto rp_new = remoteStore->remoteProgram.get(); + rp_new.push_back("--builders"); + rp_new.push_back(""); + const_cast &>(remoteStore->remoteProgram).assign(rp_new); + } + remoteStore->extraSshArgs = { + "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" + }; + const_cast &>(remoteStore->logFD).assign(logFD.get()); + + return nix::ref{remoteStore}; + }(), + }; { auto activeStepState(activeStep->state_.lock()); if (activeStepState->cancelled) throw Error("step cancelled"); - activeStepState->pid = child->sshPid; + activeStepState->pid = conn.store->getConnectionPid(); } Finally clearPid([&]() { @@ -442,35 +413,12 @@ void State::buildRemote(ref destStore, process. Meh. */ }); - ::Machine::Connection conn { - { - .to = child->in.get(), - .from = child->out.get(), - /* Handshake. */ - .remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize - }, - /*.machine =*/ machine, - }; - Finally updateStats([&]() { - bytesReceived += conn.from.read; - bytesSent += conn.to.written; + auto stats = conn.store->getConnectionStats(); + bytesReceived += stats.bytesReceived; + bytesSent += stats.bytesSent; }); - constexpr ServeProto::Version our_version = 0x206; - - try { - conn.remoteVersion = decltype(conn)::handshake( - conn.to, - conn.from, - our_version, - machine->storeUri.render()); - } catch (EndOfFile & e) { - child->sshPid.wait(); - std::string s = chomp(readFile(result.logFile)); - throw Error("cannot connect to ‘%1%’: %2%", machine->storeUri.render(), s); - } - { auto info(machine->state->connectInfo.lock()); info->consecutiveFailures = 0; @@ -539,7 +487,7 @@ void State::buildRemote(ref destStore, auto now1 = std::chrono::steady_clock::now(); - auto infos = conn.queryPathInfos(*localStore, outputs); + auto infos = conn.store->queryPathInfosUncached(outputs); size_t totalNarSize = 0; for (auto & [_, info] : infos) totalNarSize += info.narSize; @@ -574,9 +522,11 @@ void State::buildRemote(ref destStore, } } - /* Shut down the connection. */ - child->in = -1; - child->sshPid.wait(); + /* Shut down the connection done by RAII. + + Only difference is kill() instead of wait() (i.e. send signal + then wait()) + */ } catch (Error & e) { /* Disable this machine until a certain period of time has diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 30e01c74..e2d31434 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -20,9 +20,7 @@ #include "store-api.hh" #include "sync.hh" #include "nar-extractor.hh" -#include "serve-protocol.hh" -#include "serve-protocol-impl.hh" -#include "serve-protocol-connection.hh" +#include "legacy-ssh-store.hh" #include "machines.hh" @@ -292,9 +290,11 @@ struct Machine : nix::Machine bool isLocalhost() const; // A connection to a machine - struct Connection : nix::ServeProto::BasicClientConnection { + struct Connection { // Backpointer to the machine ptr machine; + // Opened store + nix::ref store; }; };