From 022160809b5040bbf47730ee2d0fc1734e379f31 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 13 Apr 2022 14:01:23 +0200 Subject: [PATCH] buildRemote(): Support arbitrary stores Rather than re-implementing the legacy SSH store protocol, we now use Store directly. To be able to support build cancellation, the actual build is performed by a helper process (hydra-build-step), which can be killed by hydra-queue-runner if the corresponding step is cancelled. --- configure.ac | 1 + flake.nix | 2 +- src/Makefile.am | 2 +- src/hydra-build-step/Makefile.am | 5 + src/hydra-build-step/hydra-build-step.cc | 208 +++++++++ src/hydra-queue-runner/build-remote.cc | 463 +++---------------- src/hydra-queue-runner/hydra-queue-runner.cc | 5 +- src/hydra-queue-runner/state.hh | 5 +- 8 files changed, 283 insertions(+), 408 deletions(-) create mode 100644 src/hydra-build-step/Makefile.am create mode 100644 src/hydra-build-step/hydra-build-step.cc diff --git a/configure.ac b/configure.ac index 0c823696..bc0fc333 100644 --- a/configure.ac +++ b/configure.ac @@ -68,6 +68,7 @@ AC_CONFIG_FILES([ src/hydra-evaluator/Makefile src/hydra-eval-jobs/Makefile src/hydra-queue-runner/Makefile + src/hydra-build-step/Makefile src/sql/Makefile src/ttf/Makefile src/lib/Makefile diff --git a/flake.nix b/flake.nix index 01b0c988..a4c051aa 100644 --- a/flake.nix +++ b/flake.nix @@ -619,7 +619,7 @@ shellHook = '' pushd $(git rev-parse --show-toplevel) >/dev/null - PATH=$(pwd)/src/hydra-evaluator:$(pwd)/src/script:$(pwd)/src/hydra-eval-jobs:$(pwd)/src/hydra-queue-runner:$PATH + PATH=$(pwd)/src/hydra-evaluator:$(pwd)/src/script:$(pwd)/src/hydra-eval-jobs:$(pwd)/src/hydra-queue-runner:$(pwd)/src/hydra-build-step:$PATH PERL5LIB=$(pwd)/src/lib:$PERL5LIB export HYDRA_HOME="$(pwd)/src/" mkdir -p .hydra-data diff --git a/src/Makefile.am b/src/Makefile.am index a28780b6..0dec59c5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,3 +1,3 @@ -SUBDIRS = hydra-evaluator hydra-eval-jobs hydra-queue-runner sql 
script lib root ttf +SUBDIRS = hydra-evaluator hydra-eval-jobs hydra-queue-runner hydra-build-step sql script lib root ttf BOOTCLEAN_SUBDIRS = $(SUBDIRS) DIST_SUBDIRS = $(SUBDIRS) diff --git a/src/hydra-build-step/Makefile.am b/src/hydra-build-step/Makefile.am new file mode 100644 index 00000000..d8ba9659 --- /dev/null +++ b/src/hydra-build-step/Makefile.am @@ -0,0 +1,5 @@ +bin_PROGRAMS = hydra-build-step + +hydra_build_step_SOURCES = hydra-build-step.cc +hydra_build_step_LDADD = $(NIX_LIBS) +hydra_build_step_CXXFLAGS = $(NIX_CFLAGS) diff --git a/src/hydra-build-step/hydra-build-step.cc b/src/hydra-build-step/hydra-build-step.cc new file mode 100644 index 00000000..064e7f97 --- /dev/null +++ b/src/hydra-build-step/hydra-build-step.cc @@ -0,0 +1,208 @@ +/* This is a helper program that performs a build step, i.e. a single + derivation. In addition to a derivation path, it takes three store + URLs as arguments: + + * --store: The store that will hold the resulting store paths + (typically a binary cache). + + * --eval-store: The store that holds the .drv files, as produced by + hydra-evaluator. + + * --build-store: The store that performs the build (often a + SSHStore for remote builds). + + The build log is written to the path indicated by --log-file. 
+*/ + +#include "shared.hh" +#include "common-eval-args.hh" +#include "store-api.hh" +#include "build-result.hh" +#include "derivations.hh" +#include "worker-protocol.hh" + +#include <fcntl.h> + +using namespace nix; + +// FIXME: cut&paste +static std::string_view getS(const std::vector<Logger::Field> & fields, size_t n) +{ + assert(n < fields.size()); + assert(fields[n].type == Logger::Field::tString); + return fields[n].s; +} + +void mainWrapped(std::list<std::string> args) +{ + verbosity = lvlError; + + struct MyArgs : MixEvalArgs, MixCommonArgs + { + Path drvPath; + std::optional<std::string> buildStoreUrl; + std::optional<Path> logPath; + std::optional<uint64_t> maxOutputSize; + + MyArgs() : MixCommonArgs("hydra-build-step") + { + expectArg("drv-path", &drvPath); + + addFlag({ + .longName = "build-store", + .description = "The Nix store to use for building the derivation.", + //.category = category, + .labels = {"store-url"}, + .handler = {&buildStoreUrl}, + }); + + addFlag({ + .longName = "log-file", + .description = "The path to the build log.", + .labels = {"path"}, + .handler = {&logPath}, + }); + + addFlag({ + .longName = "max-output-size", + .description = "Maximum size of the outputs.", + .labels = {"bytes"}, + .handler = {&maxOutputSize}, + }); + } + }; + + /* A logger that intercepts all build log lines and writes them to + the log file.
*/ + MyArgs myArgs; + myArgs.parseCmdline(args); + + struct MyLogger : public Logger + { + Logger & prev; + AutoCloseFD logFile; + + MyLogger(Logger & prev, Path logPath) : prev(prev) + { + logFile = open(logPath.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666); + if (!logFile) + throw SysError("creating log file '%s'", logPath); + } + + void log(Verbosity lvl, const FormatOrString & fs) override + { prev.log(lvl, fs); } + + void logEI(const ErrorInfo & ei) override + { prev.logEI(ei); } + + void writeToStdout(std::string_view s) override + { prev.writeToStdout(s); } + + void result(ActivityId act, ResultType type, const Fields & fields) override + { + if (type == resBuildLogLine) + writeLine(logFile.get(), std::string(getS(fields, 0))); + else + prev.result(act, type, fields); + } + }; + + auto destStore = openStore(); + auto evalStore = myArgs.evalStoreUrl ? openStore(*myArgs.evalStoreUrl) : destStore; + auto buildStore = myArgs.buildStoreUrl ? openStore(*myArgs.buildStoreUrl) : destStore; + + auto drvPath = evalStore->parseStorePath(myArgs.drvPath); + + auto drv = evalStore->readDerivation(drvPath); + BasicDerivation basicDrv(drv); + + uint64_t overhead = 0; + + /* Gather the inputs. */ + StorePathSet inputs; + + for (auto & p : drv.inputSrcs) + inputs.insert(p); + + for (auto & input : drv.inputDrvs) { + auto drv2 = evalStore->readDerivation(input.first); + for (auto & name : input.second) { + if (auto i = get(drv2.outputs, name)) { + auto outPath = i->path(*evalStore, drv2.name, name); + inputs.insert(*outPath); + basicDrv.inputSrcs.insert(*outPath); + } + } + } + + /* Ensure that the inputs exist in the destination store (so that + the builder can substitute them from the destination + store). This is a no-op for regular stores, but for the binary + cache store, this will copy the inputs to the binary cache from + the local store. 
*/ + { + auto now1 = std::chrono::steady_clock::now(); + + debug("sending closure of '%s' to '%s'", + evalStore->printStorePath(drvPath), destStore->getUri()); + + if (evalStore != destStore) + copyClosure(*evalStore, *destStore, drv.inputSrcs, NoRepair, NoCheckSigs); + + copyClosure(*destStore, *buildStore, inputs, NoRepair, NoCheckSigs, Substitute); + + auto now2 = std::chrono::steady_clock::now(); + + overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + } + + /* Perform the build. */ + if (myArgs.logPath) + logger = new MyLogger(*logger, *myArgs.logPath); + + auto buildResult = buildStore->buildDerivation(drvPath, basicDrv); + + /* Copy the output paths from the build store to the destination + store. */ + size_t totalNarSize = 0; + + if (buildResult.success()) { + + std::map<StorePath, ValidPathInfo> infos; + StorePathSet outputs; + for (auto & [output, realisation] : buildResult.builtOutputs) { + auto info = buildStore->queryPathInfo(realisation.outPath); + totalNarSize += info->narSize; + infos.insert_or_assign(info->path, *info); + outputs.insert(info->path); + } + + if ((!myArgs.maxOutputSize || totalNarSize <= *myArgs.maxOutputSize) + && buildStore != destStore) + { + debug("copying outputs of '%s' from '%s' (%d bytes)", + buildStore->printStorePath(drvPath), buildStore->getUri(), totalNarSize); + + auto now1 = std::chrono::steady_clock::now(); + + copyPaths(*buildStore, *destStore, outputs, NoRepair, NoCheckSigs); + + auto now2 = std::chrono::steady_clock::now(); + + overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count(); + } + } + + FdSink stdout(STDOUT_FILENO); + stdout << overhead; + stdout << totalNarSize; + worker_proto::write(*evalStore, stdout, buildResult); +} + +int main(int argc, char * * argv) +{ + return handleExceptions(argv[0], [&]() { + initNix(); + mainWrapped(argvToStrings(argc, argv)); + }); +} diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 57a5f0df..02efdb78 100644 ---
a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -1,180 +1,24 @@ -#include -#include - -#include -#include -#include - #include "build-result.hh" -#include "serve-protocol.hh" #include "state.hh" #include "util.hh" -#include "worker-protocol.hh" #include "finally.hh" #include "url.hh" +#include "worker-protocol.hh" using namespace nix; - -struct Child +static std::string machineToStoreUrl(Machine::ptr machine) { - Pid pid; - AutoCloseFD to, from; -}; + if (machine->sshName == "localhost") + return "auto"; + // FIXME: remove this, rely on Machine::Machine(), Machine::openStore(). -static void append(Strings & dst, const Strings & src) -{ - dst.insert(dst.end(), src.begin(), src.end()); + // SSH flags: "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" + + return "ssh://" + machine->sshName; } -static Strings extraStoreArgs(std::string & machine) -{ - Strings result; - try { - auto parsed = parseURL(machine); - if (parsed.scheme != "ssh") { - throw SysError("Currently, only (legacy-)ssh stores are supported!"); - } - machine = parsed.authority.value_or(""); - auto remoteStore = parsed.query.find("remote-store"); - if (remoteStore != parsed.query.end()) { - result = {"--store", shellEscape(remoteStore->second)}; - } - } catch (BadURL &) { - // We just try to continue with `machine->sshName` here for backwards compat. 
- } - - return result; -} - -static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Child & child) -{ - std::string pgmName; - Pipe to, from; - to.create(); - from.create(); - - Strings argv; - if (machine->isLocalhost()) { - pgmName = "nix-store"; - argv = {"nix-store", "--builders", "", "--serve", "--write"}; - } else { - pgmName = "ssh"; - auto sshName = machine->sshName; - Strings extraArgs = extraStoreArgs(sshName); - argv = {"ssh", sshName}; - if (machine->sshKey != "") append(argv, {"-i", machine->sshKey}); - if (machine->sshPublicHostKey != "") { - Path fileName = tmpDir + "/host-key"; - auto p = machine->sshName.find("@"); - std::string host = p != std::string::npos ? std::string(machine->sshName, p + 1) : machine->sshName; - writeFile(fileName, host + " " + machine->sshPublicHostKey + "\n"); - append(argv, {"-oUserKnownHostsFile=" + fileName}); - } - append(argv, - { "-x", "-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes" - , "--", "nix-store", "--serve", "--write" }); - append(argv, extraArgs); - } - - child.pid = startProcess([&]() { - restoreProcessContext(); - - if (dup2(to.readSide.get(), STDIN_FILENO) == -1) - throw SysError("cannot dup input pipe to stdin"); - - if (dup2(from.writeSide.get(), STDOUT_FILENO) == -1) - throw SysError("cannot dup output pipe to stdout"); - - if (dup2(stderrFD, STDERR_FILENO) == -1) - throw SysError("cannot dup stderr"); - - execvp(argv.front().c_str(), (char * *) stringsToCharPtrs(argv).data()); // FIXME: remove cast - - throw SysError("cannot start %s", pgmName); - }); - - to.readSide = -1; - from.writeSide = -1; - - child.to = to.writeSide.release(); - child.from = from.readSide.release(); -} - - -static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore, - FdSource & from, FdSink & to, const StorePathSet & paths, - bool useSubstitutes = false) -{ - StorePathSet closure; - destStore.computeFSClosure(paths, closure); - - /* Send the "query valid paths" command 
with the "lock" option - enabled. This prevents a race where the remote host - garbage-collect paths that are already there. Optionally, ask - the remote host to substitute missing paths. */ - // FIXME: substitute output pollutes our build log - to << cmdQueryValidPaths << 1 << useSubstitutes; - worker_proto::write(destStore, to, closure); - to.flush(); - - /* Get back the set of paths that are already valid on the remote - host. */ - auto present = worker_proto::read(destStore, from, Phantom {}); - - if (present.size() == closure.size()) return; - - auto sorted = destStore.topoSortPaths(closure); - - StorePathSet missing; - for (auto i = sorted.rbegin(); i != sorted.rend(); ++i) - if (!present.count(*i)) missing.insert(*i); - - printMsg(lvlDebug, "sending %d missing paths", missing.size()); - - std::unique_lock sendLock(sendMutex, - std::chrono::seconds(600)); - - to << cmdImportPaths; - destStore.exportPaths(missing, to); - to.flush(); - - if (readInt(from) != 1) - throw Error("remote machine failed to import closure"); -} - - -// FIXME: use Store::topoSortPaths(). -StorePaths reverseTopoSortPaths(const std::map & paths) -{ - StorePaths sorted; - StorePathSet visited; - - std::function dfsVisit; - - dfsVisit = [&](const StorePath & path) { - if (!visited.insert(path).second) return; - - auto info = paths.find(path); - auto references = info == paths.end() ? StorePathSet() : info->second.references; - - for (auto & i : references) - /* Don't traverse into paths that don't exist. That can - happen due to substitutes for non-existent paths. 
*/ - if (i != path && paths.count(i)) - dfsVisit(i); - - sorted.push_back(path); - }; - - for (auto & i : paths) - dfsVisit(i.first); - - return sorted; -} - - void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats, @@ -186,24 +30,41 @@ void State::buildRemote(ref destStore, std::string base(step->drvPath.to_string()); result.logFile = logDir + "/" + std::string(base, 0, 2) + "/" + std::string(base, 2); - AutoDelete autoDelete(result.logFile, false); createDirs(dirOf(result.logFile)); - AutoCloseFD logFD = open(result.logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666); - if (!logFD) throw SysError("creating log file ‘%s’", result.logFile); - - nix::Path tmpDir = createTempDir(); - AutoDelete tmpDirDel(tmpDir, true); - try { - updateStep(ssConnecting); + updateStep(ssBuilding); + result.startTime = time(0); - // FIXME: rewrite to use Store. - Child child; - openConnection(machine, tmpDir, logFD.get(), child); + auto buildStoreUrl = machineToStoreUrl(machine); + Strings args = { + localStore->printStorePath(step->drvPath), + "--store", destStore->getUri(), + "--eval-store", localStore->getUri(), + "--build-store", buildStoreUrl, + "--max-silent-time", std::to_string(maxSilentTime), + "--timeout", std::to_string(buildTimeout), + "--max-build-log-size", std::to_string(maxLogSize), + "--max-output-size", std::to_string(maxOutputSize), + "--repeat", std::to_string(repeats), + "--log-file", result.logFile, + // FIXME: step->isDeterministic + }; + + // FIXME: set pid for cancellation + + auto [status, stdout] = [&]() { + MaintainCount mc(nrStepsBuilding); + return runProgram({ + .program = "hydra-build-step", + .args = std::move(args), + }); + }(); + + #if 0 { auto activeStepState(activeStep->state_.lock()); if (activeStepState->cancelled) throw Error("step cancelled"); @@ -221,155 +82,36 @@ void State::buildRemote(ref destStore, possibility that we end up killing another 
process. Meh. */ }); + #endif - FdSource from(child.from.get()); - FdSink to(child.to.get()); + result.stopTime = time(0); - Finally updateStats([&]() { - bytesReceived += from.read; - bytesSent += to.written; - }); - - /* Handshake. */ - unsigned int remoteVersion; - - try { - to << SERVE_MAGIC_1 << 0x204; - to.flush(); - - unsigned int magic = readInt(from); - if (magic != SERVE_MAGIC_2) - throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", machine->sshName); - remoteVersion = readInt(from); - if (GET_PROTOCOL_MAJOR(remoteVersion) != 0x200) - throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", machine->sshName); - if (GET_PROTOCOL_MINOR(remoteVersion) < 3 && repeats > 0) - throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", machine->sshName); - - } catch (EndOfFile & e) { - child.pid.wait(); - std::string s = chomp(readFile(result.logFile)); - throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s); - } + if (!statusOk(status)) + throw ExecError(status, fmt("hydra-build-step %s with output:\n%s", statusToString(status), stdout)); + /* The build was executed successfully, so clear the failure + count for this machine. */ { auto info(machine->state->connectInfo.lock()); info->consecutiveFailures = 0; } - /* Gather the inputs. If the remote side is Nix <= 1.9, we have to - copy the entire closure of ‘drvPath’, as well as the required - outputs of the input derivations. On Nix > 1.9, we only need to - copy the immediate sources of the derivation and the required - outputs of the input derivations. */ - updateStep(ssSendingInputs); + /* Read the BuildResult from the child. 
*/ + StringSource source(stdout); + result.overhead += readNum<uint64_t>(source); + auto totalNarSize = readNum<uint64_t>(source); + auto buildResult = worker_proto::read(*localStore, source, Phantom<BuildResult> {}); - StorePathSet inputs; - BasicDerivation basicDrv(*step->drv); - - for (auto & p : step->drv->inputSrcs) - inputs.insert(p); - - for (auto & input : step->drv->inputDrvs) { - auto drv2 = localStore->readDerivation(input.first); - for (auto & name : input.second) { - if (auto i = get(drv2.outputs, name)) { - auto outPath = i->path(*localStore, drv2.name, name); - inputs.insert(*outPath); - basicDrv.inputSrcs.insert(*outPath); - } - } + // FIXME: make RemoteResult inherit BuildResult. + result.errorMsg = buildResult.errorMsg; + result.timesBuilt = buildResult.timesBuilt; + result.isNonDeterministic = buildResult.isNonDeterministic; + if (buildResult.startTime && buildResult.stopTime) { + result.startTime = buildResult.startTime; + result.stopTime = buildResult.stopTime; } - /* Ensure that the inputs exist in the destination store. This is - a no-op for regular stores, but for the binary cache store, - this will copy the inputs to the binary cache from the local - store. */ - if (localStore != std::shared_ptr<Store>(destStore)) { - copyClosure(*localStore, *destStore, - step->drv->inputSrcs, - NoRepair, NoCheckSigs, NoSubstitute); - } - - { - auto mc1 = std::make_shared<MaintainCount<counter>>(nrStepsWaiting); - mc1.reset(); - MaintainCount<counter> mc2(nrStepsCopyingTo); - - printMsg(lvlDebug, "sending closure of ‘%s’ to ‘%s’", - localStore->printStorePath(step->drvPath), machine->sshName); - - auto now1 = std::chrono::steady_clock::now(); - - /* Copy the input closure.
*/ - if (machine->isLocalhost()) { - StorePathSet closure; - destStore->computeFSClosure(inputs, closure); - copyPaths(*destStore, *localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); - } else { - copyClosureTo(machine->state->sendLock, *destStore, from, to, inputs, true); - } - - auto now2 = std::chrono::steady_clock::now(); - - result.overhead += std::chrono::duration_cast(now2 - now1).count(); - } - - autoDelete.cancel(); - - /* Truncate the log to get rid of messages about substitutions - etc. on the remote system. */ - if (lseek(logFD.get(), SEEK_SET, 0) != 0) - throw SysError("seeking to the start of log file ‘%s’", result.logFile); - - if (ftruncate(logFD.get(), 0) == -1) - throw SysError("truncating log file ‘%s’", result.logFile); - - logFD = -1; - - /* Do the build. */ - printMsg(lvlDebug, "building ‘%s’ on ‘%s’", - localStore->printStorePath(step->drvPath), - machine->sshName); - - updateStep(ssBuilding); - - to << cmdBuildDerivation << localStore->printStorePath(step->drvPath); - writeDerivation(to, *localStore, basicDrv); - to << maxSilentTime << buildTimeout; - if (GET_PROTOCOL_MINOR(remoteVersion) >= 2) - to << maxLogSize; - if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) { - to << repeats // == build-repeat - << step->isDeterministic; // == enforce-determinism - } - to.flush(); - - result.startTime = time(0); - int res; - { - MaintainCount mc(nrStepsBuilding); - res = readInt(from); - } - result.stopTime = time(0); - - result.errorMsg = readString(from); - if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) { - result.timesBuilt = readInt(from); - result.isNonDeterministic = readInt(from); - auto start = readInt(from); - auto stop = readInt(from); - if (start && start) { - /* Note: this represents the duration of a single - round, rather than all rounds. 
*/ - result.startTime = start; - result.stopTime = stop; - } - } - if (GET_PROTOCOL_MINOR(remoteVersion) >= 6) { - worker_proto::read(*localStore, from, Phantom {}); - } - switch ((BuildResult::Status) res) { + switch (buildResult.status) { case BuildResult::Built: result.stepStatus = bsSuccess; break; @@ -413,10 +155,18 @@ void State::buildRemote(ref destStore, result.stepStatus = bsAborted; break; } + if (result.stepStatus != bsSuccess) return; result.errorMsg = ""; + /* If the NAR size limit was exceeded, then hydra-build-step + will not have copied the output paths. */ + if (totalNarSize > maxOutputSize) { + result.stepStatus = bsNarSizeLimitExceeded; + return; + } + /* If the path was substituted or already valid, then we didn't get a build log. */ if (result.isCached) { @@ -426,93 +176,6 @@ void State::buildRemote(ref destStore, result.logFile = ""; } - /* Copy the output paths. */ - if (!machine->isLocalhost() || localStore != std::shared_ptr(destStore)) { - updateStep(ssReceivingOutputs); - - MaintainCount mc(nrStepsCopyingFrom); - - auto now1 = std::chrono::steady_clock::now(); - - StorePathSet outputs; - for (auto & i : step->drv->outputsAndOptPaths(*localStore)) { - if (i.second.second) - outputs.insert(*i.second.second); - } - - /* Get info about each output path. 
*/ - std::map infos; - size_t totalNarSize = 0; - to << cmdQueryPathInfos; - worker_proto::write(*localStore, to, outputs); - to.flush(); - while (true) { - auto storePathS = readString(from); - if (storePathS == "") break; - auto deriver = readString(from); // deriver - auto references = worker_proto::read(*localStore, from, Phantom {}); - readLongLong(from); // download size - auto narSize = readLongLong(from); - auto narHash = Hash::parseAny(readString(from), htSHA256); - auto ca = parseContentAddressOpt(readString(from)); - readStrings(from); // sigs - ValidPathInfo info(localStore->parseStorePath(storePathS), narHash); - assert(outputs.count(info.path)); - info.references = references; - info.narSize = narSize; - totalNarSize += info.narSize; - info.narHash = narHash; - info.ca = ca; - if (deriver != "") - info.deriver = localStore->parseStorePath(deriver); - infos.insert_or_assign(info.path, info); - } - - if (totalNarSize > maxOutputSize) { - result.stepStatus = bsNarSizeLimitExceeded; - return; - } - - /* Copy each path. */ - printMsg(lvlDebug, "copying outputs of ‘%s’ from ‘%s’ (%d bytes)", - localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize); - - auto pathsSorted = reverseTopoSortPaths(infos); - - for (auto & path : pathsSorted) { - auto & info = infos.find(path)->second; - - /* Receive the NAR from the remote and add it to the - destination store. Meanwhile, extract all the info from the - NAR that getBuildOutput() needs. */ - auto source2 = sinkToSource([&](Sink & sink) - { - /* Note: we should only send the command to dump the store - path to the remote if the NAR is actually going to get read - by the destination store, which won't happen if this path - is already valid on the destination store. Since this - lambda function only gets executed if someone tries to read - from source2, we will send the command from here rather - than outside the lambda. 
*/ - to << cmdDumpStorePath << localStore->printStorePath(path); - to.flush(); - - TeeSource tee(from, sink); - extractNarData(tee, localStore->printStorePath(path), narMembers); - }); - - destStore->addToStore(info, *source2, NoRepair, NoCheckSigs); - } - - auto now2 = std::chrono::steady_clock::now(); - - result.overhead += std::chrono::duration_cast(now2 - now1).count(); - } - - /* Shut down the connection. */ - child.to = -1; - child.pid.wait(); - } catch (Error & e) { /* Disable this machine until a certain period of time has passed. This period increases on every consecutive diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 723bf223..5e341edb 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -569,12 +569,11 @@ void State::dumpStatus(Connection & conn) } root.attr("nrActiveSteps", activeSteps_.lock()->size()); root.attr("nrStepsBuilding", nrStepsBuilding); + #if 0 root.attr("nrStepsCopyingTo", nrStepsCopyingTo); root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom); - root.attr("nrStepsWaiting", nrStepsWaiting); + #endif root.attr("nrUnsupportedSteps", nrUnsupportedSteps); - root.attr("bytesSent", bytesSent); - root.attr("bytesReceived", bytesReceived); root.attr("nrBuildsRead", nrBuildsRead); root.attr("buildReadTimeMs", buildReadTimeMs); root.attr("buildReadTimeAvgMs", nrBuildsRead == 0 ? 
0.0 : (float) buildReadTimeMs / nrBuildsRead); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 47e74f55..651aaf97 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -359,9 +359,10 @@ private: counter nrStepsStarted{0}; counter nrStepsDone{0}; counter nrStepsBuilding{0}; + #if 0 counter nrStepsCopyingTo{0}; counter nrStepsCopyingFrom{0}; - counter nrStepsWaiting{0}; + #endif counter nrUnsupportedSteps{0}; counter nrRetries{0}; counter maxNrRetries{0}; @@ -370,8 +371,6 @@ private: counter nrQueueWakeups{0}; counter nrDispatcherWakeups{0}; counter dispatchTimeMs{0}; - counter bytesSent{0}; - counter bytesReceived{0}; counter nrActiveDbUpdates{0}; /* Specific build to do for --build-one (testing only). */