@@ -9,13 +9,10 @@
# include "path.hh"
# include "legacy-ssh-store.hh"
# include "serve-protocol.hh"
# include "serve-protocol-impl.hh"
# include "state.hh"
# include "current-process.hh"
# include "processes.hh"
# include "util.hh"
# include "serve-protocol.hh"
# include "serve-protocol-impl.hh"
# include "ssh.hh"
# include "finally.hh"
# include "url.hh"
@@ -39,108 +36,6 @@ bool ::Machine::isLocalhost() const
namespace nix : : build_remote {
static std : : unique_ptr < SSHMaster : : Connection > openConnection (
: : Machine : : ptr machine , SSHMaster & master )
{
Strings command = { " nix-store " , " --serve " , " --write " } ;
if ( machine - > isLocalhost ( ) ) {
command . push_back ( " --builders " ) ;
command . push_back ( " " ) ;
} else {
auto remoteStore = machine - > storeUri . params . find ( " remote-store " ) ;
if ( remoteStore ! = machine - > storeUri . params . end ( ) ) {
command . push_back ( " --store " ) ;
command . push_back ( shellEscape ( remoteStore - > second ) ) ;
}
}
auto ret = master . startCommand ( std : : move ( command ) , {
" -a " , " -oBatchMode=yes " , " -oConnectTimeout=60 " , " -oTCPKeepAlive=yes "
} ) ;
// XXX: determine the actual max value we can use from /proc.
// FIXME: Should this be upstreamed into `startCommand` in Nix?
int pipesize = 1024 * 1024 ;
fcntl ( ret - > in . get ( ) , F_SETPIPE_SZ , & pipesize ) ;
fcntl ( ret - > out . get ( ) , F_SETPIPE_SZ , & pipesize ) ;
return ret ;
}
static void copyClosureTo (
: : Machine : : Connection & conn ,
Store & destStore ,
const StorePathSet & paths ,
SubstituteFlag useSubstitutes = NoSubstitute )
{
StorePathSet closure ;
destStore . computeFSClosure ( paths , closure ) ;
/* Send the "query valid paths" command with the "lock" option
enabled. This prevents a race where the remote host
garbage-collect paths that are already there. Optionally, ask
the remote host to substitute missing paths. */
// FIXME: substitute output pollutes our build log
/* Get back the set of paths that are already valid on the remote
host. */
auto present = conn . queryValidPaths (
destStore , true , closure , useSubstitutes ) ;
if ( present . size ( ) = = closure . size ( ) ) return ;
auto sorted = destStore . topoSortPaths ( closure ) ;
StorePathSet missing ;
for ( auto i = sorted . rbegin ( ) ; i ! = sorted . rend ( ) ; + + i )
if ( ! present . count ( * i ) ) missing . insert ( * i ) ;
printMsg ( lvlDebug , " sending %d missing paths " , missing . size ( ) ) ;
std : : unique_lock < std : : timed_mutex > sendLock ( conn . machine - > state - > sendLock ,
std : : chrono : : seconds ( 600 ) ) ;
conn . to < < ServeProto : : Command : : ImportPaths ;
destStore . exportPaths ( missing , conn . to ) ;
conn . to . flush ( ) ;
if ( readInt ( conn . from ) ! = 1 )
throw Error ( " remote machine failed to import closure " ) ;
}
// FIXME: use Store::topoSortPaths().
static StorePaths reverseTopoSortPaths ( const std : : map < StorePath , UnkeyedValidPathInfo > & paths )
{
StorePaths sorted ;
StorePathSet visited ;
std : : function < void ( const StorePath & path ) > dfsVisit ;
dfsVisit = [ & ] ( const StorePath & path ) {
if ( ! visited . insert ( path ) . second ) return ;
auto info = paths . find ( path ) ;
auto references = info = = paths . end ( ) ? StorePathSet ( ) : info - > second . references ;
for ( auto & i : references )
/* Don't traverse into paths that don't exist. That can
happen due to substitutes for non-existent paths. */
if ( i ! = path & & paths . count ( i ) )
dfsVisit ( i ) ;
sorted . push_back ( path ) ;
} ;
for ( auto & i : paths )
dfsVisit ( i . first ) ;
return sorted ;
}
static std : : pair < Path , AutoCloseFD > openLogFile ( const std : : string & logDir , const StorePath & drvPath )
{
std : : string base ( drvPath . to_string ( ) ) ;
@@ -203,13 +98,13 @@ static BasicDerivation sendInputs(
auto now1 = std : : chrono : : steady_clock : : now ( ) ;
/* Copy the input closure. */
if ( conn . machine - > isLocalhost ( ) ) {
StorePathSet closu re;
destStore . computeFSClosure ( basicDrv . inputSrcs , closure ) ;
copyPaths ( destStore , localStore , closure , NoRepair , NoCheckSigs , NoSubstitute ) ;
} else {
copyClosureTo ( conn , destStore , basicDrv . inputSrcs , Substitute ) ;
}
copyClosure (
destSto re,
conn . machine - > isLocalhost ( ) ? localStore : * conn . store ,
basicDrv . inputSrcs ,
NoRepair ,
NoCheckSigs ,
Substitute ) ;
auto now2 = std : : chrono : : steady_clock : : now ( ) ;
@@ -228,7 +123,7 @@ static BuildResult performBuild(
counter & nrStepsBuilding
)
{
conn . putBuildDerivationRequest ( localStore , drvPath , drv , options ) ;
auto kont = conn . store - > buildDerivationAsync ( drvPath , drv , options ) ;
BuildResult result ;
@@ -237,7 +132,10 @@ static BuildResult performBuild(
startTime = time ( 0 ) ;
{
MaintainCount < counter > mc ( nrStepsBuilding ) ;
result = ServeProto : : Serialise < BuildResult > : : read ( localStore , conn ) ;
result = kont ( ) ;
// Without proper call-once functions, we need to manually
// delete after calling.
kont = { } ;
}
stopTime = time ( 0 ) ;
@@ -253,7 +151,7 @@ static BuildResult performBuild(
// If the protocol was too old to give us `builtOutputs`, initialize
// it manually by introspecting the derivation.
if ( GET_PROTOCOL_MINOR ( conn . remoteVersion ) < 6 )
if ( GET_PROTOCOL_MINOR ( conn . store - > getProtocol ( ) ) < 6 )
{
// If the remote is too old to handle CA derivations, we can’ t get this
// far anyways
@@ -278,55 +176,6 @@ static BuildResult performBuild(
return result ;
}
static void copyPathFromRemote (
: : Machine : : Connection & conn ,
NarMemberDatas & narMembers ,
Store & localStore ,
Store & destStore ,
const ValidPathInfo & info
)
{
/* Receive the NAR from the remote and add it to the
destination store. Meanwhile, extract all the info from the
NAR that getBuildOutput() needs. */
auto source2 = sinkToSource ( [ & ] ( Sink & sink )
{
/* Note: we should only send the command to dump the store
path to the remote if the NAR is actually going to get read
by the destination store, which won't happen if this path
is already valid on the destination store. Since this
lambda function only gets executed if someone tries to read
from source2, we will send the command from here rather
than outside the lambda. */
conn . to < < ServeProto : : Command : : DumpStorePath < < localStore . printStorePath ( info . path ) ;
conn . to . flush ( ) ;
TeeSource tee ( conn . from , sink ) ;
extractNarData ( tee , localStore . printStorePath ( info . path ) , narMembers ) ;
} ) ;
destStore . addToStore ( info , * source2 , NoRepair , NoCheckSigs ) ;
}
static void copyPathsFromRemote (
: : Machine : : Connection & conn ,
NarMemberDatas & narMembers ,
Store & localStore ,
Store & destStore ,
const std : : map < StorePath , UnkeyedValidPathInfo > & infos
)
{
auto pathsSorted = reverseTopoSortPaths ( infos ) ;
for ( auto & path : pathsSorted ) {
auto & info = infos . find ( path ) - > second ;
copyPathFromRemote (
conn , narMembers , localStore , destStore ,
ValidPathInfo { path , info } ) ;
}
}
}
/* using namespace nix::build_remote; */
@@ -404,30 +253,39 @@ void State::buildRemote(ref<Store> destStore,
updateStep ( ssConnecting ) ;
auto storeRef = machine - > completeStoreReference ( ) ;
auto * pSpecified = std : : get_if < StoreReference : : Specified > ( & storeRef . variant ) ;
if ( ! pSpecified | | pSpecified - > scheme ! = " ssh " ) {
throw Error ( " Currently, only (legacy-) ssh stores are supported ! " ) ;
}
LegacySSHStoreConfig storeConfig {
pSpecified - > scheme ,
pSpecified - > authority ,
storeRef . params
} ;
auto master = storeConfig . createSSHMaster (
false , // no SSH master yet
logFD . get ( ) ) ;
// FIXME: rewrite to use Store.
auto child = build_remot e: : open Connection( machine , master ) ;
: : Machin e: : Connection conn {
. machine = machine ,
. store = [ & ] {
auto * pSpecified = std : : get_if < StoreReference : : Specified > ( & machine - > storeUri . variant ) ;
if ( ! pSpecified | | pSpecified - > scheme ! = " ssh " ) {
throw Error ( " Currently, only (legacy-) ssh stores are supported ! " ) ;
}
auto remoteStore = machine - > openStore ( ) . dynamic_pointer_cast < LegacySSHStore > ( ) ;
assert ( remoteStore ) ;
remoteStore - > connPipeSize = 1024 * 1024 ;
if ( machine - > isLocalhost ( ) ) {
auto rp_new = remoteStore - > remoteProgram . get ( ) ;
rp_new . push_back ( " --builders " ) ;
rp_new . push_back ( " " ) ;
const_cast < nix : : Setting < Strings > & > ( remoteStore - > remoteProgram ) . assign ( rp_new ) ;
}
remoteStore - > extraSshArgs = {
" -a " , " -oBatchMode=yes " , " -oConnectTimeout=60 " , " -oTCPKeepAlive=yes "
} ;
const_cast < nix : : Setting < int > & > ( remoteStore - > logFD ) . assign ( logFD . get ( ) ) ;
return nix : : ref { remoteStore } ;
} ( ) ,
} ;
{
auto activeStepState ( activeStep - > state_ . lock ( ) ) ;
if ( activeStepState - > cancelled ) throw Error ( " step cancelled " ) ;
activeStepState - > pid = child - > sshPid ;
activeStepState - > pid = conn . store - > getConnectionPid ( ) ;
}
Finally clearPid ( [ & ] ( ) {
@@ -442,35 +300,12 @@ void State::buildRemote(ref<Store> destStore,
process. Meh. */
} ) ;
: : Machine : : Connection conn {
{
. to = child - > in . get ( ) ,
. from = child - > out . get ( ) ,
/* Handshake. */
. remoteVersion = 0xdadbeef , // FIXME avoid dummy initialize
} ,
/*.machine =*/ machine ,
} ;
Finally updateStats ( [ & ] ( ) {
bytesReceived + = conn . from . read ;
bytesSent + = conn . to . written ;
auto stats = conn . store - > getConnectionStats ( ) ;
bytesReceived + = stats . bytesReceived ;
bytesSent + = stats . bytesSent ;
} ) ;
constexpr ServeProto : : Version our_version = 0x206 ;
try {
conn . remoteVersion = decltype ( conn ) : : handshake (
conn . to ,
conn . from ,
our_version ,
machine - > storeUri . render ( ) ) ;
} catch ( EndOfFile & e ) {
child - > sshPid . wait ( ) ;
std : : string s = chomp ( readFile ( result . logFile ) ) ;
throw Error ( " cannot connect to ‘ %1%’ : %2% " , machine - > storeUri . render ( ) , s ) ;
}
{
auto info ( machine - > state - > connectInfo . lock ( ) ) ;
info - > consecutiveFailures = 0 ;
@@ -539,21 +374,12 @@ void State::buildRemote(ref<Store> destStore,
auto now1 = std : : chrono : : steady_clock : : now ( ) ;
auto infos = conn . queryPathInfos ( * localStore , outputs ) ;
size_t totalNarSize = 0 ;
for ( auto & [ _ , info ] : infos ) totalNarSize + = info . narSize ;
if ( totalNarSize > maxOutputSize ) {
result . stepStatus = bsNarSizeLimitExceeded ;
return ;
}
/* Copy each path. */
printMsg ( lvlDebug , " copying outputs of ‘ %s’ from ‘ %s’ (%d bytes) " ,
localStore - > printStorePath ( step - > drvPath ) , machine - > storeUri . render ( ) , totalNarSize );
printMsg ( lvlDebug , " copying outputs of ‘ %s’ from ‘ %s’ " ,
localStore - > printStorePath ( step - > drvPath ) , machine - > storeUri . render ( ) ) ;
copyClosure ( * conn . store , * destStore , outputs ) ;
build_remote : : copyPathsFromRemote ( conn , narMembers , * localStore , * destStore , infos ) ;
auto now2 = std : : chrono : : steady_clock : : now ( ) ;
result . overhead + = std : : chrono : : duration_cast < std : : chrono : : milliseconds > ( now2 - now1 ) . count ( ) ;
@@ -574,9 +400,11 @@ void State::buildRemote(ref<Store> destStore,
}
}
/* Shut down the connection. */
child - > in = - 1 ;
child - > sshPid . wait ( ) ;
/* Shut down the connection done by RAII.
Only difference is kill() instead of wait() (i.e. send signal
then wait())
*/
} catch ( Error & e ) {
/* Disable this machine until a certain period of time has