Compare commits

..

5 Commits

Author SHA1 Message Date
John Ericson
f3dc4e9228 WIP ssh-ng:// 2025-02-14 18:51:49 -05:00
John Ericson
45d075e5db WIP: Avoid custom logic copying outputs from the remote builder
We need a replacement for the nar member logic, however. And maybe also
a test that fails until this is fixed (this one should not be passing).
2025-02-14 18:50:38 -05:00
John Ericson
9e162dcf52 Avoid custom logic to copy inputs to the remote builder 2025-02-14 18:50:38 -05:00
John Ericson
4c173daec7 Use LegacySSHStore
In https://github.com/NixOS/nix/pull/10748 it is extended with
everything we need.
2025-02-14 18:48:46 -05:00
John Ericson
8675aee25b WIP TEMP nix update, don't mere this!
Not until https://github.com/NixOS/nix/pull/10748 lands

Flake lock file updates:

• Updated input 'nix':
    'github:NixOS/nix/970942f45836172fda410a638853382952189eb9?narHash=sha256-jGFuyYKJjJZsBRoi7ZcaVKt1OYxusz/ld1HA7VD2w/0%3D' (2025-02-12)
  → 'github:NixOS/nix/5eade4825221d3284fc6555cb20de2c7aa171d72?narHash=sha256-n5kdS1C24tlJxDV6Wm1iBlyvGk%2Bp0gMXRcWVCAipYLs%3D' (2025-02-14)

• Updated input 'nix-eval-jobs':
    'github:Ericson2314/nix-eval-jobs/5e27c2724a4b07862e7ff1a198aa2ed68dea3e2c?narHash=sha256-7xgSdKnQW11eWd59MnpUNS%2BgwgtOJH2ShzLwByev3rg%3D' (2025-02-14)
  → 'github:Ericson2314/nix-eval-jobs/de345eb4518d952c2d86261b270f2c31edecd3de?narHash=sha256-dNMJY6%2BG3PwE8lIAhwetPJdA2DxCEKRXPY/EtHmdDh4%3D' (2025-02-14)
2025-02-14 18:15:22 -05:00
6 changed files with 81 additions and 260 deletions

20
flake.lock generated
View File

