Compare commits

...

49 Commits

Author SHA1 Message Date
John Ericson
f3dc4e9228 WIP ssh-ng:// 2025-02-14 18:51:49 -05:00
John Ericson
45d075e5db WIP: Avoid custom logic copying outputs from the remote builder
We need a replacement for the nar member logic, however. And maybe also
a test that fails until this is fixed (this one should not be passing).
2025-02-14 18:50:38 -05:00
John Ericson
9e162dcf52 Avoid custom logic to copy inputs to the remote builder 2025-02-14 18:50:38 -05:00
John Ericson
4c173daec7 Use LegacySSHStore
In https://github.com/NixOS/nix/pull/10748 it is extended with
everything we need.
2025-02-14 18:48:46 -05:00
John Ericson
8675aee25b WIP TEMP nix update, don't mere this!
Not until https://github.com/NixOS/nix/pull/10748 lands

Flake lock file updates:

• Updated input 'nix':
    'github:NixOS/nix/970942f45836172fda410a638853382952189eb9?narHash=sha256-jGFuyYKJjJZsBRoi7ZcaVKt1OYxusz/ld1HA7VD2w/0%3D' (2025-02-12)
  → 'github:NixOS/nix/5eade4825221d3284fc6555cb20de2c7aa171d72?narHash=sha256-n5kdS1C24tlJxDV6Wm1iBlyvGk%2Bp0gMXRcWVCAipYLs%3D' (2025-02-14)

• Updated input 'nix-eval-jobs':
    'github:Ericson2314/nix-eval-jobs/5e27c2724a4b07862e7ff1a198aa2ed68dea3e2c?narHash=sha256-7xgSdKnQW11eWd59MnpUNS%2BgwgtOJH2ShzLwByev3rg%3D' (2025-02-14)
  → 'github:Ericson2314/nix-eval-jobs/de345eb4518d952c2d86261b270f2c31edecd3de?narHash=sha256-dNMJY6%2BG3PwE8lIAhwetPJdA2DxCEKRXPY/EtHmdDh4%3D' (2025-02-14)
2025-02-14 18:15:22 -05:00
John Ericson
51944a5fa5 Merge pull request #1443 from NixOS/nix-2.26
Nix 2.26
2025-02-13 22:13:32 -05:00
John Ericson
341b2f1309 Update build system to depend on Nix 2.26 2025-02-13 21:54:35 -05:00
John Ericson
4dc0f11379 Update flake.nix for Nix 2.26
Flake lock file updates:

• Removed input 'libgit2'
• Updated input 'nix':
    'github:NixOS/nix/d652513e4519ed4eb48c92f8670e5a71c7793fc3?narHash=sha256-mIpJgIwPS4o4xYhN1B%2B/fHESEXoxpu6nVoZTzZ0MfTg%3D' (2025-02-12)
  → 'github:NixOS/nix/970942f45836172fda410a638853382952189eb9?narHash=sha256-jGFuyYKJjJZsBRoi7ZcaVKt1OYxusz/ld1HA7VD2w/0%3D' (2025-02-12)
• Removed input 'nix/libgit2'
• Updated input 'nix-eval-jobs':
    'github:nix-community/nix-eval-jobs/6d4fd5a93d7bc953ffa4dcd6d53ad7056a71eff7?narHash=sha256-1dZLPw%2BnlFQzzswfyTxW%2B8VF1AJ4ZvoYvLTjlHiz1SA%3D' (2025-02-13)
  → 'github:nix-community/nix-eval-jobs/4b392b284877d203ae262e16af269f702df036bc?narHash=sha256-3wIReAqdTALv39gkWXLMZQvHyBOc3yPkWT2ZsItxedY%3D' (2025-02-14)
• Updated input 'nixpkgs':
    'github:NixOS/nixpkgs/dbebdd67a6006bb145d98c8debf9140ac7e651d0?narHash=sha256-Xc9lEtentPCEtxc/F1e6jIZsd4MPDYv4Kugl9WtXlz0%3D' (2024-09-18)
  → 'github:NixOS/nixpkgs/97a719c9f0a07923c957cf51b20b329f9fb9d43f?narHash=sha256-1o1qR0KYozYGRrnqytSpAhVBYLNBHX%2BLv6I39zGRzKM%3D' (2025-02-13)
2025-02-13 21:54:31 -05:00
John Ericson
ea09952b7e Merge pull request #1442 from NixOS/clean-up-flake-lockfile
Clean up flake lockfile stuff
2025-02-13 20:52:40 -05:00
John Ericson
81d21979ef Clean up flake lockfile stuff
The `flake = false;` for `nix-eval-jobs` didn't fully take before.

Flake lock file updates:

• Removed input 'nix-eval-jobs/flake-parts'
• Removed input 'nix-eval-jobs/flake-parts/nixpkgs-lib'
• Removed input 'nix-eval-jobs/nix-github-actions'
• Removed input 'nix-eval-jobs/nixpkgs'
• Removed input 'nix-eval-jobs/treefmt-nix'
• Removed input 'nix-eval-jobs/treefmt-nix/nixpkgs'
2025-02-13 20:23:08 -05:00
John Ericson
0ed9a82912 Merge pull request #1441 from NixOS/nix-2.25
Nix 2.25
2025-02-13 19:53:07 -05:00
John Ericson
80241fc8be Make code change necessary for building with Nix 2.25 2025-02-13 19:10:09 -05:00
John Ericson
4347833f45 Rework to synchronize deps
Bypass `nix-eval-job`'s flake, and just call-package it, for
fine-grained control.
2025-02-13 19:03:37 -05:00
John Ericson
8835cbd10f flake.lock: Update
Flake lock file updates:

• Updated input 'nix':
    'github:NixOS/nix/a7fdef6858dd45b9d7bda7c92324c63faee7f509?narHash=sha256-XFznzb8L4SdUm9u%2Bw3DPpMWJhffuv%2B/6%2BaiVl00slns%3D' (2024-09-19)
  → 'github:NixOS/nix/d652513e4519ed4eb48c92f8670e5a71c7793fc3?narHash=sha256-mIpJgIwPS4o4xYhN1B%2B/fHESEXoxpu6nVoZTzZ0MfTg%3D' (2025-02-12)
• Updated input 'nix-eval-jobs':
    'github:nix-community/nix-eval-jobs/889ea1406736b53cf165b6c28398aae3969418d1?narHash=sha256-3wwtKpS5tUBdjaGeSia7CotonbiRB6K5Kp0dsUt3nzU%3D' (2024-12-10)
  → 'github:nix-community/nix-eval-jobs/6d4fd5a93d7bc953ffa4dcd6d53ad7056a71eff7?narHash=sha256-1dZLPw%2BnlFQzzswfyTxW%2B8VF1AJ4ZvoYvLTjlHiz1SA%3D' (2025-02-13)
2025-02-13 18:46:36 -05:00
John Ericson
9ad8ac586c Merge pull request #1440 from NixOS/legacy-ssh-expose-ssh-master
Use new `CommonSSHStoreConfig::createSSHMaster`
2025-02-13 18:30:41 -05:00
John Ericson
9a6928d93b Use new CommonSSHStoreConfig::createSSHMaster
This avoids some duplicated code, leveraging the same `StoreReference`
type that also undergirds the machine file dedup we just did prior.

By using `LegacySSHStoreConfig`, we're also taking a baby step towards
using the store interface rather than messing around with the protocol
internals.
2025-02-13 18:13:38 -05:00
John Ericson
810781a802 Merge pull request #1439 from NixOS/nix-next
Dedup machine file parsing, and other improvements
2025-02-13 18:10:08 -05:00
John Ericson
af9b0663f2 Merge branch 'master' into nix-next 2025-02-13 17:54:15 -05:00
Jörg Thalheim
c6f98202cd Merge pull request #1438 from NixOS/log-malformed-json
Log malformed JSON received from `nix-eval-jobs`
2025-02-12 12:58:18 +07:00
John Ericson
1dbc7f5845 Log malformed JSON received from nix-eval-jobs 2025-02-11 22:34:49 -05:00
John Ericson
c52845f560 Merge pull request #1421 from NixOS/nix-eval-jobs
Use `nix-eval-jobs` and delete `hydra-eval-jobs`
2025-02-07 19:41:38 -05:00
John Ericson
85383b9522 Render the nix-eval-jobs version too 2025-02-07 16:55:28 -05:00
Pierre Bourdon
2f92846e5a hydra-eval-jobs: remove, replaced by nix-eval-jobs
(cherry picked from commit ed7c58708cd3affd62a598a22a500ed2adf318bf)
2025-02-07 16:55:28 -05:00
Pierre Bourdon
d84ff32ce6 hydra-eval-jobset: Use nix-eval-jobs instead of hydra-eval-jobs
incrementally ingest eval results

nix-eval-jobs streams output, unlike hydra-eval-jobs. Now that we've
migrated, we can use this to:

1. Use less RAM by avoiding buffering a whole eval's worth of metadata
   into a Perl string and an array of JSON objects.
2. Make evals latency a bit lower by allowing the queue runner to start
   ingesting builds faster.

Also use the newly-restored constituents support in `nix-eval-jobs`

Note, we pass --workers and --max-memory-size to n-e-j

Lost in the h-e-j -> n-e-j migration, causing evaluation to always be
single threaded and limited to 4GiB RAM. Follow the config settings like
h-e-j used to do (via C++ code).

`nix-eval-jobs` should check `hydraJobs` and then `checks` with flakes

(cherry picked from commit 6d4ccff43c41adaf6e4b2b9bced7243bc2f6e97b)
(cherry picked from commit b0e9b4b2f99f9d8f5c4e780e89f955c394b5ced4)
(cherry picked from commit cdfc5c81e8037d3e4818a3e459d0804b2c157ea9)
(cherry picked from commit 4b107e6ff36bd89958fba36e0fe0340903e7cd13)

Co-Authored-By: Maximilian Bosch <maximilian@mbosch.me>
2025-02-07 16:55:28 -05:00
Pierre Bourdon
0c9726af59 flake: add nix-eval-jobs as input
(cherry picked from commit 684cc50d86608cccf7500ce00af89ea34c488473)
2025-02-07 16:55:28 -05:00
John Ericson
5100b85537 Merge pull request #1436 from obsidiansystems/test-aliased-constituents
Improve tests around constituents
2025-02-07 16:45:17 -05:00
John Ericson
141b5fd0b5 Improve tests around constituents
- Test how shorter names are preferred when multiple jobs resolve to the
  same derivation.

- Test the exact aggregate map we get, by looking in the DB.
2025-02-07 16:39:13 -05:00
John Ericson
8d78648e65 Merge pull request #1435 from obsidiansystems/flake-tests
Test using Hydra with flakes
2025-02-07 11:21:02 -05:00
John Ericson
8a8ac14877 Test using Hydra with flakes
It seemed there was no self-contained end-to-end test actually doing
this?!

Among other things, this will help ensure that the switch-over to
`nix-eval-jobs` is correct.
2025-02-06 21:30:49 -05:00
John Ericson
2feddd8511 flake.lock: Update
Flake lock file updates:

• Updated input 'nix':
    'github:NixOS/nix/2c42e7b8d9ea32e59c01334852599b548b214d31' (2024-05-23)
  → 'github:NixOS/nix/ef5c846e257e1e284ad47ed6be4308d190fe6531' (2024-05-29)
