Compare commits

..

2 Commits

Author SHA1 Message Date
Jörg Thalheim
1c43f45bab Merge pull request #1459 from dermetfan/fix-1429-backport-nix-2.25
(backport to nix-2.25) hydra-eval-jobset: do not wait on n-e-j inside transaction
2025-03-29 15:28:56 +01:00
Robin Stumm
6d92b79d7a hydra-eval-jobset: do not wait on n-e-j inside transaction
backport of #TODO-THE-PR-NUMBER
2025-03-26 20:30:33 +01:00
8 changed files with 285 additions and 95 deletions

43
flake.lock generated
View File

@@ -1,10 +1,30 @@
{
"nodes": {
"libgit2": {
"flake": false,
"locked": {
"lastModified": 1715853528,
"narHash": "sha256-J2rCxTecyLbbDdsyBWn9w7r3pbKRMkI9E7RvRgAqBdY=",
"owner": "libgit2",
"repo": "libgit2",
"rev": "36f7e21ad757a3dacc58cf7944329da6bc1d6e96",
"type": "github"
},
"original": {
"owner": "libgit2",
"ref": "v1.8.1",
"repo": "libgit2",
"type": "github"
}
},
"nix": {
"inputs": {
"flake-compat": [],
"flake-parts": [],
"git-hooks-nix": [],
"libgit2": [
"libgit2"
],
"nixpkgs": [
"nixpkgs"
],
@@ -12,16 +32,16 @@
"nixpkgs-regression": []
},
"locked": {
"lastModified": 1739899400,
"narHash": "sha256-q/RgA4bB7zWai4oPySq9mch7qH14IEeom2P64SXdqHs=",
"lastModified": 1739390454,
"narHash": "sha256-mIpJgIwPS4o4xYhN1B+/fHESEXoxpu6nVoZTzZ0MfTg=",
"owner": "NixOS",
"repo": "nix",
"rev": "e310c19a1aeb1ce1ed4d41d5ab2d02db596e0918",
"rev": "d652513e4519ed4eb48c92f8670e5a71c7793fc3",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "2.26-maintenance",
"ref": "2.25-maintenance",
"repo": "nix",
"type": "github"
}
@@ -29,11 +49,11 @@
"nix-eval-jobs": {
"flake": false,
"locked": {
"lastModified": 1739500569,
"narHash": "sha256-3wIReAqdTALv39gkWXLMZQvHyBOc3yPkWT2ZsItxedY=",
"lastModified": 1739426028,
"narHash": "sha256-1dZLPw+nlFQzzswfyTxW+8VF1AJ4ZvoYvLTjlHiz1SA=",
"owner": "nix-community",
"repo": "nix-eval-jobs",
"rev": "4b392b284877d203ae262e16af269f702df036bc",
"rev": "6d4fd5a93d7bc953ffa4dcd6d53ad7056a71eff7",
"type": "github"
},
"original": {
@@ -44,22 +64,23 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1739461644,
"narHash": "sha256-1o1qR0KYozYGRrnqytSpAhVBYLNBHX+Lv6I39zGRzKM=",
"lastModified": 1726688310,
"narHash": "sha256-Xc9lEtentPCEtxc/F1e6jIZsd4MPDYv4Kugl9WtXlz0=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "97a719c9f0a07923c957cf51b20b329f9fb9d43f",
"rev": "dbebdd67a6006bb145d98c8debf9140ac7e651d0",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-24.11-small",
"ref": "nixos-24.05-small",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"libgit2": "libgit2",
"nix": "nix",
"nix-eval-jobs": "nix-eval-jobs",
"nixpkgs": "nixpkgs"

View File

@@ -1,11 +1,17 @@
{
description = "A Nix-based continuous build system";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11-small";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.05-small";
inputs.libgit2 = {
url = "github:libgit2/libgit2/v1.8.1";
flake = false;
};
inputs.nix = {
url = "github:NixOS/nix/2.26-maintenance";
url = "github:NixOS/nix/2.25-maintenance";
inputs.nixpkgs.follows = "nixpkgs";
inputs.libgit2.follows = "libgit2";
# hide nix dev tooling from our lock file
inputs.flake-parts.follows = "";
@@ -81,12 +87,7 @@
inherit (nixpkgs.lib) fileset;
inherit (self.packages.${system}) nix-eval-jobs;
rawSrc = self;
inherit (nix.packages.${system})
nix-util
nix-store
nix-main
nix-cli
;
nix = nix.packages.${system}.nix;
nix-perl-bindings = nix.hydraJobs.perlBindings.${system};
};
default = self.packages.${system}.hydra;

View File

@@ -8,22 +8,22 @@ project('hydra', 'cpp',
],
)
nix_util_dep = dependency('nix-util', required: true)
nix_store_dep = dependency('nix-store', required: true)
nix_main_dep = dependency('nix-main', required: true)
nix_expr_dep = dependency('nix-expr', required: true)
nix_flake_dep = dependency('nix-flake', required: true)
nix_cmd_dep = dependency('nix-cmd', required: true)
# Nix need extra flags not provided in its pkg-config files.
nix_dep = declare_dependency(
dependencies: [
nix_util_dep,
nix_store_dep,
nix_main_dep,
nix_expr_dep,
nix_flake_dep,
nix_cmd_dep,
],
compile_args: [
'-include', 'nix/config-util.hh',
'-include', 'nix/config-store.hh',
'-include', 'nix/config-main.hh',
],
compile_args: ['-include', 'nix/config.h'],
)
pqxx_dep = dependency('libpqxx', required: true)

View File

@@ -8,10 +8,7 @@
, perlPackages
, nix-util
, nix-store
, nix-main
, nix-cli
, nix
, nix-perl-bindings
, git
@@ -165,7 +162,7 @@ stdenv.mkDerivation (finalAttrs: {
nukeReferences
pkg-config
mdbook
nix-cli
nix
perlDeps
perl
unzip
@@ -175,9 +172,7 @@ stdenv.mkDerivation (finalAttrs: {
libpqxx
openssl
libxslt
nix-util
nix-store
nix-main
nix
perlDeps
perl
boost
@@ -204,14 +199,13 @@ stdenv.mkDerivation (finalAttrs: {
glibcLocales
libressl.nc
python3
nix-cli
];
hydraPath = lib.makeBinPath (
[
subversion
openssh
nix-cli
nix
coreutils
findutils
pixz
@@ -272,7 +266,7 @@ stdenv.mkDerivation (finalAttrs: {
--prefix PATH ':' $out/bin:$hydraPath \
--set HYDRA_RELEASE ${version} \
--set HYDRA_HOME $out/libexec/hydra \
--set NIX_RELEASE ${nix-cli.name or "unknown"} \
--set NIX_RELEASE ${nix.name or "unknown"} \
--set NIX_EVAL_JOBS_RELEASE ${nix-eval-jobs.name or "unknown"}
done
'';
@@ -280,5 +274,5 @@ stdenv.mkDerivation (finalAttrs: {
dontStrip = true;
meta.description = "Build of Hydra on ${stdenv.system}";
passthru = { inherit perlDeps; };
passthru = { inherit perlDeps nix; };
})

View File

@@ -9,10 +9,13 @@
#include "path.hh"
#include "legacy-ssh-store.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "state.hh"
#include "current-process.hh"
#include "processes.hh"
#include "util.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "ssh.hh"
#include "finally.hh"
#include "url.hh"
@@ -36,6 +39,108 @@ bool ::Machine::isLocalhost() const
namespace nix::build_remote {
static std::unique_ptr<SSHMaster::Connection> openConnection(
::Machine::ptr machine, SSHMaster & master)
{
Strings command = {"nix-store", "--serve", "--write"};
if (machine->isLocalhost()) {
command.push_back("--builders");
command.push_back("");
} else {
auto remoteStore = machine->storeUri.params.find("remote-store");
if (remoteStore != machine->storeUri.params.end()) {
command.push_back("--store");
command.push_back(shellEscape(remoteStore->second));
}
}
auto ret = master.startCommand(std::move(command), {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
});
// XXX: determine the actual max value we can use from /proc.
// FIXME: Should this be upstreamed into `startCommand` in Nix?
int pipesize = 1024 * 1024;
fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize);
fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize);
return ret;
}
static void copyClosureTo(
::Machine::Connection & conn,
Store & destStore,
const StorePathSet & paths,
SubstituteFlag useSubstitutes = NoSubstitute)
{
StorePathSet closure;
destStore.computeFSClosure(paths, closure);
/* Send the "query valid paths" command with the "lock" option
enabled. This prevents a race where the remote host
garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log
/* Get back the set of paths that are already valid on the remote
host. */
auto present = conn.queryValidPaths(
destStore, true, closure, useSubstitutes);
if (present.size() == closure.size()) return;
auto sorted = destStore.topoSortPaths(closure);
StorePathSet missing;
for (auto i = sorted.rbegin(); i != sorted.rend(); ++i)
if (!present.count(*i)) missing.insert(*i);
printMsg(lvlDebug, "sending %d missing paths", missing.size());
std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
std::chrono::seconds(600));
conn.to << ServeProto::Command::ImportPaths;
destStore.exportPaths(missing, conn.to);
conn.to.flush();
if (readInt(conn.from) != 1)
throw Error("remote machine failed to import closure");
}
// FIXME: use Store::topoSortPaths().
static StorePaths reverseTopoSortPaths(const std::map<StorePath, UnkeyedValidPathInfo> & paths)
{
StorePaths sorted;
StorePathSet visited;
std::function<void(const StorePath & path)> dfsVisit;
dfsVisit = [&](const StorePath & path) {
if (!visited.insert(path).second) return;
auto info = paths.find(path);
auto references = info == paths.end() ? StorePathSet() : info->second.references;
for (auto & i : references)
/* Don't traverse into paths that don't exist. That can
happen due to substitutes for non-existent paths. */
if (i != path && paths.count(i))
dfsVisit(i);
sorted.push_back(path);
};
for (auto & i : paths)
dfsVisit(i.first);
return sorted;
}
static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, const StorePath & drvPath)
{
std::string base(drvPath.to_string());
@@ -98,13 +203,13 @@ static BasicDerivation sendInputs(
auto now1 = std::chrono::steady_clock::now();
/* Copy the input closure. */
copyClosure(
destStore,
conn.machine->isLocalhost() ? localStore : *conn.store,
basicDrv.inputSrcs,
NoRepair,
NoCheckSigs,
Substitute);
if (conn.machine->isLocalhost()) {
StorePathSet closure;
destStore.computeFSClosure(basicDrv.inputSrcs, closure);
copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
} else {
copyClosureTo(conn, destStore, basicDrv.inputSrcs, Substitute);
}
auto now2 = std::chrono::steady_clock::now();
@@ -123,7 +228,7 @@ static BuildResult performBuild(
counter & nrStepsBuilding
)
{
auto kont = conn.store->buildDerivationAsync(drvPath, drv, options);
conn.putBuildDerivationRequest(localStore, drvPath, drv, options);
BuildResult result;
@@ -132,10 +237,7 @@ static BuildResult performBuild(
startTime = time(0);
{
MaintainCount<counter> mc(nrStepsBuilding);
result = kont();
// Without proper call-once functions, we need to manually
// delete after calling.
kont = {};
result = ServeProto::Serialise<BuildResult>::read(localStore, conn);
}
stopTime = time(0);
@@ -151,7 +253,7 @@ static BuildResult performBuild(
// If the protocol was too old to give us `builtOutputs`, initialize
// it manually by introspecting the derivation.
if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6)
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6)
{
// If the remote is too old to handle CA derivations, we cant get this
// far anyways
@@ -176,6 +278,55 @@ static BuildResult performBuild(
return result;
}
static void copyPathFromRemote(
::Machine::Connection & conn,
NarMemberDatas & narMembers,
Store & localStore,
Store & destStore,
const ValidPathInfo & info
)
{
/* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */
auto source2 = sinkToSource([&](Sink & sink)
{
/* Note: we should only send the command to dump the store
path to the remote if the NAR is actually going to get read
by the destination store, which won't happen if this path
is already valid on the destination store. Since this
lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather
than outside the lambda. */
conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path);
conn.to.flush();
TeeSource tee(conn.from, sink);
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
});
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
}
static void copyPathsFromRemote(
::Machine::Connection & conn,
NarMemberDatas & narMembers,
Store & localStore,
Store & destStore,
const std::map<StorePath, UnkeyedValidPathInfo> & infos
)
{
auto pathsSorted = reverseTopoSortPaths(infos);
for (auto & path : pathsSorted) {
auto & info = infos.find(path)->second;
copyPathFromRemote(
conn, narMembers, localStore, destStore,
ValidPathInfo { path, info });
}
}
}
/* using namespace nix::build_remote; */
@@ -253,39 +404,30 @@ void State::buildRemote(ref<Store> destStore,
updateStep(ssConnecting);
// FIXME: rewrite to use Store.
::Machine::Connection conn {
.machine = machine,
.store = [&]{
auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}
auto storeRef = machine->completeStoreReference();
auto remoteStore = machine->openStore().dynamic_pointer_cast<LegacySSHStore>();
assert(remoteStore);
auto * pSpecified = std::get_if<StoreReference::Specified>(&storeRef.variant);
if (!pSpecified || pSpecified->scheme != "ssh") {
throw Error("Currently, only (legacy-)ssh stores are supported!");
}
remoteStore->connPipeSize = 1024 * 1024;
if (machine->isLocalhost()) {
auto rp_new = remoteStore->remoteProgram.get();
rp_new.push_back("--builders");
rp_new.push_back("");
const_cast<nix::Setting<Strings> &>(remoteStore->remoteProgram).assign(rp_new);
}
remoteStore->extraSshArgs = {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
};
const_cast<nix::Setting<int> &>(remoteStore->logFD).assign(logFD.get());
return nix::ref{remoteStore};
}(),
LegacySSHStoreConfig storeConfig {
pSpecified->scheme,
pSpecified->authority,
storeRef.params
};
auto master = storeConfig.createSSHMaster(
false, // no SSH master yet
logFD.get());
// FIXME: rewrite to use Store.
auto child = build_remote::openConnection(machine, master);
{
auto activeStepState(activeStep->state_.lock());
if (activeStepState->cancelled) throw Error("step cancelled");
activeStepState->pid = conn.store->getConnectionPid();
activeStepState->pid = child->sshPid;
}
Finally clearPid([&]() {
@@ -300,12 +442,35 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */
});
::Machine::Connection conn {
{
.to = child->in.get(),
.from = child->out.get(),
/* Handshake. */
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
},
/*.machine =*/ machine,
};
Finally updateStats([&]() {
auto stats = conn.store->getConnectionStats();
bytesReceived += stats.bytesReceived;
bytesSent += stats.bytesSent;
bytesReceived += conn.from.read;
bytesSent += conn.to.written;
});
constexpr ServeProto::Version our_version = 0x206;
try {
conn.remoteVersion = decltype(conn)::handshake(
conn.to,
conn.from,
our_version,
machine->storeUri.render());
} catch (EndOfFile & e) {
child->sshPid.wait();
std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to %1%: %2%", machine->storeUri.render(), s);
}
{
auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0;
@@ -374,12 +539,21 @@ void State::buildRemote(ref<Store> destStore,
auto now1 = std::chrono::steady_clock::now();
auto infos = conn.queryPathInfos(*localStore, outputs);
size_t totalNarSize = 0;
for (auto & [_, info] : infos) totalNarSize += info.narSize;
if (totalNarSize > maxOutputSize) {
result.stepStatus = bsNarSizeLimitExceeded;
return;
}
/* Copy each path. */
printMsg(lvlDebug, "copying outputs of %s from %s",
localStore->printStorePath(step->drvPath), machine->storeUri.render());
copyClosure(*conn.store, *destStore, outputs);
printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)",
localStore->printStorePath(step->drvPath), machine->storeUri.render(), totalNarSize);
build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos);
auto now2 = std::chrono::steady_clock::now();
result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
@@ -400,11 +574,9 @@ void State::buildRemote(ref<Store> destStore,
}
}
/* Shut down the connection done by RAII.
Only difference is kill() instead of wait() (i.e. send signal
then wait())
*/
/* Shut down the connection. */
child->in = -1;
child->sshPid.wait();
} catch (Error & e) {
/* Disable this machine until a certain period of time has

View File

@@ -20,7 +20,9 @@
#include "store-api.hh"
#include "sync.hh"
#include "nar-extractor.hh"
#include "legacy-ssh-store.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "serve-protocol-connection.hh"
#include "machines.hh"
@@ -290,11 +292,9 @@ struct Machine : nix::Machine
bool isLocalhost() const;
// A connection to a machine
struct Connection {
struct Connection : nix::ServeProto::BasicClientConnection {
// Backpointer to the machine
ptr machine;
// Opened store
nix::ref<nix::LegacySSHStore> store;
};
};

View File

@@ -773,6 +773,9 @@ sub checkJobsetWrapped {
my $jobsetChanged = 0;
my %buildMap;
my @jobs;
push @jobs, $_ while defined($_ = $jobsIter->());
$db->txn_do(sub {
my $prevEval = getPrevJobsetEval($db, $jobset, 1);
@@ -796,7 +799,7 @@ sub checkJobsetWrapped {
my @jobsWithConstituents;
while (defined(my $job = $jobsIter->())) {
foreach my $job (@jobs) {
if ($jobsetsJobset) {
die "The .jobsets jobset must only have a single job named 'jobsets'"
unless $job->{attr} eq "jobsets";

View File

@@ -27,7 +27,6 @@ testenv.prepend('PERL5LIB',
separator: ':'
)
testenv.prepend('PATH',
fs.parent(find_program('nix').full_path()),
fs.parent(hydra_evaluator.full_path()),
fs.parent(hydra_queue_runner.full_path()),
meson.project_source_root() / 'src/script',