@@ -5,6 +5,7 @@
"flake-compat": [],
"flake-parts": [],
"git-hooks-nix": [],
"nixfmt": [],
"nixpkgs": [
"nixpkgs"
],
@@ -12,16 +13,16 @@
"nixpkgs-regression": []
},
"locked": {
"lastModified": 1739393420,
"narHash": "sha256-jGFuyYKJjJZsBRoi7ZcaVKt1OYxusz/ld1HA7VD2w/0=",
"lastModified": 1739571938,
"narHash": "sha256-NlaLAed/xei6RWpU2HIIbDjILRC4l1NIfGeyrn7ALQs=",
"owner": "NixOS",
"repo": "nix",
"rev": "970942f45836172fda410a638853382952189eb9",
"rev": "ffc649d2eabdd3e678b5bcc211dd59fd06debf3e",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "2.26-maintenance",
"ref": "ssh-ng-extensions-for-hydra",
"repo": "nix",
"type": "github"
}
@@ -29,15 +30,16 @@
"nix-eval-jobs": {
"flake": false,
"locked": {
"lastModified": 1739500569,
"narHash": "sha256-3wIReAqdTALv39gkWXLMZQvHyBOc3yPkWT2ZsItxedY=",
"owner": "nix-community",
"lastModified": 1739499741,
"narHash": "sha256-dNMJY6+G3PwE8lIAhwetPJdA2DxCEKRXPY/EtHmdDh4=",
"owner": "Ericson2314",
"repo": "nix-eval-jobs",
"rev": "4b392b284877d203ae262e16af269f702df036bc",
"rev": "de345eb4518d952c2d86261b270f2c31edecd3de",
"type": "github"
},
"original": {
"owner": "nix-community",
"owner": "Ericson2314",
"ref": "nix-2.27",
"repo": "nix-eval-jobs",
"type": "github"
}

View File

@@ -4,7 +4,7 @@
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11-small";
inputs.nix = {
url = "github:NixOS/nix/2.26-maintenance";
url = "github:NixOS/nix/ssh-ng-extensions-for-hydra";
inputs.nixpkgs.follows = "nixpkgs";
# hide nix dev tooling from our lock file
@@ -13,10 +13,11 @@
inputs.nixpkgs-regression.follows = "";
inputs.nixpkgs-23-11.follows = "";
inputs.flake-compat.follows = "";
inputs.nixfmt.follows = "";
};
inputs.nix-eval-jobs = {
url = "github:nix-community/nix-eval-jobs";
url = "github:Ericson2314/nix-eval-jobs/nix-2.27";
# We want to control the deps precisely
flake = false;
};

View File

@@ -7,15 +7,12 @@
#include "build-result.hh"
#include "path.hh"
#include "legacy-ssh-store.hh"
#include "ssh-store.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "state.hh"
#include "current-process.hh"
#include "processes.hh"
#include "util.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "ssh.hh"
#include "finally.hh"
#include "url.hh"
@@ -39,108 +36,6 @@ bool ::Machine::isLocalhost() const
namespace nix::build_remote {
static std::unique_ptr<SSHMaster::Connection> openConnection(
::Machine::ptr machine, SSHMaster & master)
{
Strings command = {"nix-store", "--serve", "--write"};
if (machine->isLocalhost()) {
command.push_back("--builders");
command.push_back("");
} else {
auto remoteStore = machine->storeUri.params.find("remote-store");
if (remoteStore != machine->storeUri.params.end()) {
command.push_back("--store");
command.push_back(shellEscape(remoteStore->second));
}
}
auto ret = master.startCommand(std::move(command), {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
});
// XXX: determine the actual max value we can use from /proc.
// FIXME: Should this be upstreamed into `startCommand` in Nix?
int pipesize = 1024 * 1024;
fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize);
fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize);
return ret;
}
static void copyClosureTo(
::Machine::Connection & conn,
Store & destStore,
const StorePathSet & paths,
SubstituteFlag useSubstitutes = NoSubstitute)
{
StorePathSet closure;
destStore.computeFSClosure(paths, closure);
/* Send the "query valid paths" command with the "lock" option
enabled. This prevents a race where the remote host
garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log
/* Get back the set of paths that are already valid on the remote
host. */
auto present = conn.queryValidPaths(
destStore, true, closure, useSubstitutes);
if (present.size() == closure.size()) return;
auto sorted = destStore.topoSortPaths(closure);
StorePathSet missing;
for (auto i = sorted.rbegin(); i != sorted.rend(); ++i)
if (!present.count(*i)) missing.insert(*i);
printMsg(lvlDebug, "sending %d missing paths", missing.size());
std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
std::chrono::seconds(600));
conn.to << ServeProto::Command::ImportPaths;
destStore.exportPaths(missing, conn.to);
conn.to.flush();
if (readInt(conn.from) != 1)
throw Error("remote machine failed to import closure");
}
// FIXME: use Store::topoSortPaths().
static StorePaths reverseTopoSortPaths(const std::map<StorePath, UnkeyedValidPathInfo> & paths)
{
StorePaths sorted;
StorePathSet visited;
std::function<void(const StorePath & path)> dfsVisit;
dfsVisit = [&](const StorePath & path) {
if (!visited.insert(path).second) return;
auto info = paths.find(path);
auto references = info == paths.end() ? StorePathSet() : info->second.references;
for (auto & i : references)
/* Don't traverse into paths that don't exist. That can
happen due to substitutes for non-existent paths. */
if (i != path && paths.count(i))
dfsVisit(i);
sorted.push_back(path);
};
for (auto & i : paths)
dfsVisit(i.first);
return sorted;
}
static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, const StorePath & drvPath)
{
std::string base(drvPath.to_string());
@@ -203,13 +98,13 @@ static BasicDerivation sendInputs(
auto now1 = std::chrono::steady_clock::now();
/* Copy the input closure. */
if (conn.machine->isLocalhost()) {
StorePathSet closure;
destStore.computeFSClosure(basicDrv.inputSrcs, closure);
copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
} else {
copyClosureTo(conn, destStore, basicDrv.inputSrcs, Substitute);
}
copyClosure(
destStore,
conn.machine->isLocalhost() ? localStore : *conn.store,
basicDrv.inputSrcs,
NoRepair,
NoCheckSigs,
Substitute);
auto now2 = std::chrono::steady_clock::now();
@@ -224,11 +119,10 @@ static BuildResult performBuild(
Store & localStore,
StorePath drvPath,
const BasicDerivation & drv,
const ServeProto::BuildOptions & options,
counter & nrStepsBuilding
)
{
conn.putBuildDerivationRequest(localStore, drvPath, drv, options);
auto kont = conn.store->buildDerivationAsync(drvPath, drv, bmNormal);
BuildResult result;
@@ -237,7 +131,10 @@ static BuildResult performBuild(
startTime = time(0);
{
MaintainCount<counter> mc(nrStepsBuilding);
result = ServeProto::Serialise<BuildResult>::read(localStore, conn);
result = kont();
// Without proper call-once functions, we need to manually
// delete after calling.
kont = {};
}
stopTime = time(0);
@@ -253,7 +150,7 @@ static BuildResult performBuild(
// If the protocol was too old to give us `builtOutputs`, initialize
// it manually by introspecting the derivation.
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6)
if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6)
{
// If the remote is too old to handle CA derivations, we cant get this
// far anyways
@@ -278,55 +175,6 @@ static BuildResult performBuild(
return result;
}
static void copyPathFromRemote(
::Machine::Connection & conn,
NarMemberDatas & narMembers,
Store & localStore,
Store & destStore,
const ValidPathInfo & info
)
{
/* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */
auto source2 = sinkToSource([&](Sink & sink)
{
/* Note: we should only send the command to dump the store
path to the remote if the NAR is actually going to get read
by the destination store, which won't happen if this path
is already valid on the destination store. Since this
lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather
than outside the lambda. */
conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path);
conn.to.flush();
TeeSource tee(conn.from, sink);
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
});
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
}
static void copyPathsFromRemote(
::Machine::Connection & conn,
NarMemberDatas & narMembers,
Store & localStore,
Store & destStore,
const std::map<StorePath, UnkeyedValidPathInfo> & infos
)
{
auto pathsSorted = reverseTopoSortPaths(infos);
for (auto & path : pathsSorted) {
auto & info = infos.find(path)->second;
copyPathFromRemote(
conn, narMembers, localStore, destStore,
ValidPathInfo { path, info });
}
}
}
/* using namespace nix::build_remote; */
@@ -389,7 +237,6 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult)
void State::buildRemote(ref<Store> destStore,
::Machine::ptr machine, Step::ptr step,
const ServeProto::BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
std::function<void(StepState)> updateStep,
NarMemberDatas & narMembers)
@@ -404,35 +251,43 @@ void State::buildRemote(ref<Store> destStore,
updateStep(ssConnecting);
auto storeRef = machine->completeStoreReference();
auto * pSpecified = std::get_if<StoreReference::Specified>(&storeRef.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}
LegacySSHStoreConfig storeConfig {
pSpecified->scheme,
pSpecified->authority,
storeRef.params
};
auto master = storeConfig.createSSHMaster(
false, // no SSH master yet
logFD.get());
// FIXME: rewrite to use Store.
auto child = build_remote::openConnection(machine, master);
::Machine::Connection conn {
.machine = machine,
.store = [&]{
auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
if (!pSpecified || pSpecified->scheme != "ssh-ng") {
throw Error("Currently, only ssh-ng:// stores are supported!");
}
auto remoteStore = machine->openStore().dynamic_pointer_cast<RemoteStore>();
auto remoteStoreConfig = std::dynamic_pointer_cast<SSHStoreConfig>(remoteStore);
assert(remoteStore);
if (machine->isLocalhost()) {
auto rp_new = remoteStoreConfig->remoteProgram.get();
rp_new.push_back("--builders");
rp_new.push_back("");
const_cast<nix::Setting<Strings> &>(remoteStoreConfig->remoteProgram).assign(rp_new);
}
remoteStoreConfig->extraSshArgs = {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
};
// TODO logging
//const_cast<nix::Setting<int> &>(remoteStore->logFD).assign(logFD.get());
return nix::ref{remoteStore};
}(),
};
{
auto activeStepState(activeStep->state_.lock());
if (activeStepState->cancelled) throw Error("step cancelled");
activeStepState->pid = child->sshPid;
}
Finally clearPid([&]() {
auto activeStepState(activeStep->state_.lock());
activeStepState->pid = -1;
/* FIXME: there is a slight race here with step
cancellation in State::processQueueChange(), which
@@ -442,35 +297,13 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */
});
::Machine::Connection conn {
{
.to = child->in.get(),
.from = child->out.get(),
/* Handshake. */
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
},
/*.machine =*/ machine,
};
Finally updateStats([&]() {
bytesReceived += conn.from.read;
bytesSent += conn.to.written;
// TODO
//auto stats = conn.store->getConnectionStats();
//bytesReceived += stats.bytesReceived;
//bytesSent += stats.bytesSent;
});
constexpr ServeProto::Version our_version = 0x206;
try {
conn.remoteVersion = decltype(conn)::handshake(
conn.to,
conn.from,
our_version,
machine->storeUri.render());
} catch (EndOfFile & e) {
child->sshPid.wait();
std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to %1%: %2%", machine->storeUri.render(), s);
}
{
auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0;
@@ -508,7 +341,6 @@ void State::buildRemote(ref<Store> destStore,
*localStore,
step->drvPath,
resolvedDrv,
buildOptions,
nrStepsBuilding
);
@@ -539,21 +371,12 @@ void State::buildRemote(ref<Store> destStore,
auto now1 = std::chrono::steady_clock::now();
auto infos = conn.queryPathInfos(*localStore, outputs);
size_t totalNarSize = 0;
for (auto & [_, info] : infos) totalNarSize += info.narSize;
if (totalNarSize > maxOutputSize) {
result.stepStatus = bsNarSizeLimitExceeded;
return;
}
/* Copy each path. */
printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)",
localStore->printStorePath(step->drvPath), machine->storeUri.render(), totalNarSize);
printMsg(lvlDebug, "copying outputs of %s from %s",
localStore->printStorePath(step->drvPath), machine->storeUri.render());
copyClosure(*conn.store, *destStore, outputs);
build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos);
auto now2 = std::chrono::steady_clock::now();
result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
@@ -574,9 +397,11 @@ void State::buildRemote(ref<Store> destStore,
}
}
/* Shut down the connection. */
child->in = -1;
child->sshPid.wait();
/* Shut down the connection done by RAII.
Only difference is kill() instead of wait() (i.e. send signal
then wait())
*/
} catch (Error & e) {
/* Disable this machine until a certain period of time has

View File

@@ -98,13 +98,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
it). */
BuildID buildId;
std::optional<StorePath> buildDrvPath;
// Other fields set below
nix::ServeProto::BuildOptions buildOptions {
.maxLogSize = maxLogSize,
.nrRepeats = step->isDeterministic ? 1u : 0u,
.enforceDeterminism = step->isDeterministic,
.keepFailed = false,
};
auto conn(dbPool.get());
@@ -139,18 +132,19 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
{
auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName));
if (i != jobsetRepeats.end())
buildOptions.nrRepeats = std::max(buildOptions.nrRepeats, i->second);
warn("jobset repeats is deprecated; nix stopped supporting this correctly a long time ago.");
}
}
if (!build) build = *dependents.begin();
buildId = build->id;
buildDrvPath = build->drvPath;
buildOptions.maxSilentTime = build->maxSilentTime;
buildOptions.buildTimeout = build->buildTimeout;
settings.maxLogSize = maxLogSize;
settings.maxSilentTime = build->maxSilentTime;
settings.buildTimeout = build->buildTimeout;
printInfo("performing step %s %d times on %s (needed by build %d and %d others)",
localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->storeUri.render(), buildId, (dependents.size() - 1));
localStore->printStorePath(step->drvPath), 1, machine->storeUri.render(), buildId, (dependents.size() - 1));
}
if (!buildOneDone)
@@ -211,7 +205,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
try {
/* FIXME: referring builds may have conflicting timeouts. */
buildRemote(destStore, machine, step, buildOptions, result, activeStep, updateStep, narMembers);
buildRemote(destStore, machine, step, result, activeStep, updateStep, narMembers);
} catch (Error & e) {
if (activeStep->state_.lock()->cancelled) {
printInfo("marking step %d of build %d as cancelled", stepNr, buildId);

View File

@@ -182,7 +182,7 @@ void State::monitorMachinesFile()
getEnv("NIX_REMOTE_SYSTEMS").value_or(pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":");
if (machinesFiles.empty()) {
parseMachines("localhost " +
parseMachines("ssh-ng://localhost " +
(settings.thisSystem == "x86_64-linux" ? "x86_64-linux,i686-linux" : settings.thisSystem.get())
+ " - " + std::to_string(settings.maxBuildJobs) + " 1 "
+ concatStringsSep(",", StoreConfig::getDefaultSystemFeatures()));

View File

@@ -20,9 +20,7 @@
#include "store-api.hh"
#include "sync.hh"
#include "nar-extractor.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "serve-protocol-connection.hh"
#include "ssh-store.hh"
#include "machines.hh"
@@ -292,14 +290,16 @@ struct Machine : nix::Machine
bool isLocalhost() const;
// A connection to a machine
struct Connection : nix::ServeProto::BasicClientConnection {
struct Connection {
// Backpointer to the machine
ptr machine;
// Opened store
nix::ref<nix::RemoteStore> store;
};
};
class HydraConfig;
struct HydraConfig;
class State
@@ -542,7 +542,6 @@ private:
void buildRemote(nix::ref<nix::Store> destStore,
Machine::ptr machine, Step::ptr step,
const nix::ServeProto::BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
std::function<void(StepState)> updateStep,
NarMemberDatas & narMembers);