2024-05-29 17:05:41 -04:00
John Ericson
cd925e876f Merge branch 'master' into nix-next 2024-05-29 17:05:04 -04:00
John Ericson
91bb72e323 Merge pull request #1386 from NixOS/machine-dedup
Dedup with nix: use `nix::Machine::parseConfig`
2024-05-23 11:21:41 -04:00
John Ericson
09a1e64ed2 Dedup with nix: use nix::Machine::parseConfig
Companion to https://github.com/NixOS/nix/pull/10763
2024-05-23 09:59:46 -04:00
John Ericson
bede2a141a flake.lock: Update
Flake lock file updates:

• Updated input 'nix':
    'github:NixOS/nix/5845fd59c34198ad52a7f7bcb6d3ea7176ca437b' (2024-05-22)
  → 'github:NixOS/nix/2c42e7b8d9ea32e59c01334852599b548b214d31' (2024-05-23)
2024-05-23 09:59:32 -04:00
John Ericson
b75bf5c882 Merge pull request #1385 from NixOS/machine-dedup
Utilize `nix::Machine` more fully
2024-05-23 00:00:58 -04:00
John Ericson
d55bea2a1e Utilize nix::Machine more fully
With https://github.com/NixOS/nix/pull/9839, the `storeUri` field is
much better structured, so we can use it while still opening the SSH
connection ourselves.
2024-05-22 22:02:46 -04:00
John Ericson
346badc66f flake.lock: Update
Flake lock file updates:

• Updated input 'nix':
    'github:NixOS/nix/a57abbd143f8ed44e823c3244e93507f64020878' (2024-05-20)
  → 'github:NixOS/nix/5845fd59c34198ad52a7f7bcb6d3ea7176ca437b' (2024-05-22)
2024-05-22 22:00:38 -04:00
John Ericson
a940450875 Merge branch 'master' into nix-next 2024-05-22 22:00:25 -04:00
John Ericson
af120e7195 Merge pull request #1384 from NixOS/more-serve-proto-factor-out
Dedup more protocol code
2024-05-20 21:49:06 -04:00
John Ericson
71c4e2dc5b Dedup more protocol code
Use https://github.com/NixOS/nix/pull/10749
2024-05-20 18:19:59 -04:00
John Ericson
e4552ddf91 flake.lock: Update
Flake lock file updates:

• Updated input 'nix':
    'github:NixOS/nix/beb3c2bc7ab781c1b8907b647c6e72b72fa9f56b' (2024-05-17)
  → 'github:NixOS/nix/a57abbd143f8ed44e823c3244e93507f64020878' (2024-05-20)
2024-05-20 18:11:37 -04:00
John Ericson
e4f2c84f8d flake.lock: Update
Flake lock file updates:

• Updated input 'nix':
    'github:NixOS/nix/0930058189f350a3729cd5aef2ffc8dae2ad436e' (2024-05-08)
  → 'github:NixOS/nix/beb3c2bc7ab781c1b8907b647c6e72b72fa9f56b' (2024-05-17)
2024-05-17 20:02:54 -04:00
John Ericson
e10fc2bd13 Merge branch 'master' into nix-next 2024-05-17 19:59:41 -04:00
John Ericson
5e910fa2ce flake.lock: Update
Flake lock file updates:

• Updated input 'nix':
    'github:NixOS/nix/00ca2b05b8fbbef09be5d1e4820857605d4c31b6' (2024-05-03)
  → 'github:NixOS/nix/0930058189f350a3729cd5aef2ffc8dae2ad436e' (2024-05-08)
2024-05-08 11:25:14 -04:00
John Ericson
4b767aa9a2 Merge branch 'master' into nix-next 2024-05-08 11:25:04 -04:00
John Ericson
2926aa1d64 Merge branch 'factor-out-tests' into nix-next 2024-05-03 12:44:51 -04:00
John Ericson
555ea44a7a Merge branch 'master' into nix-next 2024-05-03 12:35:06 -04:00
John Ericson
410077a26e Merge branch 'nix-2.22' into nix-next 2024-05-03 10:49:28 -04:00
John Ericson
39a4e4791e Switch (back) to Nix master
Re-creating `nix-next` after using it in #1375.

Flake lock file updates:

• Updated input 'nix':
    'github:NixOS/nix/60824fa97c588a0faf68ea61260a47e388b0a4e5' (2024-04-11)
  → 'github:NixOS/nix/aa438b8fbaebbbdb922655127053c4e8ea3e55bb' (2024-04-12)
2024-04-12 17:30:57 -04:00
26 changed files with 535 additions and 1102 deletions

56
flake.lock generated
View File

@@ -1,30 +1,11 @@
{
"nodes": {
"libgit2": {
"flake": false,
"locked": {
"lastModified": 1715853528,
"narHash": "sha256-J2rCxTecyLbbDdsyBWn9w7r3pbKRMkI9E7RvRgAqBdY=",
"owner": "libgit2",
"repo": "libgit2",
"rev": "36f7e21ad757a3dacc58cf7944329da6bc1d6e96",
"type": "github"
},
"original": {
"owner": "libgit2",
"ref": "v1.8.1",
"repo": "libgit2",
"type": "github"
}
},
"nix": {
"inputs": {
"flake-compat": [],
"flake-parts": [],
"git-hooks-nix": [],
"libgit2": [
"libgit2"
],
"nixfmt": [],
"nixpkgs": [
"nixpkgs"
],
@@ -32,40 +13,57 @@
"nixpkgs-regression": []
},
"locked": {
"lastModified": 1726787955,
"narHash": "sha256-XFznzb8L4SdUm9u+w3DPpMWJhffuv+/6+aiVl00slns=",
"lastModified": 1739571938,
"narHash": "sha256-NlaLAed/xei6RWpU2HIIbDjILRC4l1NIfGeyrn7ALQs=",
"owner": "NixOS",
"repo": "nix",
"rev": "a7fdef6858dd45b9d7bda7c92324c63faee7f509",
"rev": "ffc649d2eabdd3e678b5bcc211dd59fd06debf3e",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "2.24-maintenance",
"ref": "ssh-ng-extensions-for-hydra",
"repo": "nix",
"type": "github"
}
},
"nix-eval-jobs": {
"flake": false,
"locked": {
"lastModified": 1739499741,
"narHash": "sha256-dNMJY6+G3PwE8lIAhwetPJdA2DxCEKRXPY/EtHmdDh4=",
"owner": "Ericson2314",
"repo": "nix-eval-jobs",
"rev": "de345eb4518d952c2d86261b270f2c31edecd3de",
"type": "github"
},
"original": {
"owner": "Ericson2314",
"ref": "nix-2.27",
"repo": "nix-eval-jobs",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1726688310,
"narHash": "sha256-Xc9lEtentPCEtxc/F1e6jIZsd4MPDYv4Kugl9WtXlz0=",
"lastModified": 1739461644,
"narHash": "sha256-1o1qR0KYozYGRrnqytSpAhVBYLNBHX+Lv6I39zGRzKM=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "dbebdd67a6006bb145d98c8debf9140ac7e651d0",
"rev": "97a719c9f0a07923c957cf51b20b329f9fb9d43f",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-24.05-small",
"ref": "nixos-24.11-small",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"libgit2": "libgit2",
"nix": "nix",
"nix-eval-jobs": "nix-eval-jobs",
"nixpkgs": "nixpkgs"
}
}

View File

@@ -1,21 +1,28 @@
{
description = "A Nix-based continuous build system";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.05-small";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.11-small";
inputs.libgit2 = { url = "github:libgit2/libgit2/v1.8.1"; flake = false; };
inputs.nix.url = "github:NixOS/nix/2.24-maintenance";
inputs.nix.inputs.nixpkgs.follows = "nixpkgs";
inputs.nix.inputs.libgit2.follows = "libgit2";
inputs.nix = {
url = "github:NixOS/nix/ssh-ng-extensions-for-hydra";
inputs.nixpkgs.follows = "nixpkgs";
# hide nix dev tooling from our lock file
inputs.nix.inputs.flake-parts.follows = "";
inputs.nix.inputs.git-hooks-nix.follows = "";
inputs.nix.inputs.nixpkgs-regression.follows = "";
inputs.nix.inputs.nixpkgs-23-11.follows = "";
inputs.nix.inputs.flake-compat.follows = "";
# hide nix dev tooling from our lock file
inputs.flake-parts.follows = "";
inputs.git-hooks-nix.follows = "";
inputs.nixpkgs-regression.follows = "";
inputs.nixpkgs-23-11.follows = "";
inputs.flake-compat.follows = "";
inputs.nixfmt.follows = "";
};
outputs = { self, nixpkgs, nix, ... }:
inputs.nix-eval-jobs = {
url = "github:Ericson2314/nix-eval-jobs/nix-2.27";
# We want to control the deps precisely
flake = false;
};
outputs = { self, nixpkgs, nix, nix-eval-jobs, ... }:
let
systems = [ "x86_64-linux" "aarch64-linux" ];
forEachSystem = nixpkgs.lib.genAttrs systems;
@@ -24,6 +31,7 @@
# A Nixpkgs overlay that provides a 'hydra' package.
overlays.default = final: prev: {
nix-eval-jobs = final.callPackage nix-eval-jobs {};
hydra = final.callPackage ./package.nix {
inherit (nixpkgs.lib) fileset;
rawSrc = self;
@@ -67,10 +75,19 @@
});
packages = forEachSystem (system: {
nix-eval-jobs = nixpkgs.legacyPackages.${system}.callPackage nix-eval-jobs {
nix = nix.packages.${system}.nix;
};
hydra = nixpkgs.legacyPackages.${system}.callPackage ./package.nix {
inherit (nixpkgs.lib) fileset;
inherit (self.packages.${system}) nix-eval-jobs;
rawSrc = self;
nix = nix.packages.${system}.nix;
inherit (nix.packages.${system})
nix-util
nix-store
nix-main
nix-cli
;
nix-perl-bindings = nix.hydraJobs.perlBindings.${system};
};
default = self.packages.${system}.hydra;

View File

@@ -8,22 +8,22 @@ project('hydra', 'cpp',
],
)
nix_util_dep = dependency('nix-util', required: true)
nix_store_dep = dependency('nix-store', required: true)
nix_main_dep = dependency('nix-main', required: true)
nix_expr_dep = dependency('nix-expr', required: true)
nix_flake_dep = dependency('nix-flake', required: true)
nix_cmd_dep = dependency('nix-cmd', required: true)
# Nix need extra flags not provided in its pkg-config files.
nix_dep = declare_dependency(
dependencies: [
nix_util_dep,
nix_store_dep,
nix_main_dep,
nix_expr_dep,
nix_flake_dep,
nix_cmd_dep,
],
compile_args: ['-include', 'nix/config.h'],
compile_args: [
'-include', 'nix/config-util.hh',
'-include', 'nix/config-store.hh',
'-include', 'nix/config-main.hh',
],
)
pqxx_dep = dependency('libpqxx', required: true)

View File

@@ -8,7 +8,10 @@
, perlPackages
, nix
, nix-util
, nix-store
, nix-main
, nix-cli
, nix-perl-bindings
, git
@@ -50,6 +53,7 @@
, xz
, gnutar
, gnused
, nix-eval-jobs
, rpm
, dpkg
@@ -161,7 +165,7 @@ stdenv.mkDerivation (finalAttrs: {
nukeReferences
pkg-config
mdbook
nix
nix-cli
perlDeps
perl
unzip
@@ -171,7 +175,9 @@ stdenv.mkDerivation (finalAttrs: {
libpqxx
openssl
libxslt
nix
nix-util
nix-store
nix-main
perlDeps
perl
boost
@@ -190,6 +196,7 @@ stdenv.mkDerivation (finalAttrs: {
openldap
postgresql_13
pixz
nix-eval-jobs
];
checkInputs = [
@@ -197,13 +204,14 @@ stdenv.mkDerivation (finalAttrs: {
glibcLocales
libressl.nc
python3
nix-cli
];
hydraPath = lib.makeBinPath (
[
subversion
openssh
nix
nix-cli
coreutils
findutils
pixz
@@ -218,6 +226,7 @@ stdenv.mkDerivation (finalAttrs: {
darcs
gnused
breezy
nix-eval-jobs
] ++ lib.optionals stdenv.isLinux [ rpm dpkg cdrkit ]
);
@@ -232,7 +241,7 @@ stdenv.mkDerivation (finalAttrs: {
shellHook = ''
pushd $(git rev-parse --show-toplevel) >/dev/null
PATH=$(pwd)/src/hydra-evaluator:$(pwd)/src/script:$(pwd)/src/hydra-eval-jobs:$(pwd)/src/hydra-queue-runner:$PATH
PATH=$(pwd)/src/hydra-evaluator:$(pwd)/src/script:$(pwd)/src/hydra-queue-runner:$PATH
PERL5LIB=$(pwd)/src/lib:$PERL5LIB
export HYDRA_HOME="$(pwd)/src/"
mkdir -p .hydra-data
@@ -263,12 +272,13 @@ stdenv.mkDerivation (finalAttrs: {
--prefix PATH ':' $out/bin:$hydraPath \
--set HYDRA_RELEASE ${version} \
--set HYDRA_HOME $out/libexec/hydra \
--set NIX_RELEASE ${nix.name or "unknown"}
--set NIX_RELEASE ${nix-cli.name or "unknown"} \
--set NIX_EVAL_JOBS_RELEASE ${nix-eval-jobs.name or "unknown"}
done
'';
dontStrip = true;
meta.description = "Build of Hydra on ${stdenv.system}";
passthru = { inherit perlDeps nix; };
passthru = { inherit perlDeps; };
})

View File

@@ -1,587 +0,0 @@
#include <iostream>
#include <thread>
#include <optional>
#include <unordered_map>
#include "shared.hh"
#include "store-api.hh"
#include "eval.hh"
#include "eval-gc.hh"
#include "eval-inline.hh"
#include "eval-settings.hh"
#include "signals.hh"
#include "terminal.hh"
#include "util.hh"
#include "get-drvs.hh"
#include "globals.hh"
#include "common-eval-args.hh"
#include "flake/flakeref.hh"
#include "flake/flake.hh"
#include "attr-path.hh"
#include "derivations.hh"
#include "local-fs-store.hh"
#include "hydra-config.hh"
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/resource.h>
#include <nlohmann/json.hpp>
void check_pid_status_nonblocking(pid_t check_pid)
{
// Only check 'initialized' and known PID's
if (check_pid <= 0) { return; }
int wstatus = 0;
pid_t pid = waitpid(check_pid, &wstatus, WNOHANG);
// -1 = failure, WNOHANG: 0 = no change
if (pid <= 0) { return; }
std::cerr << "child process (" << pid << ") ";
if (WIFEXITED(wstatus)) {
std::cerr << "exited with status=" << WEXITSTATUS(wstatus) << std::endl;
} else if (WIFSIGNALED(wstatus)) {
std::cerr << "killed by signal=" << WTERMSIG(wstatus) << std::endl;
} else if (WIFSTOPPED(wstatus)) {
std::cerr << "stopped by signal=" << WSTOPSIG(wstatus) << std::endl;
} else if (WIFCONTINUED(wstatus)) {
std::cerr << "continued" << std::endl;
}
}
using namespace nix;
static Path gcRootsDir;
static size_t maxMemorySize;
struct MyArgs : MixEvalArgs, MixCommonArgs, RootArgs
{
Path releaseExpr;
bool flake = false;
bool dryRun = false;
MyArgs() : MixCommonArgs("hydra-eval-jobs")
{
addFlag({
.longName = "gc-roots-dir",
.description = "garbage collector roots directory",
.labels = {"path"},
.handler = {&gcRootsDir}
});
addFlag({
.longName = "dry-run",
.description = "don't create store derivations",
.handler = {&dryRun, true}
});
addFlag({
.longName = "flake",
.description = "build a flake",
.handler = {&flake, true}
});
expectArg("expr", &releaseExpr);
}
};
static MyArgs myArgs;
static std::string queryMetaStrings(EvalState & state, PackageInfo & drv, const std::string & name, const std::string & subAttribute)
{
Strings res;
std::function<void(Value & v)> rec;
rec = [&](Value & v) {
state.forceValue(v, noPos);
if (v.type() == nString)
res.emplace_back(v.string_view());
else if (v.isList())
for (unsigned int n = 0; n < v.listSize(); ++n)
rec(*v.listElems()[n]);
else if (v.type() == nAttrs) {
auto a = v.attrs()->find(state.symbols.create(subAttribute));
if (a != v.attrs()->end())
res.push_back(std::string(state.forceString(*a->value, a->pos, "while evaluating meta attributes")));
}
};
Value * v = drv.queryMeta(name);
if (v) rec(*v);
return concatStringsSep(", ", res);
}
static void worker(
EvalState & state,
Bindings & autoArgs,
AutoCloseFD & to,
AutoCloseFD & from)
{
Value vTop;
if (myArgs.flake) {
using namespace flake;
auto [flakeRef, fragment, outputSpec] = parseFlakeRefWithFragmentAndExtendedOutputsSpec(fetchSettings, myArgs.releaseExpr, absPath("."));
auto vFlake = state.allocValue();
auto lockedFlake = lockFlake(
flakeSettings,
state,
flakeRef,
LockFlags {
.updateLockFile = false,
.useRegistries = false,
.allowUnlocked = false,
});
callFlake(state, lockedFlake, *vFlake);
auto vOutputs = vFlake->attrs()->get(state.symbols.create("outputs"))->value;
state.forceValue(*vOutputs, noPos);
auto aHydraJobs = vOutputs->attrs()->get(state.symbols.create("hydraJobs"));
if (!aHydraJobs)
aHydraJobs = vOutputs->attrs()->get(state.symbols.create("checks"));
if (!aHydraJobs)
throw Error("flake '%s' does not provide any Hydra jobs or checks", flakeRef);
vTop = *aHydraJobs->value;
} else {
state.evalFile(lookupFileArg(state, myArgs.releaseExpr), vTop);
}
auto vRoot = state.allocValue();
state.autoCallFunction(autoArgs, vTop, *vRoot);
while (true) {
/* Wait for the master to send us a job name. */
writeLine(to.get(), "next");
auto s = readLine(from.get());
if (s == "exit") break;
if (!hasPrefix(s, "do ")) abort();
std::string attrPath(s, 3);
debug("worker process %d at '%s'", getpid(), attrPath);
/* Evaluate it and send info back to the master. */
nlohmann::json reply;
try {
auto vTmp = findAlongAttrPath(state, attrPath, autoArgs, *vRoot).first;
auto v = state.allocValue();
state.autoCallFunction(autoArgs, *vTmp, *v);
if (auto drv = getDerivation(state, *v, false)) {
// CA derivations do not have static output paths, so we
// have to defensively not query output paths in case we
// encounter one.
PackageInfo::Outputs outputs = drv->queryOutputs(
!experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
if (drv->querySystem() == "unknown")
state.error<EvalError>("derivation must have a 'system' attribute").debugThrow();
auto drvPath = state.store->printStorePath(drv->requireDrvPath());
nlohmann::json job;
job["nixName"] = drv->queryName();
job["system"] =drv->querySystem();
job["drvPath"] = drvPath;
job["description"] = drv->queryMetaString("description");
job["license"] = queryMetaStrings(state, *drv, "license", "shortName");
job["homepage"] = drv->queryMetaString("homepage");
job["maintainers"] = queryMetaStrings(state, *drv, "maintainers", "email");
job["schedulingPriority"] = drv->queryMetaInt("schedulingPriority", 100);
job["timeout"] = drv->queryMetaInt("timeout", 36000);
job["maxSilent"] = drv->queryMetaInt("maxSilent", 7200);
job["isChannel"] = drv->queryMetaBool("isHydraChannel", false);
/* If this is an aggregate, then get its constituents. */
auto a = v->attrs()->get(state.symbols.create("_hydraAggregate"));
if (a && state.forceBool(*a->value, a->pos, "while evaluating the `_hydraAggregate` attribute")) {
auto a = v->attrs()->get(state.symbols.create("constituents"));
if (!a)
state.error<EvalError>("derivation must have a constituents attribute").debugThrow();
NixStringContext context;
state.coerceToString(a->pos, *a->value, context, "while evaluating the `constituents` attribute", true, false);
for (auto & c : context)
std::visit(overloaded {
[&](const NixStringContextElem::Built & b) {
job["constituents"].push_back(b.drvPath->to_string(*state.store));
},
[&](const NixStringContextElem::Opaque & o) {
},
[&](const NixStringContextElem::DrvDeep & d) {
},
}, c.raw);
state.forceList(*a->value, a->pos, "while evaluating the `constituents` attribute");
for (unsigned int n = 0; n < a->value->listSize(); ++n) {
auto v = a->value->listElems()[n];
state.forceValue(*v, noPos);
if (v->type() == nString)
job["namedConstituents"].push_back(v->string_view());
}
}
/* Register the derivation as a GC root. !!! This
registers roots for jobs that we may have already
done. */
auto localStore = state.store.dynamic_pointer_cast<LocalFSStore>();
if (gcRootsDir != "" && localStore) {
Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath));
if (!pathExists(root))
localStore->addPermRoot(localStore->parseStorePath(drvPath), root);
}
nlohmann::json out;
for (auto & [outputName, optOutputPath] : outputs) {
if (optOutputPath) {
out[outputName] = state.store->printStorePath(*optOutputPath);
} else {
// See the `queryOutputs` call above; we should
// not encounter missing output paths otherwise.
assert(experimentalFeatureSettings.isEnabled(Xp::CaDerivations));
out[outputName] = nullptr;
}
}
job["outputs"] = std::move(out);
reply["job"] = std::move(job);
}
else if (v->type() == nAttrs) {
auto attrs = nlohmann::json::array();
StringSet ss;
for (auto & i : v->attrs()->lexicographicOrder(state.symbols)) {
std::string name(state.symbols[i->name]);
if (name.find(' ') != std::string::npos) {
printError("skipping job with illegal name '%s'", name);
continue;
}
attrs.push_back(name);
}
reply["attrs"] = std::move(attrs);
}
else if (v->type() == nNull)
;
else state.error<TypeError>("attribute '%s' is %s, which is not supported", attrPath, showType(*v)).debugThrow();
} catch (EvalError & e) {
auto msg = e.msg();
// Transmits the error we got from the previous evaluation
// in the JSON output.
reply["error"] = filterANSIEscapes(msg, true);
// Don't forget to print it into the STDERR log, this is
// what's shown in the Hydra UI.
printError(msg);
}
writeLine(to.get(), reply.dump());
/* If our RSS exceeds the maximum, exit. The master will
start a new process. */
struct rusage r;
getrusage(RUSAGE_SELF, &r);
if ((size_t) r.ru_maxrss > maxMemorySize * 1024) break;
}
writeLine(to.get(), "restart");
}
int main(int argc, char * * argv)
{
/* Prevent undeclared dependencies in the evaluation via
$NIX_PATH. */
unsetenv("NIX_PATH");
return handleExceptions(argv[0], [&]() {
auto config = std::make_unique<HydraConfig>();
auto nrWorkers = config->getIntOption("evaluator_workers", 1);
maxMemorySize = config->getIntOption("evaluator_max_memory_size", 4096);
initNix();
initGC();
myArgs.parseCmdline(argvToStrings(argc, argv));
auto pureEval = config->getBoolOption("evaluator_pure_eval", myArgs.flake);
/* FIXME: The build hook in conjunction with import-from-derivation is causing "unexpected EOF" during eval */
settings.builders = "";
/* Prevent access to paths outside of the Nix search path and
to the environment. */
evalSettings.restrictEval = true;
/* When building a flake, use pure evaluation (no access to
'getEnv', 'currentSystem' etc. */
evalSettings.pureEval = pureEval;
if (myArgs.dryRun) settings.readOnlyMode = true;
if (myArgs.releaseExpr == "") throw UsageError("no expression specified");
if (gcRootsDir == "") printMsg(lvlError, "warning: `--gc-roots-dir' not specified");
struct State
{
std::set<std::string> todo{""};
std::set<std::string> active;
nlohmann::json jobs;
std::exception_ptr exc;
};
std::condition_variable wakeup;
Sync<State> state_;
/* Start a handler thread per worker process. */
auto handler = [&]()
{
pid_t pid = -1;
try {
AutoCloseFD from, to;
while (true) {
/* Start a new worker process if necessary. */
if (pid == -1) {
Pipe toPipe, fromPipe;
toPipe.create();
fromPipe.create();
pid = startProcess(
[&,
to{std::make_shared<AutoCloseFD>(std::move(fromPipe.writeSide))},
from{std::make_shared<AutoCloseFD>(std::move(toPipe.readSide))}
]()
{
try {
auto evalStore = myArgs.evalStoreUrl
? openStore(*myArgs.evalStoreUrl)
: openStore();
EvalState state(myArgs.lookupPath,
evalStore, fetchSettings, evalSettings);
Bindings & autoArgs = *myArgs.getAutoArgs(state);
worker(state, autoArgs, *to, *from);
} catch (Error & e) {
nlohmann::json err;
auto msg = e.msg();
err["error"] = filterANSIEscapes(msg, true);
printError(msg);
writeLine(to->get(), err.dump());
// Don't forget to print it into the STDERR log, this is
// what's shown in the Hydra UI.
writeLine(to->get(), "restart");
}
},
ProcessOptions { .allowVfork = false });
from = std::move(fromPipe.readSide);
to = std::move(toPipe.writeSide);
debug("created worker process %d", pid);
}
/* Check whether the existing worker process is still there. */
auto s = readLine(from.get());
if (s == "restart") {
pid = -1;
continue;
} else if (s != "next") {
auto json = nlohmann::json::parse(s);
throw Error("worker error: %s", (std::string) json["error"]);
}
/* Wait for a job name to become available. */
std::string attrPath;
while (true) {
checkInterrupt();
auto state(state_.lock());
if ((state->todo.empty() && state->active.empty()) || state->exc) {
writeLine(to.get(), "exit");
return;
}
if (!state->todo.empty()) {
attrPath = *state->todo.begin();
state->todo.erase(state->todo.begin());
state->active.insert(attrPath);
break;
} else
state.wait(wakeup);
}
/* Tell the worker to evaluate it. */
writeLine(to.get(), "do " + attrPath);
/* Wait for the response. */
auto response = nlohmann::json::parse(readLine(from.get()));
/* Handle the response. */
StringSet newAttrs;
if (response.find("job") != response.end()) {
auto state(state_.lock());
state->jobs[attrPath] = response["job"];
}
if (response.find("attrs") != response.end()) {
for (auto & i : response["attrs"]) {
std::string path = i;
if (path.find(".") != std::string::npos){
path = "\"" + path + "\"";
}
auto s = (attrPath.empty() ? "" : attrPath + ".") + (std::string) path;
newAttrs.insert(s);
}
}
if (response.find("error") != response.end()) {
auto state(state_.lock());
state->jobs[attrPath]["error"] = response["error"];
}
/* Add newly discovered job names to the queue. */
{
auto state(state_.lock());
state->active.erase(attrPath);
for (auto & s : newAttrs)
state->todo.insert(s);
wakeup.notify_all();
}
}
} catch (...) {
check_pid_status_nonblocking(pid);
auto state(state_.lock());
state->exc = std::current_exception();
wakeup.notify_all();
}
};
std::vector<std::thread> threads;
for (size_t i = 0; i < nrWorkers; i++)
threads.emplace_back(std::thread(handler));
for (auto & thread : threads)
thread.join();
auto state(state_.lock());
if (state->exc)
std::rethrow_exception(state->exc);
/* For aggregate jobs that have named consistuents
(i.e. constituents that are a job name rather than a
derivation), look up the referenced job and add it to the
dependencies of the aggregate derivation. */
auto store = openStore();
for (auto i = state->jobs.begin(); i != state->jobs.end(); ++i) {
auto jobName = i.key();
auto & job = i.value();
auto named = job.find("namedConstituents");
if (named == job.end()) continue;
std::unordered_map<std::string, std::string> brokenJobs;
auto getNonBrokenJobOrRecordError = [&brokenJobs, &jobName, &state](
const std::string & childJobName) -> std::optional<nlohmann::json> {
auto childJob = state->jobs.find(childJobName);
if (childJob == state->jobs.end()) {
printError("aggregate job '%s' references non-existent job '%s'", jobName, childJobName);
brokenJobs[childJobName] = "does not exist";
return std::nullopt;
}
if (childJob->find("error") != childJob->end()) {
std::string error = (*childJob)["error"];
printError("aggregate job '%s' references broken job '%s': %s", jobName, childJobName, error);
brokenJobs[childJobName] = error;
return std::nullopt;
}
return *childJob;
};
if (myArgs.dryRun) {
for (std::string jobName2 : *named) {
auto job2 = getNonBrokenJobOrRecordError(jobName2);
if (!job2) {
continue;
}
std::string drvPath2 = (*job2)["drvPath"];
job["constituents"].push_back(drvPath2);
}
} else {
auto drvPath = store->parseStorePath((std::string) job["drvPath"]);
auto drv = store->readDerivation(drvPath);
for (std::string jobName2 : *named) {
auto job2 = getNonBrokenJobOrRecordError(jobName2);
if (!job2) {
continue;
}
auto drvPath2 = store->parseStorePath((std::string) (*job2)["drvPath"]);
auto drv2 = store->readDerivation(drvPath2);
job["constituents"].push_back(store->printStorePath(drvPath2));
drv.inputDrvs.map[drvPath2].value = {drv2.outputs.begin()->first};
}
if (brokenJobs.empty()) {
std::string drvName(drvPath.name());
assert(hasSuffix(drvName, drvExtension));
drvName.resize(drvName.size() - drvExtension.size());
auto hashModulo = hashDerivationModulo(*store, drv, true);
if (hashModulo.kind != DrvHash::Kind::Regular) continue;
auto h = hashModulo.hashes.find("out");
if (h == hashModulo.hashes.end()) continue;
auto outPath = store->makeOutputPath("out", h->second, drvName);
drv.env["out"] = store->printStorePath(outPath);
drv.outputs.insert_or_assign("out", DerivationOutput::InputAddressed { .path = outPath });
auto newDrvPath = store->printStorePath(writeDerivation(*store, drv));
debug("rewrote aggregate derivation %s -> %s", store->printStorePath(drvPath), newDrvPath);
job["drvPath"] = newDrvPath;
job["outputs"]["out"] = store->printStorePath(outPath);
}
}
job.erase("namedConstituents");
/* Register the derivation as a GC root. !!! This
registers roots for jobs that we may have already
done. */
auto localStore = store.dynamic_pointer_cast<LocalFSStore>();
if (gcRootsDir != "" && localStore) {
auto drvPath = job["drvPath"].get<std::string>();
Path root = gcRootsDir + "/" + std::string(baseNameOf(drvPath));
if (!pathExists(root))
localStore->addPermRoot(localStore->parseStorePath(drvPath), root);
}
if (!brokenJobs.empty()) {
std::stringstream ss;
for (const auto& [jobName, error] : brokenJobs) {
ss << jobName << ": " << error << "\n";
}
job["error"] = ss.str();
}
}
std::cout << state->jobs.dump(2) << "\n";
});
}

View File

@@ -1,8 +0,0 @@
hydra_eval_jobs = executable('hydra-eval-jobs',
'hydra-eval-jobs.cc',
dependencies: [
libhydra_dep,
nix_dep,
],
install: true,
)

View File

@@ -7,140 +7,35 @@
#include "build-result.hh"
#include "path.hh"
#include "ssh-store.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "state.hh"
#include "current-process.hh"
#include "processes.hh"
#include "util.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "ssh.hh"
#include "finally.hh"
#include "url.hh"
using namespace nix;
bool ::Machine::isLocalhost() const
{
return storeUri.params.empty() && std::visit(overloaded {
[](const StoreReference::Auto &) {
return true;
},
[](const StoreReference::Specified & s) {
return
(s.scheme == "local" || s.scheme == "unix") ||
((s.scheme == "ssh" || s.scheme == "ssh-ng") &&
s.authority == "localhost");
},
}, storeUri.variant);
}
namespace nix::build_remote {
static Strings extraStoreArgs(std::string & machine)
{
Strings result;
try {
auto parsed = parseURL(machine);
if (parsed.scheme != "ssh") {
throw SysError("Currently, only (legacy-)ssh stores are supported!");
}
machine = parsed.authority.value_or("");
auto remoteStore = parsed.query.find("remote-store");
if (remoteStore != parsed.query.end()) {
result = {"--store", shellEscape(remoteStore->second)};
}
} catch (BadURL &) {
// We just try to continue with `machine->sshName` here for backwards compat.
}
return result;
}
static std::unique_ptr<SSHMaster::Connection> openConnection(
::Machine::ptr machine, SSHMaster & master)
{
Strings command = {"nix-store", "--serve", "--write"};
if (machine->isLocalhost()) {
command.push_back("--builders");
command.push_back("");
} else {
command.splice(command.end(), extraStoreArgs(machine->sshName));
}
auto ret = master.startCommand(std::move(command), {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
});
// XXX: determine the actual max value we can use from /proc.
// FIXME: Should this be upstreamed into `startCommand` in Nix?
int pipesize = 1024 * 1024;
fcntl(ret->in.get(), F_SETPIPE_SZ, &pipesize);
fcntl(ret->out.get(), F_SETPIPE_SZ, &pipesize);
return ret;
}
static void copyClosureTo(
::Machine::Connection & conn,
Store & destStore,
const StorePathSet & paths,
SubstituteFlag useSubstitutes = NoSubstitute)
{
StorePathSet closure;
destStore.computeFSClosure(paths, closure);
/* Send the "query valid paths" command with the "lock" option
enabled. This prevents a race where the remote host
garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log
/* Get back the set of paths that are already valid on the remote
host. */
auto present = conn.queryValidPaths(
destStore, true, closure, useSubstitutes);
if (present.size() == closure.size()) return;
auto sorted = destStore.topoSortPaths(closure);
StorePathSet missing;
for (auto i = sorted.rbegin(); i != sorted.rend(); ++i)
if (!present.count(*i)) missing.insert(*i);
printMsg(lvlDebug, "sending %d missing paths", missing.size());
std::unique_lock<std::timed_mutex> sendLock(conn.machine->state->sendLock,
std::chrono::seconds(600));
conn.to << ServeProto::Command::ImportPaths;
destStore.exportPaths(missing, conn.to);
conn.to.flush();
if (readInt(conn.from) != 1)
throw Error("remote machine failed to import closure");
}
// FIXME: use Store::topoSortPaths().
static StorePaths reverseTopoSortPaths(const std::map<StorePath, UnkeyedValidPathInfo> & paths)
{
StorePaths sorted;
StorePathSet visited;
std::function<void(const StorePath & path)> dfsVisit;
dfsVisit = [&](const StorePath & path) {
if (!visited.insert(path).second) return;
auto info = paths.find(path);
auto references = info == paths.end() ? StorePathSet() : info->second.references;
for (auto & i : references)
/* Don't traverse into paths that don't exist. That can
happen due to substitutes for non-existent paths. */
if (i != path && paths.count(i))
dfsVisit(i);
sorted.push_back(path);
};
for (auto & i : paths)
dfsVisit(i.first);
return sorted;
}
static std::pair<Path, AutoCloseFD> openLogFile(const std::string & logDir, const StorePath & drvPath)
{
std::string base(drvPath.to_string());
@@ -198,18 +93,18 @@ static BasicDerivation sendInputs(
MaintainCount<counter> mc2(nrStepsCopyingTo);
printMsg(lvlDebug, "sending closure of %s to %s",
localStore.printStorePath(step.drvPath), conn.machine->sshName);
localStore.printStorePath(step.drvPath), conn.machine->storeUri.render());
auto now1 = std::chrono::steady_clock::now();
/* Copy the input closure. */
if (conn.machine->isLocalhost()) {
StorePathSet closure;
destStore.computeFSClosure(basicDrv.inputSrcs, closure);
copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute);
} else {
copyClosureTo(conn, destStore, basicDrv.inputSrcs, Substitute);
}
copyClosure(
destStore,
conn.machine->isLocalhost() ? localStore : *conn.store,
basicDrv.inputSrcs,
NoRepair,
NoCheckSigs,
Substitute);
auto now2 = std::chrono::steady_clock::now();
@@ -224,11 +119,10 @@ static BuildResult performBuild(
Store & localStore,
StorePath drvPath,
const BasicDerivation & drv,
const ServeProto::BuildOptions & options,
counter & nrStepsBuilding
)
{
conn.putBuildDerivationRequest(localStore, drvPath, drv, options);
auto kont = conn.store->buildDerivationAsync(drvPath, drv, bmNormal);
BuildResult result;
@@ -237,7 +131,10 @@ static BuildResult performBuild(
startTime = time(0);
{
MaintainCount<counter> mc(nrStepsBuilding);
result = ServeProto::Serialise<BuildResult>::read(localStore, conn);
result = kont();
// Without proper call-once functions, we need to manually
// delete after calling.
kont = {};
}
stopTime = time(0);
@@ -253,7 +150,7 @@ static BuildResult performBuild(
// If the protocol was too old to give us `builtOutputs`, initialize
// it manually by introspecting the derivation.
if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 6)
if (GET_PROTOCOL_MINOR(conn.store->getProtocol()) < 6)
{
// If the remote is too old to handle CA derivations, we cant get this
// far anyways
@@ -278,81 +175,6 @@ static BuildResult performBuild(
return result;
}
static std::map<StorePath, UnkeyedValidPathInfo> queryPathInfos(
::Machine::Connection & conn,
Store & localStore,
StorePathSet & outputs,
size_t & totalNarSize
)
{
/* Get info about each output path. */
std::map<StorePath, UnkeyedValidPathInfo> infos;
conn.to << ServeProto::Command::QueryPathInfos;
ServeProto::write(localStore, conn, outputs);
conn.to.flush();
while (true) {
auto storePathS = readString(conn.from);
if (storePathS == "") break;
auto storePath = localStore.parseStorePath(storePathS);
auto info = ServeProto::Serialise<UnkeyedValidPathInfo>::read(localStore, conn);
totalNarSize += info.narSize;
infos.insert_or_assign(std::move(storePath), std::move(info));
}
return infos;
}
static void copyPathFromRemote(
::Machine::Connection & conn,
NarMemberDatas & narMembers,
Store & localStore,
Store & destStore,
const ValidPathInfo & info
)
{
/* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */
auto source2 = sinkToSource([&](Sink & sink)
{
/* Note: we should only send the command to dump the store
path to the remote if the NAR is actually going to get read
by the destination store, which won't happen if this path
is already valid on the destination store. Since this
lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather
than outside the lambda. */
conn.to << ServeProto::Command::DumpStorePath << localStore.printStorePath(info.path);
conn.to.flush();
TeeSource tee(conn.from, sink);
extractNarData(tee, localStore.printStorePath(info.path), narMembers);
});
destStore.addToStore(info, *source2, NoRepair, NoCheckSigs);
}
static void copyPathsFromRemote(
::Machine::Connection & conn,
NarMemberDatas & narMembers,
Store & localStore,
Store & destStore,
const std::map<StorePath, UnkeyedValidPathInfo> & infos
)
{
auto pathsSorted = reverseTopoSortPaths(infos);
for (auto & path : pathsSorted) {
auto & info = infos.find(path)->second;
copyPathFromRemote(
conn, narMembers, localStore, destStore,
ValidPathInfo { path, info });
}
}
}
/* using namespace nix::build_remote; */
@@ -415,7 +237,6 @@ void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult)
void State::buildRemote(ref<Store> destStore,
::Machine::ptr machine, Step::ptr step,
const ServeProto::BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
std::function<void(StepState)> updateStep,
NarMemberDatas & narMembers)
@@ -430,27 +251,43 @@ void State::buildRemote(ref<Store> destStore,
updateStep(ssConnecting);
SSHMaster master {
machine->sshName,
machine->sshKey,
machine->sshPublicHostKey,
false, // no SSH master yet
false, // no compression yet
logFD.get(),
};
// FIXME: rewrite to use Store.
auto child = build_remote::openConnection(machine, master);
::Machine::Connection conn {
.machine = machine,
.store = [&]{
auto * pSpecified = std::get_if<StoreReference::Specified>(&machine->storeUri.variant);
if (!pSpecified || pSpecified->scheme != "ssh-ng") {
throw Error("Currently, only ssh-ng:// stores are supported!");
}
auto remoteStore = machine->openStore().dynamic_pointer_cast<RemoteStore>();
auto remoteStoreConfig = std::dynamic_pointer_cast<SSHStoreConfig>(remoteStore);
assert(remoteStore);
if (machine->isLocalhost()) {
auto rp_new = remoteStoreConfig->remoteProgram.get();
rp_new.push_back("--builders");
rp_new.push_back("");
const_cast<nix::Setting<Strings> &>(remoteStoreConfig->remoteProgram).assign(rp_new);
}
remoteStoreConfig->extraSshArgs = {
"-a", "-oBatchMode=yes", "-oConnectTimeout=60", "-oTCPKeepAlive=yes"
};
// TODO logging
//const_cast<nix::Setting<int> &>(remoteStore->logFD).assign(logFD.get());
return nix::ref{remoteStore};
}(),
};
{
auto activeStepState(activeStep->state_.lock());
if (activeStepState->cancelled) throw Error("step cancelled");
activeStepState->pid = child->sshPid;
}
Finally clearPid([&]() {
auto activeStepState(activeStep->state_.lock());
activeStepState->pid = -1;
/* FIXME: there is a slight race here with step
cancellation in State::processQueueChange(), which
@@ -460,41 +297,13 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */
});
::Machine::Connection conn {
{
.to = child->in.get(),
.from = child->out.get(),
/* Handshake. */
.remoteVersion = 0xdadbeef, // FIXME avoid dummy initialize
},
/*.machine =*/ machine,
};
Finally updateStats([&]() {
bytesReceived += conn.from.read;
bytesSent += conn.to.written;
// TODO
//auto stats = conn.store->getConnectionStats();
//bytesReceived += stats.bytesReceived;
//bytesSent += stats.bytesSent;
});
constexpr ServeProto::Version our_version = 0x206;
try {
conn.remoteVersion = decltype(conn)::handshake(
conn.to,
conn.from,
our_version,
machine->sshName);
} catch (EndOfFile & e) {
child->sshPid.wait();
std::string s = chomp(readFile(result.logFile));
throw Error("cannot connect to %1%: %2%", machine->sshName, s);
}
// Do not attempt to speak a newer version of the protocol.
//
// Per https://github.com/NixOS/nix/issues/9584 should be handled as
// part of `handshake` in upstream nix.
conn.remoteVersion = std::min(conn.remoteVersion, our_version);
{
auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0;
@@ -523,7 +332,7 @@ void State::buildRemote(ref<Store> destStore,
/* Do the build. */
printMsg(lvlDebug, "building %s on %s",
localStore->printStorePath(step->drvPath),
machine->sshName);
machine->storeUri.render());
updateStep(ssBuilding);
@@ -532,7 +341,6 @@ void State::buildRemote(ref<Store> destStore,
*localStore,
step->drvPath,
resolvedDrv,
buildOptions,
nrStepsBuilding
);
@@ -546,7 +354,7 @@ void State::buildRemote(ref<Store> destStore,
get a build log. */
if (result.isCached) {
printMsg(lvlInfo, "outputs of %s substituted or already valid on %s",
localStore->printStorePath(step->drvPath), machine->sshName);
localStore->printStorePath(step->drvPath), machine->storeUri.render());
unlink(result.logFile.c_str());
result.logFile = "";
}
@@ -563,19 +371,12 @@ void State::buildRemote(ref<Store> destStore,
auto now1 = std::chrono::steady_clock::now();
size_t totalNarSize = 0;
auto infos = build_remote::queryPathInfos(conn, *localStore, outputs, totalNarSize);
if (totalNarSize > maxOutputSize) {
result.stepStatus = bsNarSizeLimitExceeded;
return;
}
/* Copy each path. */
printMsg(lvlDebug, "copying outputs of %s from %s (%d bytes)",
localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize);
printMsg(lvlDebug, "copying outputs of %s from %s",
localStore->printStorePath(step->drvPath), machine->storeUri.render());
copyClosure(*conn.store, *destStore, outputs);
build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos);
auto now2 = std::chrono::steady_clock::now();
result.overhead += std::chrono::duration_cast<std::chrono::milliseconds>(now2 - now1).count();
@@ -596,9 +397,11 @@ void State::buildRemote(ref<Store> destStore,
}
}
/* Shut down the connection. */
child->in = -1;
child->sshPid.wait();
/* Shut down the connection done by RAII.
Only difference is kill() instead of wait() (i.e. send signal
then wait())
*/
} catch (Error & e) {
/* Disable this machine until a certain period of time has
@@ -612,7 +415,7 @@ void State::buildRemote(ref<Store> destStore,
info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4);
info->lastFailure = now;
int delta = retryInterval * std::pow(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30);
printMsg(lvlInfo, "will disable machine %1% for %2%s", machine->sshName, delta);
printMsg(lvlInfo, "will disable machine %1% for %2%s", machine->storeUri.render(), delta);
info->disabledUntil = now + std::chrono::seconds(delta);
}
throw;

View File

@@ -41,7 +41,7 @@ void State::builder(MachineReservation::ptr reservation)
} catch (std::exception & e) {
printMsg(lvlError, "uncaught exception building %s on %s: %s",
localStore->printStorePath(reservation->step->drvPath),
reservation->machine->sshName,
reservation->machine->storeUri.render(),
e.what());
}
}
@@ -98,13 +98,6 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
it). */
BuildID buildId;
std::optional<StorePath> buildDrvPath;
// Other fields set below
nix::ServeProto::BuildOptions buildOptions {
.maxLogSize = maxLogSize,
.nrRepeats = step->isDeterministic ? 1u : 0u,
.enforceDeterminism = step->isDeterministic,
.keepFailed = false,
};
auto conn(dbPool.get());
@@ -139,18 +132,19 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
{
auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName));
if (i != jobsetRepeats.end())
buildOptions.nrRepeats = std::max(buildOptions.nrRepeats, i->second);
warn("jobset repeats is deprecated; nix stopped supporting this correctly a long time ago.");
}
}
if (!build) build = *dependents.begin();
buildId = build->id;
buildDrvPath = build->drvPath;
buildOptions.maxSilentTime = build->maxSilentTime;
buildOptions.buildTimeout = build->buildTimeout;
settings.maxLogSize = maxLogSize;
settings.maxSilentTime = build->maxSilentTime;
settings.buildTimeout = build->buildTimeout;
printInfo("performing step %s %d times on %s (needed by build %d and %d others)",
localStore->printStorePath(step->drvPath), buildOptions.nrRepeats + 1, machine->sshName, buildId, (dependents.size() - 1));
localStore->printStorePath(step->drvPath), 1, machine->storeUri.render(), buildId, (dependents.size() - 1));
}
if (!buildOneDone)
@@ -178,7 +172,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
unlink(result.logFile.c_str());
}
} catch (...) {
ignoreException();
ignoreExceptionInDestructor();
}
}
});
@@ -196,7 +190,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
{
auto mc = startDbUpdate();
pqxx::work txn(*conn);
stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->sshName, bsBusy);
stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->storeUri.render(), bsBusy);
txn.commit();
}
@@ -211,7 +205,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
try {
/* FIXME: referring builds may have conflicting timeouts. */
buildRemote(destStore, machine, step, buildOptions, result, activeStep, updateStep, narMembers);
buildRemote(destStore, machine, step, result, activeStep, updateStep, narMembers);
} catch (Error & e) {
if (activeStep->state_.lock()->cancelled) {
printInfo("marking step %d of build %d as cancelled", stepNr, buildId);
@@ -253,7 +247,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
/* Finish the step in the database. */
if (stepNr) {
pqxx::work txn(*conn);
finishBuildStep(txn, result, buildId, stepNr, machine->sshName);
finishBuildStep(txn, result, buildId, stepNr, machine->storeUri.render());
txn.commit();
}
@@ -261,7 +255,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore,
issue). Retry a number of times. */
if (result.canRetry) {
printMsg(lvlError, "possibly transient failure building %s on %s: %s",
localStore->printStorePath(step->drvPath), machine->sshName, result.errorMsg);
localStore->printStorePath(step->drvPath), machine->storeUri.render(), result.errorMsg);
assert(stepNr);
bool retry;
{
@@ -452,7 +446,7 @@ void State::failStep(
build->finishedInDB)
continue;
createBuildStep(txn,
0, build->id, step, machine ? machine->sshName : "",
0, build->id, step, machine ? machine->storeUri.render() : "",
result.stepStatus, result.errorMsg, buildId == build->id ? 0 : buildId);
}

View File

@@ -256,7 +256,7 @@ system_time State::doDispatch()
/* Can this machine do this step? */
if (!mi.machine->supportsStep(step)) {
debug("machine '%s' does not support step '%s' (system type '%s')",
mi.machine->sshName, localStore->printStorePath(step->drvPath), step->drv->platform);
mi.machine->storeUri.render(), localStore->printStorePath(step->drvPath), step->drv->platform);
continue;
}

View File

@@ -135,65 +135,26 @@ void State::parseMachines(const std::string & contents)
oldMachines = *machines_;
}
for (auto line : tokenizeString<Strings>(contents, "\n")) {
line = trim(std::string(line, 0, line.find('#')));
auto tokens = tokenizeString<std::vector<std::string>>(line);
if (tokens.size() < 3) continue;
tokens.resize(8);
if (tokens[5] == "-") tokens[5] = "";
auto supportedFeatures = tokenizeString<StringSet>(tokens[5], ",");
if (tokens[6] == "-") tokens[6] = "";
auto mandatoryFeatures = tokenizeString<StringSet>(tokens[6], ",");
for (auto & f : mandatoryFeatures)
supportedFeatures.insert(f);
using MaxJobs = std::remove_const<decltype(nix::Machine::maxJobs)>::type;
auto machine = std::make_shared<::Machine>(nix::Machine {
// `storeUri`, not yet used
"",
// `systemTypes`
tokenizeString<StringSet>(tokens[1], ","),
// `sshKey`
tokens[2] == "-" ? "" : tokens[2],
// `maxJobs`
tokens[3] != ""
? string2Int<MaxJobs>(tokens[3]).value()
: 1,
// `speedFactor`
std::stof(tokens[4].c_str()),
// `supportedFeatures`
std::move(supportedFeatures),
// `mandatoryFeatures`
std::move(mandatoryFeatures),
// `sshPublicHostKey`
tokens[7] != "" && tokens[7] != "-"
? tokens[7]
: "",
});
machine->sshName = tokens[0];
for (auto && machine_ : nix::Machine::parseConfig({}, contents)) {
auto machine = std::make_shared<::Machine>(std::move(machine_));
/* Re-use the State object of the previous machine with the
same name. */
auto i = oldMachines.find(machine->sshName);
auto i = oldMachines.find(machine->storeUri.variant);
if (i == oldMachines.end())
printMsg(lvlChatty, "adding new machine %1%", machine->sshName);
printMsg(lvlChatty, "adding new machine %1%", machine->storeUri.render());
else
printMsg(lvlChatty, "updating machine %1%", machine->sshName);
printMsg(lvlChatty, "updating machine %1%", machine->storeUri.render());
machine->state = i == oldMachines.end()
? std::make_shared<::Machine::State>()
: i->second->state;
newMachines[machine->sshName] = machine;
newMachines[machine->storeUri.variant] = machine;
}
for (auto & m : oldMachines)
if (newMachines.find(m.first) == newMachines.end()) {
if (m.second->enabled)
printInfo("removing machine %1%", m.first);
printInfo("removing machine %1%", m.second->storeUri.render());
/* Add a disabled ::Machine object to make sure stats are
maintained. */
auto machine = std::make_shared<::Machine>(*(m.second));
@@ -221,7 +182,7 @@ void State::monitorMachinesFile()
getEnv("NIX_REMOTE_SYSTEMS").value_or(pathExists(defaultMachinesFile) ? defaultMachinesFile : ""), ":");
if (machinesFiles.empty()) {
parseMachines("localhost " +
parseMachines("ssh-ng://localhost " +
(settings.thisSystem == "x86_64-linux" ? "x86_64-linux,i686-linux" : settings.thisSystem.get())
+ " - " + std::to_string(settings.maxBuildJobs) + " 1 "
+ concatStringsSep(",", StoreConfig::getDefaultSystemFeatures()));
@@ -657,7 +618,7 @@ void State::dumpStatus(Connection & conn)
machine["avgStepTime"] = (float) s->totalStepTime / s->nrStepsDone;
machine["avgStepBuildTime"] = (float) s->totalStepBuildTime / s->nrStepsDone;
}
statusJson["machines"][m->sshName] = machine;
statusJson["machines"][m->storeUri.render()] = machine;
}
}

View File

@@ -6,7 +6,6 @@
#include <map>
#include <memory>
#include <queue>
#include <regex>
#include <prometheus/counter.h>
#include <prometheus/gauge.h>
@@ -21,9 +20,7 @@
#include "store-api.hh"
#include "sync.hh"
#include "nar-extractor.hh"
#include "serve-protocol.hh"
#include "serve-protocol-impl.hh"
#include "serve-protocol-connection.hh"
#include "ssh-store.hh"
#include "machines.hh"
@@ -241,10 +238,6 @@ struct Machine : nix::Machine
{
typedef std::shared_ptr<Machine> ptr;
/* TODO Get rid of: `nix::Machine::storeUri` is normalized in a way
we are not yet used to, but once we are, we don't need this. */
std::string sshName;
struct State {
typedef std::shared_ptr<State> ptr;
counter currentJobs{0};
@@ -294,21 +287,19 @@ struct Machine : nix::Machine
return true;
}
bool isLocalhost()
{
std::regex r("^(ssh://|ssh-ng://)?localhost$");
return std::regex_search(sshName, r);
}
bool isLocalhost() const;
// A connection to a machine
struct Connection : nix::ServeProto::BasicClientConnection {
struct Connection {
// Backpointer to the machine
ptr machine;
// Opened store
nix::ref<nix::RemoteStore> store;
};
};
class HydraConfig;
struct HydraConfig;
class State
@@ -358,7 +349,7 @@ private:
/* The build machines. */
std::mutex machinesReadyLock;
typedef std::map<std::string, Machine::ptr> Machines;
typedef std::map<nix::StoreReference::Variant, Machine::ptr> Machines;
nix::Sync<Machines> machines; // FIXME: use atomic_shared_ptr
/* Various stats. */
@@ -551,7 +542,6 @@ private:
void buildRemote(nix::ref<nix::Store> destStore,
Machine::ptr machine, Step::ptr step,
const nix::ServeProto::BuildOptions & buildOptions,
RemoteResult & result, std::shared_ptr<ActiveStep> activeStep,
std::function<void(StepState)> updateStep,
NarMemberDatas & narMembers);

View File

@@ -51,6 +51,7 @@ sub begin :Private {
$c->stash->{curUri} = $c->request->uri;
$c->stash->{version} = $ENV{"HYDRA_RELEASE"} || "<devel>";
$c->stash->{nixVersion} = $ENV{"NIX_RELEASE"} || "<devel>";
$c->stash->{nixEvalJobsVersion} = $ENV{"NIX_EVAL_JOBS_RELEASE"} || "<devel>";
$c->stash->{curTime} = time;
$c->stash->{logo} = defined $c->config->{hydra_logo} ? "/logo" : "";
$c->stash->{tracker} = defined $c->config->{tracker} ? $c->config->{tracker} : "";

View File

@@ -1,6 +1,5 @@
# Native code
subdir('libhydra')
subdir('hydra-eval-jobs')
subdir('hydra-evaluator')
subdir('hydra-queue-runner')

View File

@@ -93,7 +93,7 @@
<footer class="navbar">
<hr />
<small>
<em><a href="http://nixos.org/hydra" target="_blank" class="squiggle">Hydra</a> [% HTML.escape(version) %] (using [% HTML.escape(nixVersion) %]).</em>
<em><a href="http://nixos.org/hydra" target="_blank" class="squiggle">Hydra</a> [% HTML.escape(version) %] (using [% HTML.escape(nixVersion) %] and [% HTML.escape(nixEvalJobsVersion) %]).</em>
[% IF c.user_exists %]
You are signed in as <tt>[% HTML.escape(c.user.username) %]</tt>
[%- IF c.user.type == 'google' %] via Google[% END %].

View File

@@ -17,6 +17,7 @@ use Hydra::Helper::Nix;
use Hydra::Model::DB;
use Hydra::Plugin;
use Hydra::Schema;
use IPC::Run;
use JSON::MaybeXS;
use Net::Statsd;
use Nix::Store;
@@ -357,22 +358,32 @@ sub evalJobs {
my @cmd;
if (defined $flakeRef) {
@cmd = ("hydra-eval-jobs",
"--flake", $flakeRef,
"--gc-roots-dir", getGCRootsDir,
"--max-jobs", 1);
my $nix_expr =
"let " .
"flake = builtins.getFlake (toString \"$flakeRef\"); " .
"in " .
"flake.hydraJobs " .
"or flake.checks " .
"or (throw \"flake '$flakeRef' does not provide any Hydra jobs or checks\")";
@cmd = ("nix-eval-jobs", "--expr", $nix_expr);
} else {
my $nixExprInput = $inputInfo->{$nixExprInputName}->[0]
or die "cannot find the input containing the job expression\n";
@cmd = ("hydra-eval-jobs",
@cmd = ("nix-eval-jobs",
"<" . $nixExprInputName . "/" . $nixExprPath . ">",
"--gc-roots-dir", getGCRootsDir,
"--max-jobs", 1,
inputsToArgs($inputInfo));
}
push @cmd, "--no-allow-import-from-derivation" if $config->{allow_import_from_derivation} // "true" ne "true";
push @cmd, ("--gc-roots-dir", getGCRootsDir);
push @cmd, ("--max-jobs", 1);
push @cmd, "--meta";
push @cmd, "--constituents";
push @cmd, "--force-recurse";
push @cmd, ("--option", "allow-import-from-derivation", "false") if $config->{allow_import_from_derivation} // "true" ne "true";
push @cmd, ("--workers", $config->{evaluator_workers} // 1);
push @cmd, ("--max-memory-size", $config->{evaluator_max_memory_size} // 4096);
if (defined $ENV{'HYDRA_DEBUG'}) {
sub escape {
@@ -384,14 +395,40 @@ sub evalJobs {
print STDERR "evaluator: @escaped\n";
}
(my $res, my $jobsJSON, my $stderr) = captureStdoutStderr(21600, @cmd);
die "hydra-eval-jobs returned " . ($res & 127 ? "signal $res" : "exit code " . ($res >> 8))
. ":\n" . ($stderr ? decode("utf-8", $stderr) : "(no output)\n")
if $res;
my $evalProc = IPC::Run::start \@cmd,
'>', IPC::Run::new_chunker, \my $out,
'2>', \my $err;
print STDERR "$stderr";
return sub {
while (1) {
$evalProc->pump;
if (!defined $out && !defined $err) {
$evalProc->finish;
if ($?) {
die "nix-eval-jobs returned " . ($? & 127 ? "signal $?" : "exit code " . ($? >> 8)) . "\n";
}
return;
}
return decode_json($jobsJSON);
if (defined $err) {
print STDERR "$err";
undef $err;
}
if (defined $out && $out ne '') {
my $job;
try {
$job = decode_json($out);
} catch {
warn "nix-eval-jobs sent invalid JSON.\n parse error: $_\n invalid json: $out\n";
};
undef $out;
if (defined $job) {
return $job;
}
}
}
};
}
@@ -420,7 +457,7 @@ sub checkBuild {
my $firstOutputName = $outputNames[0];
my $firstOutputPath = $buildInfo->{outputs}->{$firstOutputName};
my $jobName = $buildInfo->{jobName} or die;
my $jobName = $buildInfo->{attr} or die;
my $drvPath = $buildInfo->{drvPath} or die;
my $build;
@@ -474,9 +511,30 @@ sub checkBuild {
my $time = time();
sub null {
my ($s) = @_;
return $s eq "" ? undef : $s;
sub getMeta {
my ($s, $def) = @_;
return ($s || "") eq "" ? $def : $s;
}
sub getMetaStrings {
my ($v, $k, $acc) = @_;
my $t = ref $v;
if ($t eq 'HASH') {
push @$acc, $v->{$k} if exists $v->{$k};
} elsif ($t eq 'ARRAY') {
getMetaStrings($_, $k, $acc) foreach @$v;
} elsif (defined $v) {
push @$acc, $v;
}
}
sub getMetaConcatStrings {
my ($v, $k) = @_;
my @strings;
getMetaStrings($v, $k, \@strings);
return join(", ", @strings) || undef;
}
# Add the build to the database.
@@ -484,19 +542,19 @@ sub checkBuild {
{ timestamp => $time
, jobset_id => $jobset->id
, job => $jobName
, description => null($buildInfo->{description})
, license => null($buildInfo->{license})
, homepage => null($buildInfo->{homepage})
, maintainers => null($buildInfo->{maintainers})
, maxsilent => $buildInfo->{maxSilent}
, timeout => $buildInfo->{timeout}
, nixname => $buildInfo->{nixName}
, description => getMeta($buildInfo->{meta}->{description}, undef)
, license => getMetaConcatStrings($buildInfo->{meta}->{license}, "shortName")
, homepage => getMeta($buildInfo->{meta}->{homepage}, undef)
, maintainers => getMetaConcatStrings($buildInfo->{meta}->{maintainers}, "email")
, maxsilent => getMeta($buildInfo->{meta}->{maxSilent}, 7200)
, timeout => getMeta($buildInfo->{meta}->{timeout}, 36000)
, nixname => $buildInfo->{name}
, drvpath => $drvPath
, system => $buildInfo->{system}
, priority => $buildInfo->{schedulingPriority}
, priority => getMeta($buildInfo->{meta}->{schedulingPriority}, 100)
, finished => 0
, iscurrent => 1
, ischannel => $buildInfo->{isChannel}
, ischannel => getMeta($buildInfo->{meta}->{isChannel}, 0)
});
$build->buildoutputs->create({ name => $_, path => $buildInfo->{outputs}->{$_} })
@@ -665,7 +723,7 @@ sub checkJobsetWrapped {
return;
}
# Hash the arguments to hydra-eval-jobs and check the
# Hash the arguments to nix-eval-jobs and check the
# JobsetInputHashes to see if the previous evaluation had the same
# inputs. If so, bail out.
my @args = ($jobset->nixexprinput // "", $jobset->nixexprpath // "", inputsToArgs($inputInfo));
@@ -687,19 +745,12 @@ sub checkJobsetWrapped {
# Evaluate the job expression.
my $evalStart = clock_gettime(CLOCK_MONOTONIC);
my $jobs = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef);
my $evalStop = clock_gettime(CLOCK_MONOTONIC);
if ($jobsetsJobset) {
my @keys = keys %$jobs;
die "The .jobsets jobset must only have a single job named 'jobsets'"
unless (scalar @keys) == 1 && $keys[0] eq "jobsets";
}
Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
my $evalStop;
my $jobsIter = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef);
if ($dryRun) {
foreach my $name (keys %{$jobs}) {
my $job = $jobs->{$name};
while (defined(my $job = $jobsIter->())) {
my $name = $job->{attr};
if (defined $job->{drvPath}) {
print STDERR "good job $name: $job->{drvPath}\n";
} else {
@@ -709,36 +760,20 @@ sub checkJobsetWrapped {
return;
}
die "Jobset contains a job with an empty name. Make sure the jobset evaluates to an attrset of jobs.\n"
if defined $jobs->{""};
$jobs->{$_}->{jobName} = $_ for keys %{$jobs};
my $jobOutPathMap = {};
my $jobsetChanged = 0;
my $dbStart = clock_gettime(CLOCK_MONOTONIC);
# Store the error messages for jobs that failed to evaluate.
my $evaluationErrorTime = time;
my $evaluationErrorMsg = "";
foreach my $job (values %{$jobs}) {
next unless defined $job->{error};
$evaluationErrorMsg .=
($job->{jobName} ne "" ? "in job $job->{jobName}" : "at top-level") .
":\n" . $job->{error} . "\n\n";
}
setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);
my $evaluationErrorRecord = $db->resultset('EvaluationErrors')->create(
{ errormsg => $evaluationErrorMsg
, errortime => $evaluationErrorTime
}
);
my $jobOutPathMap = {};
my $jobsetChanged = 0;
my %buildMap;
$db->txn_do(sub {
$db->txn_do(sub {
my $prevEval = getPrevJobsetEval($db, $jobset, 1);
# Clear the "current" flag on all builds. Since we're in a
@@ -751,7 +786,7 @@ sub checkJobsetWrapped {
, evaluationerror => $evaluationErrorRecord
, timestamp => time
, checkouttime => abs(int($checkoutStop - $checkoutStart))
, evaltime => abs(int($evalStop - $evalStart))
, evaltime => 0
, hasnewbuilds => 0
, nrbuilds => 0
, flake => $flakeRef
@@ -759,11 +794,24 @@ sub checkJobsetWrapped {
, nixexprpath => $jobset->nixexprpath
});
# Schedule each successfully evaluated job.
foreach my $job (permute(values %{$jobs})) {
next if defined $job->{error};
#print STDERR "considering job " . $project->name, ":", $jobset->name, ":", $job->{jobName} . "\n";
checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins);
my @jobsWithConstituents;
while (defined(my $job = $jobsIter->())) {
if ($jobsetsJobset) {
die "The .jobsets jobset must only have a single job named 'jobsets'"
unless $job->{attr} eq "jobsets";
}
$evaluationErrorMsg .=
($job->{attr} ne "" ? "in job $job->{attr}" : "at top-level") .
":\n" . $job->{error} . "\n\n" if defined $job->{error};
checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins)
unless defined $job->{error};
if (defined $job->{constituents}) {
push @jobsWithConstituents, $job;
}
}
# Have any builds been added or removed since last time?
@@ -801,21 +849,20 @@ sub checkJobsetWrapped {
$drvPathToId{$x->{drvPath}} = $x;
}
foreach my $job (values %{$jobs}) {
next unless $job->{constituents};
foreach my $job (values @jobsWithConstituents) {
next unless defined $job->{constituents};
if (defined $job->{error}) {
die "aggregate job $job->{jobName} failed with the error: $job->{error}\n";
die "aggregate job $job->{attr} failed with the error: $job->{error}\n";
}
my $x = $drvPathToId{$job->{drvPath}} or
die "aggregate job $job->{jobName} has no corresponding build record.\n";
die "aggregate job $job->{attr} has no corresponding build record.\n";
foreach my $drvPath (@{$job->{constituents}}) {
my $constituent = $drvPathToId{$drvPath};
if (defined $constituent) {
$db->resultset('AggregateConstituents')->update_or_create({aggregate => $x->{id}, constituent => $constituent->{id}});
} else {
warn "aggregate job $job->{jobName} has a constituent $drvPath that doesn't correspond to a Hydra build\n";
warn "aggregate job $job->{attr} has a constituent $drvPath that doesn't correspond to a Hydra build\n";
}
}
}
@@ -857,11 +904,15 @@ sub checkJobsetWrapped {
$jobset->update({ enabled => 0 }) if $jobset->enabled == 2;
$jobset->update({ lastcheckedtime => time, forceeval => undef });
$evaluationErrorRecord->update({ errormsg => $evaluationErrorMsg });
setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);
$evalStop = clock_gettime(CLOCK_MONOTONIC);
$ev->update({ evaltime => abs(int($evalStop - $evalStart)) });
});
my $dbStop = clock_gettime(CLOCK_MONOTONIC);
Net::Statsd::timing("hydra.evaluator.db_time", int(($dbStop - $dbStart) * 1000));
Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
Net::Statsd::increment("hydra.evaluator.evals");
Net::Statsd::increment("hydra.evaluator.cached_evals") unless $jobsetChanged;
}

View File

@@ -18,14 +18,14 @@ isnt($res, 0, "hydra-eval-jobset exits non-zero");
ok(utf8::decode($stderr), "Stderr output is UTF8-clean");
like(
$stderr,
qr/aggregate job mixed_aggregate failed with the error: constituentA: does not exist/,
qr/aggregate job mixed_aggregate failed with the error: "constituentA": does not exist/,
"The stderr record includes a relevant error message"
);
$jobset->discard_changes; # refresh from DB
$jobset->discard_changes({ '+columns' => {'errormsg' => 'errormsg'} }); # refresh from DB
like(
$jobset->errormsg,
qr/aggregate job mixed_aggregate failed with the error: constituentA: does not exist/,
qr/aggregate job mixed_aggregate failed with the error: "constituentA": does not exist/,
"The jobset records a relevant error message"
);

View File

@@ -5,13 +5,58 @@ use Test2::V0;
my $ctx = test_context();
my $builds = $ctx->makeAndEvaluateJobset(
expression => 'constituents.nix',
my $expression = 'constituents.nix';
my $jobsetCtx = $ctx->makeJobset(
expression => $expression,
);
my $builds = $ctx->evaluateJobset(
jobset => $jobsetCtx->{"jobset"},
expression => $expression,
build => 0,
);
my $constituentA = $builds->{"constituentA"};
my $directAggregate = $builds->{"direct_aggregate"};
my $indirectAggregate = $builds->{"indirect_aggregate"};
my $mixedAggregate = $builds->{"mixed_aggregate"};
# Ensure that we get exactly the aggregates we expect
my %expected_constituents = (
'direct_aggregate' => {
'constituentA' => 1,
},
'indirect_aggregate' => {
'constituentA' => 1,
},
'mixed_aggregate' => {
# Note that `constituentA_alias` becomes `constituentA`, because
# the shorter name is preferred
'constituentA' => 1,
'constituentB' => 1,
},
);
my $rs = $ctx->db->resultset('AggregateConstituents')->search(
{},
{
join => [ 'aggregate', 'constituent' ], # Use correct relationship names
columns => [],
'+select' => [ 'aggregate.job', 'constituent.job' ],
'+as' => [ 'aggregate_job', 'constituent_job' ],
}
);
my %actual_constituents;
while (my $row = $rs->next) {
my $aggregate_job = $row->get_column('aggregate_job');
my $constituent_job = $row->get_column('constituent_job');
$actual_constituents{$aggregate_job} //= {};
$actual_constituents{$aggregate_job}{$constituent_job} = 1;
}
is(\%actual_constituents, \%expected_constituents, "Exact aggregate constituents as expected");
# Check that deletion also doesn't work accordingly
is(system('nix-store', '--delete', $constituentA->drvpath), 256, "Deleting a constituent derivation fails");
is(system('nix-store', '--delete', $directAggregate->drvpath), 256, "Deleting the direct aggregate derivation fails");

View File

@@ -0,0 +1,67 @@
use feature 'unicode_strings';
use strict;
use warnings;
use Setup;
use Test2::V0;
use File::Copy qw(cp);
my $ctx = test_context(
nix_config => qq|
experimental-features = nix-command flakes
|,
hydra_config => q|
<runcommand>
evaluator_pure_eval = false
</runcommand>
|
);
sub checkFlake {
my ($flake) = @_;
cp($ctx->jobsdir . "/basic.nix", $ctx->jobsdir . "/" . $flake);
cp($ctx->jobsdir . "/config.nix", $ctx->jobsdir . "/" . $flake);
cp($ctx->jobsdir . "/empty-dir-builder.sh", $ctx->jobsdir . "/" . $flake);
cp($ctx->jobsdir . "/fail.sh", $ctx->jobsdir . "/" . $flake);
cp($ctx->jobsdir . "/succeed-with-failed.sh", $ctx->jobsdir . "/" . $flake);
chmod 0755, $ctx->jobsdir . "/" . $flake . "/empty-dir-builder.sh";
chmod 0755, $ctx->jobsdir . "/" . $flake . "/fail.sh";
chmod 0755, $ctx->jobsdir . "/" . $flake . "/succeed-with-failed.sh";
my $builds = $ctx->makeAndEvaluateJobset(
flake => 'path:' . $ctx->jobsdir . "/" . $flake,
build => 1
);
subtest "Build: succeed_with_failed" => sub {
my $build = $builds->{"succeed_with_failed"};
is($build->finished, 1, "Build should be finished.");
is($build->buildstatus, 6, "succeeeded-but-failed should have buildstatus 6.");
};
subtest "Build: empty_dir" => sub {
my $build = $builds->{"empty_dir"};
is($build->finished, 1, "Build should be finished.");
is($build->buildstatus, 0, "Should have succeeded.");
};
subtest "Build: fails" => sub {
my $build = $builds->{"fails"};
is($build->finished, 1, "Build should be finished.");
is($build->buildstatus, 1, "Should have failed.");
};
}
subtest "Flake using `checks`" => sub {
checkFlake 'flake-checks'
};
subtest "Flake using `hydraJobs`" => sub {
checkFlake 'flake-hydraJobs'
};
done_testing;

View File

@@ -0,0 +1,22 @@
use feature 'unicode_strings';
use strict;
use warnings;
use Setup;
use Test2::V0;
my $ctx = test_context();
my $builds = $ctx->makeAndEvaluateJobset(
expression => "meta.nix",
build => 1
);
my $build = $builds->{"full-of-meta"};
is($build->finished, 1, "Build should be finished.");
is($build->description, "This is the description of the job.", "Wrong description extracted from the build.");
is($build->license, "MIT, BSD", "Wrong licenses extracted from the build.");
is($build->homepage, "https://example.com/", "Wrong homepage extracted from the build.");
is($build->maintainers, 'alice@example.com, bob@not.found', "Wrong maintainers extracted from the build.");
done_testing;

View File

@@ -5,6 +5,8 @@ rec {
builder = ./empty-dir-builder.sh;
};
constituentA_alias = constituentA;
constituentB = mkDerivation {
name = "empty-dir-B";
builder = ./empty-dir-builder.sh;
@@ -32,7 +34,7 @@ rec {
name = "mixed_aggregate";
_hydraAggregate = true;
constituents = [
"constituentA"
"constituentA_alias"
constituentB
];
builder = ./empty-dir-builder.sh;

View File

@@ -0,0 +1,6 @@
{
outputs = { ... }: {
checks =
import ./basic.nix;
};
}

View File

@@ -0,0 +1,6 @@
{
outputs = { ... }: {
hydraJobs =
import ./basic.nix;
};
}

17
t/jobs/meta.nix Normal file
View File

@@ -0,0 +1,17 @@
with import ./config.nix;
{
full-of-meta =
mkDerivation {
name = "full-of-meta";
builder = ./empty-dir-builder.sh;
meta = {
description = "This is the description of the job.";
license = [ { shortName = "MIT"; } "BSD" ];
homepage = "https://example.com/";
maintainers = [ "alice@example.com" { email = "bob@not.found"; } ];
outPath = "${placeholder "out"}";
};
};
}

View File

@@ -165,20 +165,46 @@ sub nix_state_dir {
sub makeAndEvaluateJobset {
my ($self, %opts) = @_;
my $expression = $opts{'expression'} || die "Mandatory 'expression' option not passed to makeAndEvaluateJobset.\n";
my $jobsdir = $opts{'jobsdir'} // $self->jobsdir;
my $should_build = $opts{'build'} // 0;
my $expression = $opts{'expression'};
my $flake = $opts{'flake'};
if (not $expression and not $flake) {
die "One of 'expression' or 'flake' must be passed to makeEvaluateJobset.\n";
}
my $jobsetCtx = $self->makeJobset(
expression => $expression,
my $jobsdir = $opts{'jobsdir'} // $self->jobsdir;
my %args = (
jobsdir => $jobsdir,
);
my $jobset = $jobsetCtx->{"jobset"};
if ($expression) {
$args{expression} = $expression;
}
if ($flake) {
$args{flake} = $flake;
}
my $jobsetCtx = $self->makeJobset(%args);
return $self->evaluateJobset(
jobset => $jobsetCtx->{"jobset"},
expression => $expression,
flake => $flake,
build => $opts{"build"} // 0,
)
}
sub evaluateJobset {
my ($self, %opts) = @_;
my $jobset = $opts{'jobset'};
my $expression = $opts{'expression'} // $opts{'flake'};
evalSucceeds($jobset) or die "Evaluating jobs/$expression should exit with return code 0.\n";
my $builds = {};
my $should_build = $opts{'build'};
for my $build ($jobset->builds) {
if ($should_build) {
runBuild($build) or die "Build '".$build->job."' from jobs/$expression should exit with return code 0.\n";
@@ -195,7 +221,7 @@ sub makeAndEvaluateJobset {
#
# In return, you get a hash of the user, project, and jobset records.
#
# This always uses an `expression` from the `jobsdir` directory.
# This always uses an `expression` or `flake` from the `jobsdir` directory.
#
# Hash Parameters:
#
@@ -204,7 +230,12 @@ sub makeAndEvaluateJobset {
sub makeJobset {
my ($self, %opts) = @_;
my $expression = $opts{'expression'} || die "Mandatory 'expression' option not passed to makeJobset.\n";
my $expression = $opts{'expression'};
my $flake = $opts{'flake'};
if (not $expression and not $flake) {
die "One of 'expression' or 'flake' must be passed to makeJobset.\n";
}
my $jobsdir = $opts{'jobsdir'} // $self->jobsdir;
# Create a new user for this test
@@ -222,12 +253,20 @@ sub makeJobset {
});
# Create a new jobset for this test and set up the inputs
my $jobset = $project->jobsets->create({
my %args = (
name => rand_chars(),
nixexprinput => "jobs",
nixexprpath => $expression,
emailoverride => ""
});
);
if ($expression) {
$args{type} = 0;
$args{nixexprinput} = "jobs";
$args{nixexprpath} = $expression;
}
if ($flake) {
$args{type} = 1;
$args{flake} = $flake;
}
my $jobset = $project->jobsets->create(\%args);
my $jobsetinput = $jobset->jobsetinputs->create({name => "jobs", type => "path"});
$jobsetinput->jobsetinputalts->create({altnr => 0, value => $jobsdir});

View File

@@ -27,7 +27,7 @@ testenv.prepend('PERL5LIB',
separator: ':'
)
testenv.prepend('PATH',
fs.parent(hydra_eval_jobs.full_path()),
fs.parent(find_program('nix').full_path()),
fs.parent(hydra_evaluator.full_path()),
fs.parent(hydra_queue_runner.full_path()),
meson.project_source_root() / 'src/script',

View File

@@ -22,11 +22,11 @@ is(nrQueuedBuildsForJobset($jobset), 0, "Evaluating jobs/broken-constituent.nix
like(
$jobset->errormsg,
qr/^does-not-exist: does not exist$/m,
qr/^"does-not-exist": does not exist$/m,
"Evaluating jobs/broken-constituent.nix should log an error for does-not-exist");
like(
$jobset->errormsg,
qr/^does-not-evaluate: error: assertion 'false' failed$/m,
qr/^"does-not-evaluate": "error: assertion 'false' failed/m,
"Evaluating jobs/broken-constituent.nix should log an error for does-not-evaluate");
done_testing;