diff --git a/Makefile b/Makefile index 49cc92fc..b548d989 100644 --- a/Makefile +++ b/Makefile @@ -646,14 +646,14 @@ UNIT_TESTS : UNIT_TESTS1 UNIT_TESTS2 @( cd $(@D) ; git clean -ffdxq >/dev/null 2>/dev/null ) ; : # keep $(@D) to ease debugging, ignore rc as old versions of git work but generate an error @for f in $$(grep '^$(UT_DIR)/base/' Manifest) ; do df=$(@D)/$${f#$(UT_DIR)/base/} ; mkdir -p $$(dirname $$df) ; cp $$f $$df ; done @cd $(@D) ; find . -type f -printf '%P\n' > Manifest - @( cd $(@D) ; PATH=$(ROOT_DIR)/bin:$(ROOT_DIR)/_bin:$$PATH $(ROOT_DIR)/$< ) >$@ 2>$@.err || ( cat $@ $@.err ; mv $@ $@.out ; exit 1 ) + @( cd $(@D) ; PATH=$(ROOT_DIR)/bin:$(ROOT_DIR)/_bin:$$PATH $(ROOT_DIR)/$< ) >$@.out 2>$@.err && mv $@.out $@ || ( cat $@ $@.err ; exit 1 ) %.dir/tok : %.py $(LMAKE_FILES) _lib/ut.py @echo py test to $@ @mkdir -p $(@D) @( cd $(@D) ; git clean -ffdxq >/dev/null 2>/dev/null ) ; : # keep $(@D) to ease debugging, ignore rc as old versions of git work but generate an error @cp $< $(@D)/Lmakefile.py - @( cd $(@D) ; PATH=$(ROOT_DIR)/bin:$(ROOT_DIR)/_bin:$$PATH PYTHONPATH=$(ROOT_DIR)/lib:$(ROOT_DIR)/_lib HOME= $(PYTHON) Lmakefile.py ) >$@ 2>$@.err || ( cat $@ $@.err ; mv $@ $@.out ; exit 1 ) + @( cd $(@D) ; PATH=$(ROOT_DIR)/bin:$(ROOT_DIR)/_bin:$$PATH PYTHONPATH=$(ROOT_DIR)/lib:$(ROOT_DIR)/_lib HOME= $(PYTHON) Lmakefile.py ) >$@.out 2>$@.err && mv $@.out $@ || ( cat $@ $@.err ; exit 1 ) # # lmake env diff --git a/TO_DO b/TO_DO index 57daf612..42db2d03 100644 --- a/TO_DO +++ b/TO_DO @@ -12,7 +12,7 @@ items : * BUGS (implemented but does not work) **************************************************************************************************** -* fix compilation with LMAKE_SAN=T +* fix compilation with LMAKE_FLAGS=ST * missing some deps when reading elf - it seems that libc.so is missing at least in some occasions * reimplement hierarchical repositories @@ -28,8 +28,6 @@ items : * LACK (not implemented but necessary for lmake semantic) **************************************************************************************************** -* improve lshow -i - - generate info on nodes * manage 32 bits executables - compile ld_audit.so (and co) in both 32 & 64 bits - put adequate $PLATFORM in LD_PRELOAD @@ -53,6 +51,10 @@ items : * COSMETIC (ugly as long as not implemented) **************************************************************************************************** +* report "killed while waiting for stdout or stderr" + - when a job is killed while the child is terminated + - so as to indicate to the user that something weird was on going + - could also report this time in lshow -i * generate an error when calling depend on a target - either known at that time - or when it becomes a target if later @@ -78,8 +80,9 @@ items : - such requests must go through the engine_loop - this may take a while - this means during this time, all jobs may connect (and actually do) - - requiring a coket for each slurm slot + - requiring a socket for each slurm slot - defeating the purpose of disconnecting jobs during execution + - in all cases, code must resist to an error in accept * fix store to be compliant with strict aliasing rules * support 64-bits id - configure with NBits rather than types @@ -110,11 +113,6 @@ items : * FEATURES (not implemented and can work without) **************************************************************************************************** -* implement a lshow -r to see what is running now and -B to see BOM (list of sources) - - implement a generic walk through deps - - use it for check_deps, which will prevent jobs post critical deps to be run -* support dynamic values for interpreter - - python and shell * improve job isolation by using namespaces - much like faketree : https://github.com/enfabrica/enkit/tree/master/faketree - generalize tmp mapping diff --git a/_bin/sys_config b/_bin/sys_config index 558ee327..12bfa302 100755 --- a/_bin/sys_config +++ b/_bin/sys_config @@ -61,17 +61,20 @@ CXX=${CXX:-g++} type -p $CXX >/dev/null || { echo cannot find c++ compiler $CXX ; exit 1 ; } case "$($CXX --version|head -1)" in - *clang* ) CXX_FLAVOR=clang ; [ "$($CXX -dumpversion)" -lt 15 ] && { echo clang version must be at least 15 ; exit 1 ; } ;; - *g++* ) CXX_FLAVOR=gcc ; [ "$($CXX -dumpversion)" -lt 11 ] && { echo gcc version must be at least 11 ; exit 1 ; } ;; - * ) echo cannot recognize c++ compiler $CXX ; exit 1 ;; + *clang* ) CXX_FLAVOR=clang ; v=$($CXX -dumpversion) ; [ ${v%%.*} -lt 15 ] && { echo clang version must be at least 15 ; exit 1 ; } ;; + *g++* ) CXX_FLAVOR=gcc ; v=$($CXX -dumpversion) ; [ ${v%%.*} -lt 11 ] && { echo gcc version must be at least 11 ; exit 1 ; } ;; + * ) echo cannot recognize c++ compiler $CXX ; exit 1 ;; esac -LLP="$($CXX -v -E /dev/null 2>&1 | grep LIBRARY_PATH=)" # e.g. : LIBARY_PATH=/usr/lib/x:/a/b:/c:/a/b/c/.. -LLP="$(echo $LLP | sed 's/LIBRARY_PATH=//' )" # e.g. : /usr/lib/x:/a/b:/c:/a/b/c/.. -LLP="$(echo $LLP | sed 's/:/ /' )" # e.g. : /usr/lib/x /a/b /c /a/b/c/.. -LLP="$(echo $LLP | sort -u )" # e.g. : /usr/lib/x /a/b /c /a/b -LINK_LIB_PATH= # e.g. : /a/b /c /usr/lib/x -for l in $LLP ; do # e.g. : /a/b /c (suppress standard dirs as required in case of installed package) +# XXX : do the equivalent probe with clang +if [ $CXX_FLAVOR = gcc ] ; then + LLP="$($CXX -v -E /dev/null 2>&1 | grep LIBRARY_PATH=)" # e.g. : LIBRARY_PATH=/usr/lib/x:/a/b:/c:/a/b/c/.. +fi +LLP="$(echo $LLP | sed 's/LIBRARY_PATH=//' )" # e.g. : /usr/lib/x:/a/b:/c:/a/b/c/.. +LLP="$(echo $LLP | sed 's/:/ /' )" # e.g. : /usr/lib/x /a/b /c /a/b/c/.. +LLP="$(echo $LLP | sort -u )" # e.g. : /usr/lib/x /a/b /c /a/b +LINK_LIB_PATH= # e.g. : /a/b /c /usr/lib/x +for l in $LLP ; do # e.g. : /a/b /c (suppress standard dirs as required in case of installed package) case $l/ in /usr/lib/* ) ;; /usr/lib64/*) ;; diff --git a/_lib/lmake/rules.src.py b/_lib/lmake/rules.src.py index bfbe611d..d18f94ba 100644 --- a/_lib/lmake/rules.src.py +++ b/_lib/lmake/rules.src.py @@ -11,11 +11,12 @@ import lmake from . import has_ld_audit,pdict,root_dir # if not in an lmake repo, root_dir is not set to current dir +shell = '$BASH' # . +python = _sys.executable + _std_path = '$STD_PATH' # substituted at installation time -_bash = '$BASH' # . _ld_library_path = '$LD_LIBRARY_PATH' # . _lmake_dir = __file__.rsplit('/lib/',1)[0] -_python = _sys.executable _rules = lmake._rules # list of rules that must be filled in by user code @@ -87,8 +88,8 @@ class Rule(_RuleBase) : n_retries = 1 # number of retries in case of job lost. 1 is a reasonable value # n_tokens = 1 # number of jobs likely to run in parallel for this rule (used for ETA estimation) # prio = 0 # in case of ambiguity, rules are selected with highest prio first - python = (_python,) # python used for callable cmd - shell = (_bash ,) # shell used for str cmd (_sh is usually /bin/sh which may test for dir existence before chdir, which defeats auto_mkdir) + python = (python,) # python used for callable cmd + shell = (shell ,) # shell used for str cmd (_sh is usually /bin/sh which may test for dir existence before chdir, which defeats auto_mkdir) start_delay = 3 # delay before sending a start message if job is not done by then, 3 is a reasonable compromise max_stderr_len = 100 # maximum number of stderr lines shown in output (full content is accessible with lshow -e), 100 is a reasonable compromise # timeout = None # timeout allocated to job execution (in s), must be None or an int diff --git a/_lib/read_makefiles.py b/_lib/read_makefiles.py index dd7b862e..3235b6f8 100755 --- a/_lib/read_makefiles.py +++ b/_lib/read_makefiles.py @@ -183,8 +183,9 @@ def handle_inheritance(rule) : typ,dyn = StdAttrs[k] if typ and not ( dyn and callable(v) ) : try : - if typ in (tuple,list) and not isinstance(v,(tuple,list)) : v = typ((v,)) - else : v = typ( v ) + if callable(v) : pass + elif typ in (tuple,list) and not isinstance(v,(tuple,list)) : v = typ((v,)) + else : v = typ( v ) except : raise TypeError(f'bad format for {k} : cannot be converted to {typ.__name__}') attrs[k] = v @@ -393,8 +394,6 @@ def prepare_jobs(self) : , *( k for k in self.rule_rep.matches.keys() if k.isidentifier() ) } # - if self.attrs.is_python : self.rule_rep.interpreter = self.attrs.python - else : self.rule_rep.interpreter = self.attrs.shell for attr in ('cwd','ete','force','max_submit_count','n_tokens') : if attr in self.attrs : self.rule_rep[attr] = self.attrs[attr] @@ -437,31 +436,33 @@ def handle_submit_none(self) : self.rule_rep.submit_none_attrs = self._finalize() def handle_start_cmd(self) : + if self.attrs.is_python : interpreter = 'python' + else : interpreter = 'shell' self._init() - self._handle_val('auto_mkdir' ) - self._handle_val('env' ,'environ_cmd') - self._handle_val('ignore_stat' ) - self._handle_val('chroot' ) - self._handle_val('interpreter' ) - self._handle_val('tmp' ) - self._handle_val('use_script' ) + self._handle_val('auto_mkdir' ) + self._handle_val('env' ,rep_key='environ_cmd') + self._handle_val('ignore_stat' ) + self._handle_val('chroot' ) + self._handle_val('interpreter',rep_key=interpreter ) + self._handle_val('tmp' ) + self._handle_val('use_script' ) self.rule_rep.start_cmd_attrs = self._finalize() def handle_start_rsrcs(self) : self._init() - self._handle_val('autodep' ) - self._handle_val('env' ,'environ_resources') - self._handle_val('timeout' ) + self._handle_val('autodep' ) + self._handle_val('env' ,rep_key='environ_resources') + self._handle_val('timeout' ) self.rule_rep.start_rsrcs_attrs = self._finalize() def handle_start_none(self) : if not callable(self.attrs.kill_sigs) : self.attrs.kill_sigs = [int(x) for x in self.attrs.kill_sigs] self._init() - self._handle_val('keep_tmp' ) - self._handle_val('start_delay' ) - self._handle_val('kill_sigs' ) - self._handle_val('n_retries' ) - self._handle_val('env' ,'environ_ancillary') + self._handle_val('keep_tmp' ) + self._handle_val('start_delay' ) + self._handle_val('kill_sigs' ) + self._handle_val('n_retries' ) + self._handle_val('env' ,rep_key='environ_ancillary') self.rule_rep.start_none_attrs = self._finalize() def handle_end_cmd(self) : diff --git a/doc/lmake.texi b/doc/lmake.texi index b47cb688..305222a8 100644 --- a/doc/lmake.texi +++ b/doc/lmake.texi @@ -2131,7 +2131,7 @@ This is typically used to access some environment variables set by @code{slurm}. @item Default @tab system Python @item Dynamic -@tab False +@tab Yes. Environment includes stems, targets, deps and resources. @end multitable This attribute defines the interpreter used to run the @code{cmd} if it is a @code{function}. @@ -2151,7 +2151,7 @@ In particular, Python2.7 and all revisions of Python3 are fully supported. @item Default @tab @code{/bin/bash} @item Dynamic -@tab False +@tab Yes. Environment includes stems, targets, deps and resources. @end multitable This attribute defines the interpreter used to run the @code{cmd} if it is a @code{str}. diff --git a/src/autodep/clmake.cc b/src/autodep/clmake.cc index 9ab1432b..6be07c05 100644 --- a/src/autodep/clmake.cc +++ b/src/autodep/clmake.cc @@ -247,7 +247,7 @@ static PyObject* get_autodep( PyObject* /*null*/ , PyObject* args , PyObject* kw if (n_args>0) return py_err_set(Exception::TypeErr,"expected no args" ) ; char c = 0/*garbage*/ ; // we have a private Record with a private AutodepEnv, so we must go through the backdoor to alter the regular AutodepEnv - int rc [[maybe_unused]] = ::readlink( (PrivateAdminDir+"/backdoor/autodep"s ).c_str() , &c , 1 ) ; + int rc [[maybe_unused]] = ::readlinkat( Record::s_root_fd() , (PrivateAdminDir+"/backdoor/autodep"s ).c_str() , &c , 1 ) ; SWEAR( c=='0' || c=='1' , int(c) ) ; SWEAR( rc==1 , rc ) ; return Ptr(c!='0')->to_py_boost() ; @@ -262,8 +262,8 @@ static PyObject* set_autodep( PyObject* /*null*/ , PyObject* args , PyObject* kw char c ; // we have a private Record with a private AutodepEnv, so we must go through the backdoor to alter the regular AutodepEnv int rc [[maybe_unused]] ; // avoid compiler warning - if (+py_args[0]) rc = ::readlink( (PrivateAdminDir+"/backdoor/enable"s ).c_str() , &c , 1 ) ; - else rc = ::readlink( (PrivateAdminDir+"/backdoor/disable"s).c_str() , &c , 1 ) ; // note that the depend and target functions are still working while disabled + if (+py_args[0]) rc = ::readlinkat( Record::s_root_fd() , (PrivateAdminDir+"/backdoor/enable"s ).c_str() , &c , 1 ) ; + else rc = ::readlinkat( Record::s_root_fd() , (PrivateAdminDir+"/backdoor/disable"s).c_str() , &c , 1 ) ; // note that the depend and target functions are still working while disabled return None.to_py_boost() ; } diff --git a/src/autodep/gather.cc b/src/autodep/gather.cc index c1d6a4f2..970e73c7 100644 --- a/src/autodep/gather.cc +++ b/src/autodep/gather.cc @@ -117,7 +117,7 @@ void _child_wait_thread_func( int* wstatus , pid_t pid , Fd fd ) { } bool/*done*/ Gather::kill(int sig) { - Trace trace("kill",sig,pid,STR(as_session),child_stdout,child_stderr,mk_key_uset(slaves)) ; + Trace trace("kill",sig,pid,STR(as_session),child_stdout,child_stderr) ; Lock lock { _pid_mutex } ; int kill_sig = sig>=0 ? sig : SIGKILL ; bool killed_ = false ; @@ -129,10 +129,9 @@ bool/*done*/ Gather::kill(int sig) { pid_t ctl_pid = as_session ? ::getpgrp() : ::getpid() ; ::umap_s< GatherKind> fd_strs ; ::umap to_kill ; - trace("ctl",ctl_pid) ; - if (+child_stdout) fd_strs[ read_lnk(to_string("/proc/self/fd/",child_stdout.fd)) ] = GatherKind::Stdout ; - if (+child_stderr) fd_strs[ read_lnk(to_string("/proc/self/fd/",child_stderr.fd)) ] = GatherKind::Stderr ; - for( auto const& [fd,_] : slaves ) fd_strs[ read_lnk(to_string("/proc/self/fd/",fd .fd)) ] = GatherKind::Slave ; + trace("ctl",ctl_pid,mk_key_uset(slaves)) ; + if (+child_stdout) fd_strs[ read_lnk(to_string("/proc/self/fd/",child_stdout.fd)) ] = GatherKind::Stdout ; + if (+child_stderr) fd_strs[ read_lnk(to_string("/proc/self/fd/",child_stderr.fd)) ] = GatherKind::Stderr ; trace("fds",fd_strs) ; for( ::string const& proc_entry : lst_dir("/proc") ) { for( char c : proc_entry ) if (c>'9'||c<'0') goto NextProc ; @@ -267,9 +266,11 @@ Status Gather::exec_child( ::vector_s const& args , Fd cstdin , Fd cstdout , Fd ::jthread wait_jt { _child_wait_thread_func , &wstatus , child.pid , child_fd } ; // thread dedicated to wating child Epoll epoll { New } ; Status status = Status::New ; - PD end ; + uint8_t n_active = 0 ; ::umap server_replies ; ::umap delayed_check_deps ; // check_deps events are delayed to ensure all previous deps are taken into account + Pdate job_end ; + Pdate reporting_end ; // auto handle_req_to_server = [&]( Fd fd , Jerr&& jerr ) -> bool/*still_sync*/ { trace("slave",fd,jerr) ; @@ -301,37 +302,46 @@ Status Gather::exec_child( ::vector_s const& args , Fd cstdin , Fd cstdout , Fd return false ; } ; auto set_status = [&]( Status status_ , ::string const& msg_={} )->void { - if ( status==Status::New || status==Status::Ok ) status = status_ ; // else there is already another reason + if ( status==Status::New || status==Status::Ok ) status = status_ ; // else there is already another reason if ( +msg_ ) append_line_to_string(msg,msg_) ; } ; + auto dec_active = [&]()->void { + SWEAR(n_active) ; + n_active-- ; + if ( !n_active && +network_delay ) reporting_end = Pdate(New)+network_delay ; // once job is dead, wait at most network_delay (if set) for reporting to calm down + } ; // SWEAR(!slaves) ; // if (+timeout) { - end = PD(New) + timeout ; - trace("set_timeout",timeout,end) ; + job_end = Pdate(New) + timeout ; + trace("set_timeout",timeout,job_end) ; } - if (cstdout==Child::Pipe) { epoll.add_read(child_stdout=child.stdout,Kind::Stdout ) ; trace("read_stdout",child_stdout) ; } - if (cstderr==Child::Pipe) { epoll.add_read(child_stderr=child.stderr,Kind::Stderr ) ; trace("read_stderr",child_stderr) ; } - /**/ { epoll.add_read(child_fd ,Kind::ChildEnd) ; trace("read_child ",child_fd ) ; } - /**/ { epoll.add_read(master_fd ,Kind::Master ) ; trace("read_master",master_fd ) ; } + if (cstdout==Child::Pipe) { epoll.add_read(child_stdout=child.stdout,Kind::Stdout ) ; n_active++ ; trace("read_stdout",child_stdout) ; } + if (cstderr==Child::Pipe) { epoll.add_read(child_stderr=child.stderr,Kind::Stderr ) ; n_active++ ; trace("read_stderr",child_stderr) ; } + /**/ { epoll.add_read(child_fd ,Kind::ChildEnd) ; n_active++ ; trace("read_child ",child_fd ) ; } + /**/ { epoll.add_read(master_fd ,Kind::Master ) ; trace("read_master",master_fd ) ; } while (epoll.cnt) { uint64_t wait_ns = Epoll::Forever ; - if (+end) { - PD now = New ; - if (now>=end) { + if (+reporting_end) { + Pdate now = {New} ; + if (now events = epoll.wait(wait_ns) ; - if ( !events && +delayed_check_deps ) { // process delayed check deps after all other events + if ( !events && +delayed_check_deps ) { // process delayed check deps after all other events trace("delayed_chk_deps") ; for( auto& [fd,jerr] : delayed_check_deps ) handle_req_to_server(fd,::move(jerr)) ; delayed_check_deps.clear() ; @@ -351,23 +361,25 @@ Status Gather::exec_child( ::vector_s const& args , Fd cstdin , Fd cstdout , Fd if (kind==Kind::Stderr) stderr.append(buf_view) ; else { stdout.append(buf_view) ; live_out_cb(buf_view) ; } } else { - epoll.del(fd) ; // /!\ dont close as fd is closed upon child destruction + epoll.del(fd) ; // /!\ dont close as fd is closed upon child destruction trace("close",kind,fd) ; - if (kind==Kind::Stderr) child_stderr = {} ; // tell kill not to wait for this one + if (kind==Kind::Stderr) child_stderr = {} ; // tell kill not to wait for this one else child_stdout = {} ; + dec_active() ; } } break ; case Kind::ChildEnd : { uint64_t one = 0/*garbage*/ ; int cnt = ::read( fd , &one , 8 ) ; SWEAR( cnt==8 && one==1 , cnt , one ) ; { Lock lock{_pid_mutex} ; - child.pid = -1 ; // too late to kill job + child.pid = -1 ; // too late to kill job } if (WIFEXITED (wstatus)) set_status( WEXITSTATUS(wstatus)!=0 ? Status::Err : Status::Ok ) ; - else if (WIFSIGNALED(wstatus)) set_status( is_sig_sync(WTERMSIG(wstatus)) ? Status::Err : Status::LateLost ) ; // synchronous signals are actually errors + else if (WIFSIGNALED(wstatus)) set_status( is_sig_sync(WTERMSIG(wstatus)) ? Status::Err : Status::LateLost ) ; // synchronous signals are actually errors else fail("unexpected wstatus : ",wstatus) ; epoll.close(fd) ; - epoll.cnt-- ; // do not wait for new connections, but if one arrives before all flows are closed, process it + epoll.cnt-- ; // do not wait for new connections, but if one arrives before all flows are closed, process it, so wait Master no more + dec_active() ; trace("close",kind,status,::hex,wstatus,::dec) ; } break ; case Kind::Master : { @@ -375,7 +387,7 @@ Status Gather::exec_child( ::vector_s const& args , Fd cstdin , Fd cstdout , Fd Fd slave = master_fd.accept() ; epoll.add_read(slave,Kind::Slave) ; trace("read_slave",slave) ; - slaves[slave] ; // allocate entry + slaves[slave] ; // allocate entry } break ; case Kind::ServerReply : { JobRpcReply jrr ; diff --git a/src/autodep/gather.hh b/src/autodep/gather.hh index d5f15267..9f3ce4c5 100644 --- a/src/autodep/gather.hh +++ b/src/autodep/gather.hh @@ -48,17 +48,17 @@ struct Gather { // data // seen detection : we record the earliest date at which file has been as existing to detect situations where file is non-existing, then existing, then non-existing // this cannot be seen on file date has there is no date for non-existing files - PD read[N] { PD::Future , PD::Future , PD::Future } ; static_assert((N)==3) ; // first access (or ignore) date for each access - PD write = PD::Future ; // first write (or ignore) date - PD target = PD::Future ; // first date at which file was known to be a target - PD seen = PD::Future ; // first date at which file has been seen existing - CD crc_date ; // state when first read + PD read[N] { PD::Future , PD::Future , PD::Future } ; static_assert((N)==3) ; // first access (or ignore) date for each access + PD write = PD::Future ; // first write (or ignore) date + PD target = PD::Future ; // first date at which file was known to be a target + PD seen = PD::Future ; // first date at which file has been seen existing + CD crc_date ; // state when first read NodeIdx parallel_id = 0 ; AccessDigest digest ; } ; struct ServerReply { - IMsgBuf buf ; // buf to assemble the reply - Fd fd ; // fd to forward reply to + IMsgBuf buf ; // buf to assemble the reply + Fd fd ; // fd to forward reply to ::string codec_file ; } ; // cxtors & casts @@ -78,7 +78,7 @@ private : bool parallel = false ; for( auto& [f,dd] : jerr.files ) { _new_access( fd , jerr.date , ::move(f) , jerr.digest , dd , parallel , jerr.txt ) ; parallel = true ; } } - void _new_guards( Fd fd , Jerr&& jerr ) { // fd for trace purpose only + void _new_guards( Fd fd , Jerr&& jerr ) { // fd for trace purpose only Trace trace("_new_guards",fd,jerr.txt) ; for( auto& [f,_] : jerr.files ) { trace(f) ; guards.insert(::move(f)) ; } } @@ -95,42 +95,43 @@ public : //! void new_exec( PD , ::string const& exe , ::string const& ="s_exec" ) ; // void sync( Fd sock , JobExecRpcReply const& jerr ) { - try { OMsgBuf().send(sock,jerr) ; } catch (::string const&) {} // dont care if we cannot report the reply to job + try { OMsgBuf().send(sock,jerr) ; } catch (::string const&) {} // dont care if we cannot report the reply to job } // Status exec_child( ::vector_s const& args , Fd child_stdin=Fd::Stdin , Fd child_stdout=Fd::Stdout , Fd child_stderr=Fd::Stderr ) ; // - bool/*done*/ kill(int sig=-1) ; // is sig==-1, use best effort to kill job + bool/*done*/ kill(int sig=-1) ; // is sig==-1, use best effort to kill job // - void reorder(bool at_end) ; // reorder accesses by first read access and suppress superfluous accesses + void reorder(bool at_end) ; // reorder accesses by first read access and suppress superfluous accesses // data - ::function server_cb = [](Jerr &&)->Fd { return {} ; } ; // function to contact server when necessary, return error by default - ::function live_out_cb = [](::string_view const&)->void { } ; // function to report live output, dont report by default - ::function kill_job_cb = []( )->void { } ; // function to kill job - ServerSockFd master_fd ; - in_addr_t addr = NoSockAddr ; // local addr to which we can be contacted by running job - ::atomic as_session = false ; // if true <=> process is launched in its own group - AutodepMethod method = AutodepMethod::Dflt ; - AutodepEnv autodep_env ; - Time::Delay timeout ; - pid_t pid = -1 ; // pid to kill - bool killed = false ; // do not start as child is supposed to be already killed - ::vector kill_sigs ; // signals used to kill job - ::string chroot ; - ::string cwd ; - ::map_ss const* env = nullptr ; - vmap_s accesses ; - umap_s access_map ; - uset_s guards ; // dir creation/deletion that must be guarded against NFS - NodeIdx parallel_id = 0 ; // id to identify parallel deps - bool seen_tmp = false ; - int wstatus = 0/*garbage*/ ; - Fd child_stdout ; // fd used to gather stdout - Fd child_stderr ; // fd used to gather stderr - ::string stdout ; // contains child stdout if child_stdout==Pipe - ::string stderr ; // contains child stderr if child_stderr==Pipe - ::string msg ; // contains error messages not from job - ::umap>> slaves ; // Jerr's are waiting for confirmation + ::function server_cb = [](Jerr && )->Fd { return {} ; } ; // function to contact server when necessary, return error by default + ::function live_out_cb = [](::string_view const&)->void { } ; // function to report live output, dont report by default + ::function kill_job_cb = []( )->void { } ; // function to kill job + ServerSockFd master_fd ; + in_addr_t addr = NoSockAddr ; // local addr to which we can be contacted by running job + ::atomic as_session = false ; // if true <=> process is launched in its own group + AutodepMethod method = AutodepMethod::Dflt ; + AutodepEnv autodep_env ; + Time::Delay timeout ; + Time::Delay network_delay ; + pid_t pid = -1 ; // pid to kill + bool killed = false ; // do not start as child is supposed to be already killed + ::vector kill_sigs ; // signals used to kill job + ::string chroot ; + ::string cwd ; + ::map_ss const* env = nullptr ; + vmap_s accesses ; + umap_s access_map ; + uset_s guards ; // dir creation/deletion that must be guarded against NFS + NodeIdx parallel_id = 0 ; // id to identify parallel deps + bool seen_tmp = false ; + int wstatus = 0/*garbage*/ ; + Fd child_stdout ; // fd used to gather stdout + Fd child_stderr ; // fd used to gather stderr + ::string stdout ; // contains child stdout if child_stdout==Pipe + ::string stderr ; // contains child stderr if child_stderr==Pipe + ::string msg ; // contains error messages not from job + ::umap>> slaves ; // Jerr's are waiting for confirmation private : Mutex mutable _pid_mutex ; } ; diff --git a/src/autodep/ld_server.hh b/src/autodep/ld_server.hh index 71efc147..df8c9026 100644 --- a/src/autodep/ld_server.hh +++ b/src/autodep/ld_server.hh @@ -16,6 +16,7 @@ private : public : AutodepLock( ) = default ; AutodepLock(::vmap_s* deps=nullptr) : lock{_s_mutex} { + // SWEAR(cwd()==Record::s_autodep_env().root_dir) ; // too expensive SWEAR( !Record::s_deps && !Record::s_deps_err ) ; SWEAR( !*Record::s_access_cache ) ; Record::s_deps = deps ; @@ -27,6 +28,7 @@ public : Record::s_deps_err = nullptr ; t_active = false ; Record::s_access_cache->clear() ; + if (Record::s_seen_chdir) swear_prod(::fchdir(Record::s_root_fd())) ; // restore cwd in case it has been modified during user Python code execution } // data Lock> lock ; diff --git a/src/autodep/record.cc b/src/autodep/record.cc index 589da022..8cac55f0 100644 --- a/src/autodep/record.cc +++ b/src/autodep/record.cc @@ -26,6 +26,7 @@ bool Record::s_static_report = ::vmap_s * Record::s_deps = nullptr ; ::string * Record::s_deps_err = nullptr ; ::umap_s>* Record::s_access_cache = nullptr ; // map file to read accesses +bool Record::s_seen_chdir = false ; AutodepEnv* Record::_s_autodep_env = nullptr ; // declare as pointer to avoid late initialization Fd Record::_s_root_fd ; diff --git a/src/autodep/record.hh b/src/autodep/record.hh index f8ff8ee8..2e48dea2 100644 --- a/src/autodep/record.hh +++ b/src/autodep/record.hh @@ -54,6 +54,7 @@ public : static ::vmap_s * s_deps ; static ::string * s_deps_err ; static ::umap_s>* s_access_cache ; // map file to read accesses + static bool s_seen_chdir ; private : static AutodepEnv* _s_autodep_env ; static Fd _s_root_fd ; // a file descriptor to repo root dir @@ -334,10 +335,13 @@ public : int operator()( Record& r , int rc ) { r._report_confirm(file_loc,rc>=0) ; return rc ; } } ; // - void chdir(const char* dir) { _real_path.chdir(dir) ; } + void chdir(const char* dir) { + s_seen_chdir = true ; + _real_path.chdir(dir) ; + } // -private : // data +private : Disk::RealPath _real_path ; mutable AutoCloseFd _report_fd ; mutable bool _tmp_cache = false ; // record that tmp usage has been reported, no need to report any further diff --git a/src/client.cc b/src/client.cc index f986f304..fc094543 100644 --- a/src/client.cc +++ b/src/client.cc @@ -26,7 +26,7 @@ static bool _server_ok( Fd fd , ::string const& tag ) { return ok ; } -static pid_t _connect_to_server( bool refresh , bool sync ) { // if sync, ensure we launch our own server +static pid_t _connect_to_server( bool refresh , bool sync ) { // if sync, ensure we launch our own server Trace trace("_connect_to_server",STR(refresh)) ; ::string server_service ; bool server_is_local = false ; @@ -192,7 +192,7 @@ Bool3/*ok*/ _out_proc( ::vector_s* files , ReqProc proc , bool refresh , ReqSynt case 'r' : case 'R' : rv = Yes ; break ; case 'f' : - case 'F' : rv = Maybe ; break ; // force no color + case 'F' : rv = Maybe ; break ; // force no color default : rv = is_reverse_video(Fd::Stdin,Fd::Stdout) ; } trace("reverse_video",rv) ; @@ -208,8 +208,8 @@ Bool3/*ok*/ _out_proc( ::vector_s* files , ReqProc proc , bool refresh , ReqSynt ReqRpcReply report = IMsgBuf().receive(g_server_fds.in) ; switch (report.proc) { case Proc::None : trace("done" ) ; goto Return ; - case Proc::Status : trace("status",STR(report.ok)) ; rc = No|report.ok ; break ; - case Proc::File : trace("file" ,report.txt ) ; SWEAR(files) ; files->push_back(report.txt) ; break ; + case Proc::Status : trace("status",STR(report.ok)) ; rc = No|report.ok ; goto Return ; // XXX : why is it necessary goto Return here ? ... + case Proc::File : trace("file" ,report.txt ) ; SWEAR(files) ; files->push_back(report.txt) ; break ; // ... we should receive None when server closes stream case Proc::Txt : ::cout << report.txt << flush ; break ; DF} } @@ -217,6 +217,7 @@ Bool3/*ok*/ _out_proc( ::vector_s* files , ReqProc proc , bool refresh , ReqSynt trace("disconnected") ; } Return : + g_server_fds.out.close() ; // ensure server stops living because of us if (sync) waitpid( server_pid , nullptr , 0 ) ; return rc ; } diff --git a/src/fd.hh b/src/fd.hh index 1770229f..e5aa7178 100644 --- a/src/fd.hh +++ b/src/fd.hh @@ -19,7 +19,7 @@ struct Fd { static const Fd Stdin ; static const Fd Stdout ; static const Fd Stderr ; - static const Fd Std ; // the highest standard fd + static const Fd Std ; // the highest standard fd // cxtors & casts constexpr Fd( ) = default ; constexpr Fd( Fd const& fd_ ) { *this = fd_ ; } // XXX : use copy&swap idiom @@ -102,7 +102,7 @@ struct SockFd : AutoCloseFd { static constexpr in_addr_t LoopBackAddr = NoSockAddr ; // statics static ::string s_addr_str(in_addr_t addr) { - ::string res ; res.reserve(15) ; // 3 digits per level + 5 digits for the port + ::string res ; res.reserve(15) ; // 3 digits per level + 5 digits for the port /**/ res += to_string((addr>>24)&0xff) ; res += '.' ; res += to_string((addr>>16)&0xff) ; res += '.' ; res += to_string((addr>> 8)&0xff) ; @@ -144,7 +144,7 @@ public : } // services in_addr_t peer_addr() const { - static_assert(sizeof(in_addr_t)==4) ; // else use adequate ntohs/ntohl according to the size + static_assert(sizeof(in_addr_t)==4) ; // else use adequate ntohs/ntohl according to the size struct sockaddr_in peer_addr ; socklen_t len = sizeof(peer_addr) ; int rc = ::getpeername( fd , reinterpret_cast(&peer_addr) , &len ) ; @@ -228,27 +228,32 @@ struct Epoll { Epoll (NewType) { init () ; } ~Epoll( ) { close() ; } // services - void init() { fd = ::epoll_create1(EPOLL_CLOEXEC) ; fd.no_std() ; } - template void add( bool write , Fd fd_ , T data ) { + void init() { + fd = ::epoll_create1(EPOLL_CLOEXEC) ; + fd.no_std() ; + } + template void add( bool write , Fd fd_ , T data , bool wait=true ) { static_assert(sizeof(T)<=4) ; epoll_event event { .events=write?EPOLLOUT:EPOLLIN , .data={.u64=(uint64_t(uint32_t(data))<<32)|uint32_t(fd_) } } ; int rc = epoll_ctl( int(fd) , EPOLL_CTL_ADD , int(fd_) , &event ) ; swear_prod(rc==0,"cannot add ",fd_," to epoll ",fd," (",strerror(errno),')') ; - cnt++ ; + cnt += wait ; } - template void add_read ( Fd fd_ , T data ) { add(false/*write*/,fd_,data) ; } - template void add_write( Fd fd_ , T data ) { add(true /*write*/,fd_,data) ; } - void add ( bool write , Fd fd_ ) { add(write ,fd_,0) ; } - void add_read ( Fd fd_ ) { add(false/*write*/,fd_ ) ; } - void add_write( Fd fd_ ) { add(true /*write*/,fd_ ) ; } - void del(Fd fd_) { + void del( Fd fd_ , bool wait=true ) { // wait must be coherent with corresponding add int rc = ::epoll_ctl( fd , EPOLL_CTL_DEL , fd_ , nullptr ) ; swear_prod(rc==0,"cannot del",fd_,"from epoll",fd,'(',strerror(errno),')') ; - cnt-- ; + cnt -= wait ; } - void close(Fd fd_) { SWEAR(+fd_) ; del(fd_) ; fd_.close() ; } - void close( ) { fd .close() ; } ::vector wait(uint64_t timeout_ns=Forever) const ; + void close() { + fd .close() ; + } + /**/ void add ( bool write , Fd fd_ , bool wait=true ) { add(write,fd_,0 ,wait) ; } + template void add_read ( Fd fd_ , T data , bool wait=true ) { add(false,fd_,data,wait) ; } + template void add_write( Fd fd_ , T data , bool wait=true ) { add(true ,fd_,data,wait) ; } + /**/ void add_read ( Fd fd_ , bool wait=true ) { add(false,fd_, wait) ; } + /**/ void add_write( Fd fd_ , bool wait=true ) { add(true ,fd_, wait) ; } + /**/ void close ( Fd fd_ , bool wait=true ) { SWEAR(+fd_) ; del( fd_, wait) ; fd_.close() ; } // wait must be coherent with corresponding add // data Fd fd ; int cnt = 0 ; diff --git a/src/job_exec.cc b/src/job_exec.cc index f708822e..0407cfc4 100644 --- a/src/job_exec.cc +++ b/src/job_exec.cc @@ -369,14 +369,12 @@ int main( int argc , char* argv[] ) { if (found_server) exit(Rc::Fail ) ; // this is typically a ^C else exit(Rc::Fail,"cannot communicate with server ",g_service_start," : ",e) ; // this may be a server config problem, better to report } - trace("g_start_info" ,g_start_info ) ; - if (!g_start_info.proc) return 0 ; // silently exit if told to do so - g_nfs_guard.reliable_dirs = g_start_info.autodep_env.reliable_dirs ; - // + trace("g_start_info",g_start_info) ; switch (g_start_info.proc) { - case JobProc::None : return 0 ; // server ask us to give up - case JobProc::Start : break ; // normal case + case JobProc::None : return 0 ; // server ask us to give up + case JobProc::Start : break ; // normal case DF} + g_nfs_guard.reliable_dirs = g_start_info.autodep_env.reliable_dirs ; // for( auto const& [d ,digest] : g_start_info.deps ) if (digest.dflags[Dflag::Static]) g_match_dct.add( false/*star*/ , d , digest.dflags ) ; for( auto const& [dt,mf ] : g_start_info.static_matches ) g_match_dct.add( false/*star*/ , dt , mf ) ; @@ -386,18 +384,19 @@ int main( int argc , char* argv[] ) { try { cmd_env = prepare_env(end_report) ; } catch (::string const& e) { end_report.msg += e ; goto End ; } // - /**/ g_gather.addr = g_start_info.addr ; - /**/ g_gather.autodep_env = g_start_info.autodep_env ; - /**/ g_gather.chroot = g_start_info.chroot ; - /**/ g_gather.as_session = true ; - /**/ g_gather.cwd = g_start_info.cwd_s ; if (+g_gather.cwd) g_gather.cwd.pop_back() ; - /**/ g_gather.env = &cmd_env ; - /**/ g_gather.kill_sigs = g_start_info.kill_sigs ; - if (g_start_info.live_out) g_gather.live_out_cb = live_out_cb ; - /**/ g_gather.method = g_start_info.method ; - /**/ g_gather.server_cb = server_cb ; - /**/ g_gather.timeout = g_start_info.timeout ; - /**/ g_gather.kill_job_cb = kill_job ; + /**/ g_gather.addr = g_start_info.addr ; + /**/ g_gather.autodep_env = g_start_info.autodep_env ; + /**/ g_gather.chroot = g_start_info.chroot ; + /**/ g_gather.as_session = true ; + /**/ g_gather.cwd = g_start_info.cwd_s ; if (+g_gather.cwd) g_gather.cwd.pop_back() ; + /**/ g_gather.env = &cmd_env ; + /**/ g_gather.kill_sigs = g_start_info.kill_sigs ; + if (g_start_info.live_out) g_gather.live_out_cb = live_out_cb ; + /**/ g_gather.method = g_start_info.method ; + /**/ g_gather.server_cb = server_cb ; + /**/ g_gather.timeout = g_start_info.timeout ; + /**/ g_gather.network_delay = g_start_info.network_delay ; + /**/ g_gather.kill_job_cb = kill_job ; // trace("wash",g_start_info.pre_actions) ; ::pair_s wash_report = do_file_actions( g_washed , ::move(g_start_info.pre_actions) , g_nfs_guard , g_start_info.hash_algo ) ; @@ -417,12 +416,12 @@ int main( int argc , char* argv[] ) { child_stdout.no_std() ; } // - Pdate start_job = New ; // as late as possible before child starts + Pdate start_job = New ; // as late as possible before child starts // vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv Status status = g_gather.exec_child( cmd_line() , child_stdin , child_stdout , Child::Pipe ) ; // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - Pdate end_job = New ; // as early as possible after child ends - bool killed = g_killed ; // sample g_killed to ensure coherence (even if status is correct, it may mean we were waiting for stdout/stderr) + Pdate end_job = New ; // as early as possible after child ends + bool killed = g_killed ; // sample g_killed to ensure coherence (even if status is correct, it may mean we were waiting for stdout/stderr) struct rusage rsrcs ; getrusage(RUSAGE_CHILDREN,&rsrcs) ; trace("start_job",start_job,"end_job",end_job) ; // @@ -430,14 +429,14 @@ int main( int argc , char* argv[] ) { // end_report.msg += compute_crcs(digest) ; // - if (!g_start_info.autodep_env.reliable_dirs) { // fast path : avoid listing targets & guards if reliable_dirs - for( auto const& [t,_] : digest.targets ) g_nfs_guard.change(t) ; // protect against NFS strange notion of coherence while computing crcs - for( auto const& f : g_gather.guards ) g_nfs_guard.change(f) ; // . + if (!g_start_info.autodep_env.reliable_dirs) { // fast path : avoid listing targets & guards if reliable_dirs + for( auto const& [t,_] : digest.targets ) g_nfs_guard.change(t) ; // protect against NFS strange notion of coherence while computing crcs + for( auto const& f : g_gather.guards ) g_nfs_guard.change(f) ; // . g_nfs_guard.close() ; } // if ( g_gather.seen_tmp && !g_start_info.keep_tmp ) - try { unlnk_inside(g_start_info.autodep_env.tmp_dir) ; } catch (::string const&) {} // cleaning is done at job start any way, so no harm + try { unlnk_inside(g_start_info.autodep_env.tmp_dir) ; } catch (::string const&) {} // cleaning is done at job start any way, so no harm // if ( killed ) { trace("killed" ) ; status = Status::Killed ; } else if ( status==Status::Ok && +digest.msg ) { trace("analysis_err") ; status = Status::Err ; } @@ -462,7 +461,7 @@ End : try { ClientSockFd fd { g_service_end , NConnectionTrials } ; Pdate end_overhead = New ; - end_report.digest.stats.total = end_overhead - start_overhead ; // measure overhead as late as possible + end_report.digest.stats.total = end_overhead - start_overhead ; // measure overhead as late as possible //vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv OMsgBuf().send( fd , end_report ) ; //^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/src/lmakeserver/backend.cc b/src/lmakeserver/backend.cc index b3235325..7db1cf24 100644 --- a/src/lmakeserver/backend.cc +++ b/src/lmakeserver/backend.cc @@ -59,9 +59,9 @@ namespace Backends { Backend::DeferredThread* Backend::_s_deferred_report_thread = nullptr ; Backend::DeferredThread* Backend::_s_deferred_wakeup_thread = nullptr ; - static ::vmap_s _mk_digest_deps( ::vmap_s>>&& deps_attrs ) { + static ::vmap_s _mk_digest_deps( ::vmap_s&& deps_attrs ) { ::vmap_s res ; res.reserve(deps_attrs.size()) ; - for( auto& [_,ddfedf] : deps_attrs ) res.emplace_back( ::move(ddfedf.first) , DepDigest( {} , ddfedf.second.first , true/*parallel*/ ) ) ; + for( auto& [_,d] : deps_attrs ) res.emplace_back( ::move(d.txt) , DepDigest( {} , d.dflags , true/*parallel*/ ) ) ; return res ; } @@ -199,7 +199,7 @@ namespace Backends { ::pair,vector> pre_actions ; StartCmdAttrs start_cmd_attrs ; ::pair_ss/*script,call*/ cmd ; - ::vmap_s>> deps_attrs ; + ::vmap_s deps_attrs ; StartRsrcsAttrs start_rsrcs_attrs ; StartNoneAttrs start_none_attrs ; ::pair_ss start_msg_err ; @@ -248,7 +248,7 @@ namespace Backends { case 4 : append_line_to_string( start_msg_err.first , "cannot wash targets" ) ; break ; DF} } - trace("deps",deps) ; + trace("deps",step,deps) ; // record as much info as possible in reply switch (step) { case 5 : @@ -266,40 +266,40 @@ namespace Backends { [[fallthrough]] ; case 4 : case 3 : - /**/ reply.method = start_rsrcs_attrs.method ; - /**/ reply.timeout = start_rsrcs_attrs.timeout ; + /**/ reply.method = start_rsrcs_attrs.method ; + /**/ reply.timeout = start_rsrcs_attrs.timeout ; for( ::pair_ss& kv : start_rsrcs_attrs.env ) reply.env.push_back(::move(kv)) ; [[fallthrough]] ; case 2 : - /**/ reply.autodep_env.auto_mkdir = start_cmd_attrs.auto_mkdir ; - /**/ reply.autodep_env.ignore_stat = start_cmd_attrs.ignore_stat ; - /**/ reply.autodep_env.tmp_view = ::move(start_cmd_attrs.tmp ) ; // tmp directory as viewed by job - /**/ reply.chroot = ::move(start_cmd_attrs.chroot) ; - /**/ reply.use_script = start_cmd_attrs.use_script ; + /**/ reply.interpreter = start_cmd_attrs.interpreter ; + /**/ reply.autodep_env.auto_mkdir = start_cmd_attrs.auto_mkdir ; + /**/ reply.autodep_env.ignore_stat = start_cmd_attrs.ignore_stat ; + /**/ reply.autodep_env.tmp_view = ::move(start_cmd_attrs.tmp ) ; // tmp directory as viewed by job + /**/ reply.chroot = ::move(start_cmd_attrs.chroot) ; + /**/ reply.use_script = start_cmd_attrs.use_script ; for( ::pair_ss& kv : start_cmd_attrs.env ) reply.env.push_back(::move(kv)) ; [[fallthrough]] ; case 1 : - /**/ reply.cmd = ::move(cmd) ; + /**/ reply.cmd = ::move(cmd) ; [[fallthrough]] ; case 0 : { VarIdx ti = 0 ; for( ::string const& tn : match.static_matches() ) reply.static_matches.emplace_back( tn , rule->matches[ti++].second.flags ) ; for( ::string const& p : match.star_patterns () ) reply.star_matches .emplace_back( p , rule->matches[ti++].second.flags ) ; - if (rule->stdin_idx !=Rule::NoVar) reply.stdin = deps_attrs[rule->stdin_idx ].second.first ; - if (rule->stdout_idx!=Rule::NoVar) reply.stdout = reply.static_matches[rule->stdout_idx].first ; - /**/ reply.addr = fd.peer_addr() ; - /**/ reply.autodep_env.lnk_support = g_config.lnk_support ; - /**/ reply.autodep_env.reliable_dirs = g_config.reliable_dirs ; - /**/ reply.autodep_env.src_dirs_s = g_src_dirs_s ; - /**/ reply.autodep_env.root_dir = *g_root_dir ; - /**/ reply.cwd_s = rule->cwd_s ; - /**/ reply.hash_algo = g_config.hash_algo ; - /**/ reply.interpreter = rule->interpreter ; - /**/ reply.keep_tmp = keep_tmp ; - /**/ reply.kill_sigs = ::move(start_none_attrs.kill_sigs) ; - /**/ reply.live_out = submit_attrs.live_out ; - /**/ reply.network_delay = g_config.network_delay ; - /**/ reply.remote_admin_dir = g_config.remote_admin_dir ; + if (rule->stdin_idx !=Rule::NoVar) reply.stdin = deps_attrs [rule->stdin_idx ].second.txt ; + if (rule->stdout_idx!=Rule::NoVar) reply.stdout = reply.static_matches[rule->stdout_idx].first ; + /**/ reply.addr = fd.peer_addr() ; + /**/ reply.autodep_env.lnk_support = g_config.lnk_support ; + /**/ reply.autodep_env.reliable_dirs = g_config.reliable_dirs ; + /**/ reply.autodep_env.src_dirs_s = g_src_dirs_s ; + /**/ reply.autodep_env.root_dir = *g_root_dir ; + /**/ reply.cwd_s = rule->cwd_s ; + /**/ reply.hash_algo = g_config.hash_algo ; + /**/ reply.keep_tmp = keep_tmp ; + /**/ reply.kill_sigs = ::move(start_none_attrs.kill_sigs) ; + /**/ reply.live_out = submit_attrs.live_out ; + /**/ reply.network_delay = g_config.network_delay ; + /**/ reply.remote_admin_dir = g_config.remote_admin_dir ; for( ::pair_ss& kv : start_none_attrs .env ) reply.env.push_back(::move(kv)) ; } break ; DF} diff --git a/src/lmakeserver/cmd.cc b/src/lmakeserver/cmd.cc index 4795f891..2d6c1532 100644 --- a/src/lmakeserver/cmd.cc +++ b/src/lmakeserver/cmd.cc @@ -154,8 +154,8 @@ namespace Engine { size_t w = 0 ; ::umap_ss rev_map ; for( auto const& [k,d] : rule->deps_attrs.eval(job->simple_match()) ) { - w = ::max( w , k.size() ) ; - rev_map[d.first] = k ; + w = ::max( w , k.size() ) ; + rev_map[d.txt] = k ; } ::vector parallel ; for( Dep const& d : job->deps ) parallel.push_back(d.parallel) ; // first pass to count deps as they are compressed and size is not known upfront NodeIdx d = 0 ; diff --git a/src/lmakeserver/job.cc b/src/lmakeserver/job.cc index 8bb83c23..4098b33a 100644 --- a/src/lmakeserver/job.cc +++ b/src/lmakeserver/job.cc @@ -26,9 +26,9 @@ namespace Engine { for( Node d : to_mkdirs ) for( Node hd=d->dir() ; +hd ; hd = hd->dir() ) if (!to_mkdir_uphills.insert(hd).second) break ; - for( auto const& [_,d] : match.deps() ) // no need to mkdir a target dir if it is also a static dep dir (which necessarily already exists) - for( Node hd=Node(d.first)->dir() ; +hd ; hd = hd->dir() ) - if (!locked_dirs.insert(hd).second) break ; // if dir contains a dep, it cannot be rmdir'ed + for( auto const& [_,d] : match.deps() ) // no need to mkdir a target dir if it is also a static dep dir (which necessarily already exists) + for( Node hd=Node(d.txt)->dir() ; +hd ; hd = hd->dir() ) + if (!locked_dirs.insert(hd).second) break ; // if dir contains a dep, it cannot be rmdir'ed // // remove old targets for( Target t : targets ) { @@ -178,8 +178,8 @@ namespace Engine { Job::Job( Rule::SimpleMatch&& match , Req req , DepDepth lvl ) { Trace trace("Job",match,lvl) ; if (!match) { trace("no_match") ; return ; } - Rule rule = match.rule ; SWEAR( rule->special<=Special::HasJobs , rule->special ) ; - ::vmap_s>> dep_names ; + Rule rule = match.rule ; SWEAR( rule->special<=Special::HasJobs , rule->special ) ; + ::vmap_s dep_names ; try { dep_names = rule->deps_attrs.eval(match) ; } catch (::pair_ss const& msg_err) { @@ -192,17 +192,15 @@ namespace Engine { } ::vector deps ; deps.reserve(dep_names.size()) ; ::umap dis ; - for( auto const& [_,dndfedf] : dep_names ) { - auto const& [dn,dfedf] = dndfedf ; - auto [df,edf ] = dfedf ; - Node d { dn } ; - Accesses a = edf[ExtraDflag::Ignore] ? Accesses() : ~Accesses() ; + for( auto const& [_,dn] : dep_names ) { + Node d { dn.txt } ; + Accesses a = dn.extra_dflags[ExtraDflag::Ignore] ? Accesses() : ~Accesses() ; //vvvvvvvvvvvvvvvvvvv d->set_buildable(lvl) ; //^^^^^^^^^^^^^^^^^^^ if ( d->buildable<=Buildable::No ) { trace("no_dep",d) ; return ; } - if ( auto [it,ok] = dis.emplace(d,deps.size()) ; ok ) deps.emplace_back( d , a , df , true/*parallel*/ ) ; - else { deps[it->second].dflags |= df ; deps[it->second].accesses &= a ; } // uniquify deps by combining accesses and flags + if ( auto [it,ok] = dis.emplace(d,deps.size()) ; ok ) deps.emplace_back( d , a , dn.dflags , true/*parallel*/ ) ; + else { deps[it->second].dflags |= dn.dflags ; deps[it->second].accesses &= a ; } // uniquify deps by combining accesses and flags } //vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv // args for store args for JobData @@ -508,23 +506,23 @@ namespace Engine { trace("req_before",target_reason,status,ri) ; req->missing_audits.erase(*this) ; // old missing audit is obsolete as soon as we have rerun the job // we call wakeup_watchers ourselves once reports are done to avoid anti-intuitive report order - // vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv - JobReason err_reason = (*this)->make( ri , end_action , target_reason , Yes/*speculate*/ , &old_exec_time , false/*wakeup_watchers*/ ) ; - // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + // vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv + JobReason job_err_reason = (*this)->make( ri , end_action , target_reason , Yes/*speculate*/ , &old_exec_time , false/*wakeup_watchers*/ ) ; + // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ bool full_report = ri.done() || !has_new_deps ; // if not done, does a full report anyway if this is not due to new deps ::string job_msg ; if (full_report) { - bool job_err = err_reason.tag>=JobReasonTag::Err ; + bool job_err = job_err_reason.tag>=JobReasonTag::Err ; job_msg = msg ; - if (!job_err) append_line_to_string( job_msg , local_msg ) ; // report local_msg if nothing more import to report - else { append_line_to_string( job_msg , reason_str(err_reason),'\n' ) ; err_reason |= err_reason ; stderr = {} ; } // dont report user stderr if analysis made it meaningless - /**/ append_line_to_string( job_msg , severe_msg ) ; + if (!job_err) append_line_to_string( job_msg , local_msg ) ; // report local_msg if no better message + else { append_line_to_string( job_msg , reason_str(job_err_reason),'\n' ) ; err_reason |= job_err_reason ; stderr = {} ; } // dont report user stderr if analysis made it ... + /**/ append_line_to_string( job_msg , severe_msg ) ; // ... meaningless } // Delay exec_time = digest.stats.total ; if ( status<=Status::Early ) SWEAR(!exec_time,exec_time) ; ::string pfx ; if ( !ri.done() && status>Status::Garbage && !unstable_dep ) pfx = "may_" ; // - JobReport jr = audit_end( pfx , ri , job_msg , full_report?stderr:""s , end_none_attrs.max_stderr_len , modified , exec_time ) ; // report rerun or resubmit rather than status + JobReport jr = audit_end( pfx , ri , job_msg , full_report?stderr:""s , end_none_attrs.max_stderr_len , modified , exec_time ) ; // report rerun or resubmit rather than status if (ri.done()) { trace("wakeup_watchers",ri) ; ri.wakeup_watchers() ; diff --git a/src/lmakeserver/main.cc b/src/lmakeserver/main.cc index 107adf01..7d7b6fdf 100644 --- a/src/lmakeserver/main.cc +++ b/src/lmakeserver/main.cc @@ -127,19 +127,20 @@ void reqs_thread_func( ::stop_token stop , Fd in_fd , Fd out_fd ) { t_thread_key = 'Q' ; Trace trace("reqs_thread_func",STR(_g_is_daemon)) ; // - ::stop_callback stop_cb { stop , [&](){ trace("stop") ; kill_self(SIGINT) ; } } ; // transform request_stop into an event we wait for + ::stop_callback stop_cb { stop , [&](){ trace("stop") ; kill_self(SIGINT) ; } } ; // transform request_stop into an event we wait for ::umap> in_tab ; Epoll epoll { New } ; // - epoll.add_read( _g_server_fd , EventKind::Master ) ; - epoll.add_read( _g_int_fd , EventKind::Int ) ; + epoll.add_read( _g_server_fd , EventKind::Master ) ; trace("read_master",_g_server_fd) ; + epoll.add_read( _g_int_fd , EventKind::Int ) ; trace("read_int" ,_g_int_fd ) ; // - if ( +_g_watch_fd && ::inotify_add_watch( _g_watch_fd , ServerMrkr , IN_DELETE_SELF | IN_MOVE_SELF | IN_MODIFY )>=0 ) - epoll.add_read( _g_watch_fd , EventKind::Watch ) ; // if server marker is touched by user, we do as we received a ^C + if ( +_g_watch_fd && ::inotify_add_watch( _g_watch_fd , ServerMrkr , IN_DELETE_SELF | IN_MOVE_SELF | IN_MODIFY )>=0 ) { + epoll.add_read( _g_watch_fd , EventKind::Watch ) ; trace("read_watch",_g_watch_fd) ; // if server marker is touched by user, we do as we received a ^C + } // if (!_g_is_daemon) { in_tab[in_fd] ; - epoll.add_read(in_fd,EventKind::Std) ; + epoll.add_read(in_fd,EventKind::Std) ; trace("read_std",in_fd) ; } // for(;;) { @@ -199,7 +200,7 @@ void reqs_thread_func( ::stop_token stop , Fd in_fd , Fd out_fd ) { case ReqProc::Forget : case ReqProc::Mark : case ReqProc::Show : - epoll.del(fd) ; // must precede close(fd) which may occur as soon as we push to g_engine_queue + epoll.del(fd) ; trace("stop_fd",rrr.proc,fd) ; // must precede close(fd) which may occur as soon as we push to g_engine_queue in_tab.erase(fd) ; //vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv g_engine_queue.emplace( rrr.proc , fd , ofd , rrr.files , rrr.options ) ; @@ -207,7 +208,7 @@ void reqs_thread_func( ::stop_token stop , Fd in_fd , Fd out_fd ) { break ; case ReqProc::Kill : case ReqProc::None : { - epoll.del(fd) ; // must precede close(fd) which may occur as soon as we push to g_engine_queue + epoll.del(fd) ; trace("stop_fd",rrr.proc,fd) ; // must precede close(fd) which may occur as soon as we push to g_engine_queue auto it=in_tab.find(fd) ; Req r = it->second.second ; trace("eof",fd) ; @@ -226,9 +227,8 @@ void reqs_thread_func( ::stop_token stop , Fd in_fd , Fd out_fd ) { // if (new_fd) { Fd slave_fd = Fd(_g_server_fd.accept()) ; - trace("new_req",slave_fd) ; in_tab[slave_fd] ; // allocate entry - epoll.add_read(slave_fd,EventKind::Slave) ; + epoll.add_read(slave_fd,EventKind::Slave) ; trace("new_req",slave_fd) ; report_server(slave_fd,true/*running*/) ; } } @@ -321,7 +321,7 @@ bool/*interrupted*/ engine_loop() { break ; case ReqProc::Close : { auto it = fd_tab.find(req.req) ; - trace("close_req",req,STR(it->second.killed)) ; + trace("close_req",req,it->second.in,it->second.out,STR(it->second.killed)) ; //vvvvvvvvvvvvv req.req.close() ; //^^^^^^^^^^^^^ @@ -334,6 +334,9 @@ bool/*interrupted*/ engine_loop() { Req r = req.req ; auto it = fd_tab.find(r) ; bool req_active = it!=fd_tab.end() && it->second.out==req.out_fd ; // out_fd is held until now, and if it does not coincide with it->second, req id was reused for a new Req + // + if (it==fd_tab.end()) trace("kill_req",req ) ; + else trace("kill_req",req,it->second.in,it->second.out,STR(it->second.killed)) ; // vvvvvvvv if ( +r && +*r && req_active ) r.kill() ; // ^^^^^^^^ diff --git a/src/lmakeserver/req.cc b/src/lmakeserver/req.cc index b7c6e417..c15cfe46 100644 --- a/src/lmakeserver/req.cc +++ b/src/lmakeserver/req.cc @@ -84,7 +84,7 @@ namespace Engine { void Req::kill() { Trace trace("kill",*this) ; - SWEAR(zombie()) ; // zombie has already been set + SWEAR(zombie()) ; // zombie has already been set audit_ctrl_c( (*this)->audit_fd , (*this)->log_stream , (*this)->options ) ; Backend::s_kill_req(+*this) ; } @@ -536,17 +536,17 @@ namespace Engine { if ( !art && is_target(nfs_guard.access(name)) ) audit_node( Color::Note , "consider : git add" , node , lvl+1 ) ; // for( auto const& [rt,m] : mrts ) { // second pass to do report - JobTgt jt { rt , name } ; // do not pass *this as req to avoid generating error message at cxtor time - ::string reason ; - Node missing_dep ; - ::vmap_s>> static_deps ; + JobTgt jt { rt , name } ; // do not pass *this as req to avoid generating error message at cxtor time + ::string reason ; + Node missing_dep ; + ::vmap_s static_deps ; if ( +jt && jt->run_status!=RunStatus::MissingStatic ) { reason = "does not produce it" ; goto Report ; } try { static_deps = rt->deps_attrs.eval(m) ; } catch (::pair_ss const& msg_err) { reason = to_string("cannot compute its deps :\n",msg_err.first,msg_err.second) ; goto Report ; } { ::string missing_key ; for( bool search_non_buildable : {true,false} ) // first search a non-buildable, if not found, search for non makable as deps have been made - for( auto const& [k,ddfedf] : static_deps ) { - Node d{ddfedf.first} ; + for( auto const& [k,dn] : static_deps ) { + Node d{dn.txt} ; if ( search_non_buildable ? d->buildable>Buildable::No : d->status()<=NodeStatus::Makable ) continue ; missing_key = k ; missing_dep = d ; diff --git a/src/lmakeserver/rule.cc b/src/lmakeserver/rule.cc index c2fd5ebc..49346de1 100644 --- a/src/lmakeserver/rule.cc +++ b/src/lmakeserver/rule.cc @@ -260,9 +260,9 @@ namespace Engine { auto match = [&]()->Rule::SimpleMatch const& { { if (!m) m = Rule::SimpleMatch(j) ; } return m ; } ; // solve lazy evaluation auto stems = [&]()->::vector_s const& { return match().stems ; } ; // - auto matches = [&]()->::vector_s const& { { if (!mtab) for( ::string const& t : match().py_matches() ) mtab.push_back ( mk_lcl(t ,r->cwd_s)) ; } return mtab ; } ; - auto deps = [&]()->::vmap_ss const& { { if (!dtab) for( auto const& [k,dndf] : match().deps () ) dtab.emplace_back(k,mk_lcl(dndf.first,r->cwd_s)) ; } return dtab ; } ; - auto rsrcs = [&]()->::umap_ss const& { { if (!rtab) rtab = mk_umap(rsrcs_) ; } return rtab ; } ; + auto matches = [&]()->::vector_s const& { { if (!mtab) for( ::string const& t : match().py_matches() ) mtab.push_back ( mk_lcl(t ,r->cwd_s)) ; } return mtab ; } ; + auto deps = [&]()->::vmap_ss const& { { if (!dtab) for( auto const& [k,dn] : match().deps () ) dtab.emplace_back(k,mk_lcl(dn.txt,r->cwd_s)) ; } return dtab ; } ; + auto rsrcs = [&]()->::umap_ss const& { { if (!rtab) rtab = mk_umap(rsrcs_) ; } return rtab ; } ; for( auto [vc,i] : ctx ) { ::vmap_ss dct ; switch (vc) { @@ -424,14 +424,21 @@ namespace Engine { } deps.emplace_back( key , DepSpec{ ::move(parsed_dep) , df , edf } ) ; } - if (_qualify_dep( {} , rd.interpreter[0] , rd.is_python?DepKind::Python:DepKind::Shell )) - deps.emplace_back( "" , DepSpec{::copy(rd.interpreter[0]),Dflag::Static,{}} ) ; - if (deps.size()>=Rule::NoVar) throw to_string("too many static deps : ",deps.size()) ; + if (deps.size()>=Rule::NoVar-1) throw to_string("too many static deps : ",deps.size()) ; // -1 to leave some room to the interpreter, if any } - ::vmap_s>> DynamicDepsAttrs::eval( Rule::SimpleMatch const& match ) const { - ::vmap_s>> res ; - for( auto const& [k,ds] : spec.deps ) res.emplace_back( k , pair( parse_fstr(ds.pattern,match) , pair(ds.dflags,ds.extra_dflags) ) ) ; + void DepsAttrs::add_interpreter(RuleData const& rd) { + ::vector_s const& interpreter = rd.start_cmd_attrs.spec.interpreter ; + if ( +interpreter && _qualify_dep( {} , interpreter[0] , rd.is_python?DepKind::Python:DepKind::Shell ) ) { + ::string interpreter0 = interpreter[0] ; + rd.add_cwd(interpreter0) ; + deps.emplace_back( "" , DepSpec{::move(interpreter0),Dflags(Dflag::Static,Dflag::Required),{}} ) ; + } + } + + ::vmap_s DynamicDepsAttrs::eval(Rule::SimpleMatch const& match) const { + ::vmap_s res ; + for( auto const& [k,ds] : spec.deps ) res.emplace_back( k , DepSpec{parse_fstr(ds.txt,match),ds.dflags,ds.extra_dflags} ) ; // if (is_dynamic) { try { @@ -448,9 +455,9 @@ namespace Engine { ::string dep = _split_flags( "dep "+key , py_val , 1/*n_skip*/ , df , edf ) ; SWEAR(!(edf&~ExtraDflag::Top)) ; // or we must review side_deps match.rule->add_cwd( dep , edf[ExtraDflag::Top] ) ; _qualify_dep( key , dep , DepKind::Dep ) ; - ::pair_s> e { dep , {df,edf} } ; - if (spec.full_dynamic) { SWEAR(!dep_idxs.contains(key),key) ; res.emplace_back(key,e) ; } // dep cannot be both static and dynamic - else res[dep_idxs.at(key)].second = e ; // if not full_dynamic, all deps must be listed in spec + DepSpec ds { dep , df , edf } ; + if (spec.full_dynamic) { SWEAR(!dep_idxs.contains(key),key) ; res.emplace_back(key,ds) ; } // dep cannot be both static and dynamic + else res[dep_idxs.at(key)].second = ds ; // if not full_dynamic, all deps must be listed in spec } } catch (::string const& e) { throw ::pair_ss(e/*msg*/,{}/*err*/) ; } } @@ -479,6 +486,19 @@ namespace Engine { } } + // + // StartCmdAttrs + // + + StartCmdAttrs DynamicStartCmdAttrs::eval( Rule::SimpleMatch const& m , ::vmap_ss const& rsrcs , ::vmap_s* deps ) const { + StartCmdAttrs res = Base::eval(m,rsrcs,deps) ; + ::string interpreter0 = res.interpreter[0] ; + m.rule->add_cwd(interpreter0) ; + AutodepLock lock{deps} ; + AutoCloseFd(::open(interpreter0.c_str(),O_RDONLY)) ; // speed up dep acquisition by accessing interpreter, XXX : pretend access rather than make a real one for perf + return res ; + } + // // Cmd // @@ -853,8 +873,7 @@ namespace Engine { // field = "ete" ; if (dct.contains(field)) Attrs::acquire( exec_time , &dct[field] ) ; field = "force" ; if (dct.contains(field)) Attrs::acquire( force , &dct[field] ) ; - field = "interpreter" ; if (dct.contains(field)) Attrs::acquire( interpreter , &dct[field] ) ; if (!interpreter) throw "no interpreter found"s ; - field = "is_python" ; if (dct.contains(field)) Attrs::acquire( is_python , &dct[field] ) ; else throw "not found"s ; + field = "is_python" ; if (dct.contains(field)) Attrs::acquire( is_python , &dct[field] ) ; else throw "not found"s ; field = "max_submit_count" ; if (dct.contains(field)) Attrs::acquire( n_submits , &dct[field] , uint8_t(1) ) ; field = "n_tokens" ; if (dct.contains(field)) Attrs::acquire( n_tokens , &dct[field] ) ; // @@ -892,6 +911,7 @@ namespace Engine { stdin_idx = di ; break ; } + deps_attrs.spec.add_interpreter(*this) ; } catch(::string const& e) { throw to_string("while processing ",name,'.',field," :\n" ,indent(e) ) ; } } @@ -1086,21 +1106,21 @@ namespace Engine { /**/ return rd.job_name ; } - static ::string _pretty( size_t i , DepsAttrs const& ms , RuleData const& rd ) { + static ::string _pretty( size_t i , DepsAttrs const& da , RuleData const& rd ) { OStringStream res ; size_t wk = 0 ; size_t wd = 0 ; ::umap_ss patterns ; // - for( auto const& [k,ds] : ms.deps ) { - if (!ds.pattern) continue ; - ::string p = _pretty_fstr(ds.pattern,rd) ; + for( auto const& [k,ds] : da.deps ) { + if (!ds.txt) continue ; + ::string p = _pretty_fstr(ds.txt,rd) ; wk = ::max(wk,k.size()) ; wd = ::max(wd,p.size()) ; patterns[k] = ::move(p) ; } - for( auto const& [k,ds] : ms.deps ) { - if (!ds.pattern) continue ; + for( auto const& [k,ds] : da.deps ) { + if (!ds.txt) continue ; ::string flags ; bool first = true ; for( Dflag df : Dflag ::NRule ) if (ds.dflags [df ]) { flags += first?" : ":" , " ; first = false ; flags += snake(df ) ; } @@ -1153,19 +1173,21 @@ namespace Engine { int pass ; // auto do_field = [&](::string const& key , ::string const& val )->void { - if (pass==1) key_sz = ::max(key_sz,key.size()) ; // during 1st pass, compute max key size ; + if (pass==1) key_sz = ::max(key_sz,key.size()) ; // during 1st pass, compute max key size ; else res << indent( to_string(::setw(key_sz),key," : ",val,'\n') , i ) ; } ; - for( pass=1 ; pass<=2 ; pass++ ) { // on 1st pass we compute key size, on 2nd pass we do the job + ::string interpreter ; + bool first = true ; + for( ::string const& c : sca.interpreter ) { append_to_string( interpreter , first?"":" " , c ) ; first = false ; } + for( pass=1 ; pass<=2 ; pass++ ) { // on 1st pass we compute key size, on 2nd pass we do the job + if (+interpreter ) do_field( "interpreter" , interpreter ) ; if ( sca.auto_mkdir ) do_field( "auto_mkdir" , to_string(sca.auto_mkdir ) ) ; if ( sca.ignore_stat) do_field( "ignore_stat" , to_string(sca.ignore_stat) ) ; if (+sca.chroot ) do_field( "chroot" , sca.chroot ) ; if (+sca.tmp ) do_field( "tmp" , sca.tmp ) ; if ( sca.use_script ) do_field( "use_script" , to_string(sca.use_script ) ) ; } - if (+sca.env) { - res << indent("environ :\n",i) << _pretty_env( i+1 , sca.env ) ; - } + if (+sca.env) res << indent("environ :\n",i) << _pretty_env( i+1 , sca.env ) ; return res.str() ; } static ::string _pretty( size_t i , Cmd const& c , RuleData const& rd ) { @@ -1177,7 +1199,7 @@ namespace Engine { OStringStream res ; ::vmap_ss entries ; #pragma GCC diagnostic push - #pragma GCC diagnostic ignored "-Warray-bounds" // gcc -O3 complains about array bounds with a completely incoherent message (looks like a bug) + #pragma GCC diagnostic ignored "-Warray-bounds" // gcc -O3 complains about array bounds with a completely incoherent message (looks like a bug) /**/ entries.emplace_back( "autodep" , snake(sra.method) ) ; if (+sra.timeout) entries.emplace_back( "timeout" , sra.timeout.short_str() ) ; #pragma GCC diagnostic pop @@ -1236,12 +1258,9 @@ namespace Engine { /**/ entries.emplace_back( "job_name" , _pretty_job_name(*this) ) ; if (+cwd_s) entries.emplace_back( "cwd" , cwd_s.substr(0,cwd_s.size()-1) ) ; if (!is_special()) { - ::string i ; for( ::string const& c : interpreter ) append_to_string( i , +i?" ":"" , c ) ; - // if (force ) entries.emplace_back( "force" , to_string(force ) ) ; if (n_submits ) entries.emplace_back( "max_submit_count" , to_string(n_submits) ) ; if (n_tokens!=1) entries.emplace_back( "n_tokens" , to_string(n_tokens ) ) ; - /**/ entries.emplace_back( "interpreter" , i ) ; } res << _pretty_vmap(1,entries) ; if (+stems) res << indent("stems :\n",1) << _pretty_vmap ( 2,stems,true/*uniq*/) ; @@ -1288,7 +1307,7 @@ namespace Engine { bool special = is_special() ; Hash::Xxh h ; // each crc continues after the previous one, so they are standalone // - ::vector_s targets ; + ::vector_s targets ; for( auto const& [k,me] : matches ) if ( me.flags.is_target==Yes && me.flags.tflags()[Tflag::Target] ) targets.push_back(me.pattern) ; // keys and flags have no influence on matching @@ -1299,11 +1318,8 @@ namespace Engine { if (special) { h.update(allow_ext) ; // only exists for special rules } else { - bool with_interpreter = _qualify_dep({},interpreter[0],is_python?DepKind::Python:DepKind::Shell) ; - /**/ h.update(job_name ) ; // job_name has no effect for source & anti as it is only used to store jobs and there are none - if (with_interpreter) h.update(interpreter[0]) ; // no interpreter for source & anti - else h.update(""s ) ; // ensure type homogeneity - /**/ deps_attrs.update_hash(h) ; // no deps for source & anti + h.update(job_name) ; // job_name has no effect for source & anti as it is only used to store jobs and there are none + deps_attrs.update_hash(h) ; // no deps for source & anti } match_crc = h.digest() ; // @@ -1312,7 +1328,6 @@ namespace Engine { h.update(matches ) ; // these define names and influence cmd execution, all is not necessary but simpler to code h.update(force ) ; h.update(is_python ) ; - h.update(interpreter) ; cmd .update_hash(h) ; start_cmd_attrs.update_hash(h) ; end_cmd_attrs .update_hash(h) ; diff --git a/src/lmakeserver/rule.x.hh b/src/lmakeserver/rule.x.hh index eae18b52..216893a7 100644 --- a/src/lmakeserver/rule.x.hh +++ b/src/lmakeserver/rule.x.hh @@ -126,16 +126,18 @@ namespace Engine { ::string subst_fstr( ::string const& fstr , ::umap_s const& var_idxs , VarIdx& n_unnamed ) ; } ; + struct DepSpec { + ::string txt ; + Dflags dflags ; + ExtraDflags extra_dflags ; + } ; + // used at match time struct DepsAttrs { static constexpr const char* Msg = "deps" ; - struct DepSpec { - ::string pattern ; - Dflags dflags ; - ExtraDflags extra_dflags ; - } ; // services - void init( bool is_dynamic , Py::Dict const* , ::umap_s const& , RuleData const& ) ; + void init ( bool is_dynamic , Py::Dict const* , ::umap_s const& , RuleData const& ) ; + void add_interpreter( RuleData const& ) ; // data // START_OF_VERSIONING bool full_dynamic = true ; // if true <=> deps is empty and new keys can be added, else dynamic deps must be within dep keys ... @@ -218,6 +220,7 @@ namespace Engine { void init ( bool /*is_dynamic*/ , Py::Dict const* py_src , ::umap_s const& ) { update(*py_src) ; } void update( Py::Dict const& py_dct ) { using namespace Attrs ; + Attrs::acquire_from_dct( interpreter , py_dct , "interpreter" ) ; Attrs::acquire_from_dct( auto_mkdir , py_dct , "auto_mkdir" ) ; Attrs::acquire_from_dct( chroot , py_dct , "chroot" ) ; Attrs::acquire_env ( env , py_dct , "env" ) ; @@ -236,6 +239,7 @@ namespace Engine { } // data // START_OF_VERSIONING + ::vector_s interpreter ; bool auto_mkdir = false ; bool ignore_stat = false ; ::string chroot ; @@ -349,6 +353,7 @@ namespace Engine { // the part of the Dynamic struct which is stored on disk struct DynamicDskBase { + // statics static bool s_is_dynamic(Py::Tuple const& ) ; protected : static void _s_eval( Job , Rule::SimpleMatch&/*lazy*/ , ::vmap_ss const& rsrcs_ , ::vector const& ctx , EvalCtxFuncStr const& , EvalCtxFuncDct const& ) ; @@ -481,7 +486,19 @@ namespace Engine { DynamicDepsAttrs& operator=(DynamicDepsAttrs const& src) { Base::operator=( src ) ; return *this ; } // . DynamicDepsAttrs& operator=(DynamicDepsAttrs && src) { Base::operator=(::move(src)) ; return *this ; } // . // services - ::vmap_s>> eval( Rule::SimpleMatch const& ) const ; + ::vmap_s eval(Rule::SimpleMatch const&) const ; + } ; + + struct DynamicStartCmdAttrs : Dynamic { + using Base = Dynamic ; + // cxtors & casts + using Base::Base ; + DynamicStartCmdAttrs (DynamicStartCmdAttrs const& src) : Base { src } {} // only copy disk backed-up part, in particular mutex is not copied + DynamicStartCmdAttrs (DynamicStartCmdAttrs && src) : Base {::move(src)} {} // . + DynamicStartCmdAttrs& operator=(DynamicStartCmdAttrs const& src) { Base::operator=( src ) ; return *this ; } // . + DynamicStartCmdAttrs& operator=(DynamicStartCmdAttrs && src) { Base::operator=(::move(src)) ; return *this ; } // . + // services + StartCmdAttrs eval( Rule::SimpleMatch const& , ::vmap_ss const& rsrcs={} , ::vmap_s* deps=nullptr ) const ; } ; struct DynamicCmd : Dynamic { @@ -554,8 +571,8 @@ namespace Engine { ; } // services - void add_cwd( ::string& file , bool top ) const { - if (!( top || !file || !cwd_s )) file.insert(0,cwd_s) ; + void add_cwd( ::string& file , bool top=false ) const { + if (!( top || !cwd_s || !file || file[0]=='/' )) file.insert(0,cwd_s) ; } private : ::vector_s _list_ctx ( ::vector const& ctx ) const ; @@ -581,14 +598,13 @@ namespace Engine { Dynamic cache_none_attrs ; // in no crc, evaluated twice : at submit time to look for a hit and after execution to upload result Dynamic submit_rsrcs_attrs ; // in rsrcs crc, evaluated at submit time Dynamic submit_none_attrs ; // in no crc, evaluated at submit time - Dynamic start_cmd_attrs ; // in cmd crc, evaluated before execution + DynamicStartCmdAttrs start_cmd_attrs ; // in cmd crc, evaluated before execution DynamicCmd cmd ; // in cmd crc, evaluated before execution Dynamic start_rsrcs_attrs ; // in rsrcs crc, evaluated before execution Dynamic start_none_attrs ; // in no crc, evaluated before execution Dynamic end_cmd_attrs ; // in cmd crc, evaluated after execution Dynamic end_none_attrs ; // in no crc, evaluated after execution size_t n_tokens = 1 ; // in no crc, contains the number of tokens to determine parallelism to use for ETA computation - ::vector_s interpreter ; bool is_python = false ; bool force = false ; uint8_t n_submits = 0 ; // max number of submission for a given job for a given req (disabled if 0) @@ -635,7 +651,7 @@ namespace Engine { ::vector_s py_matches () const ; ::vector_s static_matches() const ; // - ::vmap_s>> const& deps() const { + ::vmap_s const& deps() const { if (!_has_deps) { _deps = rule->deps_attrs.eval(*this) ; _has_deps = true ; @@ -651,12 +667,12 @@ namespace Engine { ::vector_s stems ; // static stems only of course // cache private : - mutable bool _has_static_targets = false ; - mutable bool _has_star_targets = false ; - mutable bool _has_deps = false ; - mutable ::umap_s _static_targets ; - mutable ::vector _star_targets ; - mutable ::vmap_s>> _deps ; + mutable bool _has_static_targets = false ; + mutable bool _has_star_targets = false ; + mutable bool _has_deps = false ; + mutable ::umap_s _static_targets ; + mutable ::vector _star_targets ; + mutable ::vmap_s _deps ; } ; struct RuleTgt : Rule { @@ -911,7 +927,6 @@ namespace Engine { ::serdes(s,end_cmd_attrs ) ; ::serdes(s,end_none_attrs ) ; ::serdes(s,n_tokens ) ; - ::serdes(s,interpreter ) ; ::serdes(s,is_python ) ; ::serdes(s,force ) ; ::serdes(s,n_submits ) ; diff --git a/src/process.cc b/src/process.cc index a2c1b16b..fa6c7a3c 100644 --- a/src/process.cc +++ b/src/process.cc @@ -13,10 +13,10 @@ bool/*parent*/ Child::spawn( , ::string const& cwd_ , void (*pre_exec)() ) { - SWEAR( !stdin_fd || stdin_fd ==Fd::Stdin || stdin_fd >Fd::Std , stdin_fd ) ; // ensure reasonably simple situations - SWEAR( !stdout_fd || stdout_fd>=Fd::Stdout , stdout_fd ) ; // . - SWEAR( !stderr_fd || stderr_fd>=Fd::Stdout , stderr_fd ) ; // . - SWEAR(!( stderr_fd==Fd::Stdout && stdout_fd==Fd::Stderr ) ) ; // . + SWEAR( !stdin_fd || stdin_fd ==Fd::Stdin || stdin_fd >Fd::Std , stdin_fd ) ; // ensure reasonably simple situations + SWEAR( !stdout_fd || stdout_fd>=Fd::Stdout , stdout_fd ) ; // . + SWEAR( !stderr_fd || stderr_fd>=Fd::Stdout , stderr_fd ) ; // . + SWEAR(!( stderr_fd==Fd::Stdout && stdout_fd==Fd::Stderr ) ) ; // . ::Pipe p2c ; ::Pipe c2po ; ::Pipe c2pe ; @@ -25,27 +25,27 @@ bool/*parent*/ Child::spawn( if (stderr_fd==Pipe) c2pe.open() ; else if (+stderr_fd) c2pe.write = stderr_fd ; as_session = as_session_ ; pid = fork() ; - if (!pid) { // if in child + if (!pid) { // if in child if (as_session) ::setsid() ; sigset_t full_mask ; ::sigfillset(&full_mask) ; - ::sigprocmask(SIG_UNBLOCK,&full_mask,nullptr) ; // restore default behavior + ::sigprocmask(SIG_UNBLOCK,&full_mask,nullptr) ; // restore default behavior // - if (stdin_fd ==Pipe) { p2c .write.close() ; p2c .read .no_std() ; } // could be optimized, but too complex to manage - if (stdout_fd==Pipe) { c2po.read .close() ; c2po.write.no_std() ; } // . - if (stderr_fd==Pipe) { c2pe.read .close() ; c2pe.write.no_std() ; } // . + if (stdin_fd ==Pipe) { p2c .write.close() ; p2c .read .no_std() ; } // could be optimized, but too complex to manage + if (stdout_fd==Pipe) { c2po.read .close() ; c2po.write.no_std() ; } // . + if (stderr_fd==Pipe) { c2pe.read .close() ; c2pe.write.no_std() ; } // . // set up std fd if (stdin_fd ==None) ::close(Fd::Stdin ) ; else if (p2c .read !=Fd::Stdin ) ::dup2(p2c .read ,Fd::Stdin ) ; if (stdout_fd==None) ::close(Fd::Stdout) ; else if (c2po.write!=Fd::Stdout) ::dup2(c2po.write,Fd::Stdout) ; // save stdout in case it is modified and we want to redirect stderr to it if (stderr_fd==None) ::close(Fd::Stderr) ; else if (c2pe.write!=Fd::Stderr) ::dup2(c2pe.write,Fd::Stderr) ; // - if (p2c .read >Fd::Std) p2c .read .close() ; // clean up : we only want to set up standard fd, other ones are necessarily temporary constructions - if (c2po.write>Fd::Std) c2po.write.close() ; // . - if (c2pe.write>Fd::Std) c2pe.write.close() ; // . + if (p2c .read >Fd::Std) p2c .read .close() ; // clean up : we only want to set up standard fd, other ones are necessarily temporary constructions + if (c2po.write>Fd::Std) c2po.write.close() ; // . + if (c2pe.write>Fd::Std) c2pe.write.close() ; // . // const char** child_env = const_cast(environ) ; - ::vector_s env_vector ; // ensure actual env strings lifetime last until execve call + ::vector_s env_vector ; // ensure actual env strings lifetime last until execve call if (env) { - SWEAR(+args) ; // cannot fork with env + SWEAR(+args) ; // cannot fork with env size_t n_env = env->size() + (add_env?add_env->size():0) ; env_vector.reserve(n_env) ; for( auto const* e : {env,add_env} ) @@ -65,7 +65,7 @@ bool/*parent*/ Child::spawn( // if (!args) return false ; #if HAS_CLOSE_RANGE - //::close_range(3,~0u,CLOSE_RANGE_UNSHARE) ; // activate this code (uncomment) as an alternative to set CLOEXEC in IFStream/OFStream + //::close_range(3,~0u,CLOSE_RANGE_UNSHARE) ; // activate this code (uncomment) as an alternative to set CLOEXEC in IFStream/OFStream #endif const char** child_args = new const char*[args.size()+1] ; for( size_t i=0 ; i(child_args) , const_cast(child_env) ) ; else ::execv ( child_args[0] , const_cast(child_args) ) ; pid = -1 ; - throw to_string("cannot exec (",strerror(errno),") : ",args) ; // in case exec fails + exit(Rc::System,"cannot exec (",strerror(errno),") : ",args) ; // in case exec fails } if (stdin_fd ==Pipe) { stdin = p2c .write ; p2c .read .close() ; } if (stdout_fd==Pipe) { stdout = c2po.read ; c2po.write.close() ; } diff --git a/src/rpc_job.cc b/src/rpc_job.cc index fb919383..069e4247 100644 --- a/src/rpc_job.cc +++ b/src/rpc_job.cc @@ -282,7 +282,7 @@ JobInfo::JobInfo(::string const& filename) { } void JobInfo::write(::string const& filename) const { - OFStream os{filename} ; + OFStream os{dir_guard(filename)} ; serialize(os,start) ; serialize(os,end ) ; } diff --git a/unit_tests/cwd.py b/unit_tests/cwd.py index 4e7f83aa..bd5fc9fd 100644 --- a/unit_tests/cwd.py +++ b/unit_tests/cwd.py @@ -36,6 +36,7 @@ class CatPy(Cat,PyRule) : def cmd() : print(open(FIRST ).read(),end='') print(open(SECOND).read(),end='') + lmake.get_autodep() # check get_autodep works in a subdir else : diff --git a/unit_tests/dynamic.py b/unit_tests/dynamic.py index 3ffbea17..bbf7061c 100644 --- a/unit_tests/dynamic.py +++ b/unit_tests/dynamic.py @@ -15,6 +15,7 @@ import os from lmake.rules import Rule + from lmake.rules import python as system_python lmake.manifest = ( 'Lmakefile.py' @@ -22,6 +23,7 @@ , 'hello' , 'world' , 'deps.hello+world.ref' + , 'interpreter.hello+world.ref' , 'env.hello.ref' , *(f'autodep.{ad}.ref' for ad in autodeps) , 'resources.1.ref' @@ -63,6 +65,23 @@ def cmd() : print(open(deps['FIRST' ]).read(),end='') print(open(deps['SECOND']).read(),end='') + class Interpreter(Rule) : + stems = { + 'File1' : r'\w*' + , 'File2' : r'\w*' + } + target = 'interpreter.{File1}+{File2}' + deps = { + 'FIRST' : '{File1}' + , 'SECOND' : '{File2}' + } + def python() : + if step==1 : raise RuntimeError + return (system_python,) + def cmd() : + print(open(deps['FIRST' ]).read(),end='') + print(open(deps['SECOND']).read(),end='') + class Env(Rule) : target = r'env.{File:\w*}' environ_cmd = { 'VAR_CMD' : file_func } @@ -126,13 +145,14 @@ class Cmd(Rule) : import ut - print('hello' ,file=open('hello' ,'w')) - print('world' ,file=open('world' ,'w')) - print('hello\nworld' ,file=open('deps.hello+world.ref','w')) - print('hello\nhello\nhello',file=open('env.hello.ref' ,'w')) - print(1 ,file=open('resources.1.ref' ,'w')) - print(2 ,file=open('resources.2.ref' ,'w')) - print('hello' ,file=open('auto_mkdir.yes.ref' ,'w')) + print('hello' ,file=open('hello' ,'w')) + print('world' ,file=open('world' ,'w')) + print('hello\nworld' ,file=open('deps.hello+world.ref' ,'w')) + print('hello\nworld' ,file=open('interpreter.hello+world.ref','w')) + print('hello\nhello\nhello',file=open('env.hello.ref' ,'w')) + print(1 ,file=open('resources.1.ref' ,'w')) + print(2 ,file=open('resources.2.ref' ,'w')) + print('hello' ,file=open('auto_mkdir.yes.ref' ,'w')) # for ad in autodeps : print('hello',file=open(f'autodep.{ad}.ref','w')) open('auto_mkdir.no.ref','w') @@ -144,18 +164,19 @@ class Cmd(Rule) : print(f'step={s}',file=open('step.py','w')) rc = 1 if s==1 else 0 # - ut.lmake( 'deps.hello+world.ok' , done=2-rc*2 , steady=0 , failed=0 , new=2-rc*2 , no_deps =rc , rc=rc ) - ut.lmake( 'env.hello.ok' , done=2-rc*2 , steady=0 , failed=rc , new=rc*2 , resubmit=rc , rc=rc ) - ut.lmake( 'start_delay.no' , done=rc , steady=1-rc , failed=0 , new=0 , resubmit=rc , start=1 , rc=0 ) - ut.lmake( 'start_delay.yes' , done=rc , steady=1-rc , failed=0 , new=0 , resubmit=rc , start=rc , rc=0 ) - ut.lmake( 'resources.1.ok' , done=2-rc*2 , steady=0 , failed=rc , new=rc , rc=rc ) - ut.lmake( 'resources.2.ok' , done=2-rc*2 , steady=0 , failed=rc , new=rc , rc=rc ) - ut.lmake( 'max_stderr_len.1' , done=rc , steady=1-rc , failed=0 , new=0 , rc=0 ) - ut.lmake( 'max_stderr_len.2' , done=rc , steady=1-rc , failed=0 , new=0 , rc=0 ) - ut.lmake( 'allow_stderr.no' , done=0 , steady=0 , failed=1 , new=0 , rc=1 ) - ut.lmake( 'allow_stderr.yes' , done=0 , steady=1-rc , failed=rc , new=0 , rc=rc ) - ut.lmake( 'auto_mkdir.no.ok' , done=2-rc*2 , steady=0 , failed=rc , new=rc , resubmit=rc , rc=rc ) - ut.lmake( 'auto_mkdir.yes.ok' , done=2-rc*2 , steady=0 , failed=rc , new=rc , resubmit=rc , rc=rc ) - ut.lmake( 'cmd' , done=1-rc , steady=0 , failed=rc , new=0 , rc=rc ) + ut.lmake( 'deps.hello+world.ok' , done=2-rc*2 , steady=0 , failed=0 , new=1-rc , no_deps =rc , rc=rc ) + ut.lmake( 'interpreter.hello+world.ok' , done=2-rc*2 , steady=0 , failed=rc , new=3*rc , resubmit=rc , rc=rc ) # python accesses Lmakefile when it fails + ut.lmake( 'env.hello.ok' , done=2-rc*2 , steady=0 , failed=rc , new=rc , resubmit=rc , rc=rc ) + ut.lmake( 'start_delay.no' , done=rc , steady=1-rc , failed=0 , new=0 , resubmit=rc , start=1 , rc=0 ) + ut.lmake( 'start_delay.yes' , done=rc , steady=1-rc , failed=0 , new=0 , resubmit=rc , start=rc , rc=0 ) + ut.lmake( 'resources.1.ok' , done=2-rc*2 , steady=0 , failed=rc , new=rc , rc=rc ) + ut.lmake( 'resources.2.ok' , done=2-rc*2 , steady=0 , failed=rc , new=rc , rc=rc ) + ut.lmake( 'max_stderr_len.1' , done=rc , steady=1-rc , failed=0 , new=0 , rc=0 ) + ut.lmake( 'max_stderr_len.2' , done=rc , steady=1-rc , failed=0 , new=0 , rc=0 ) + ut.lmake( 'allow_stderr.no' , done=0 , steady=0 , failed=1 , new=0 , rc=1 ) + ut.lmake( 'allow_stderr.yes' , done=0 , steady=1-rc , failed=rc , new=0 , rc=rc ) + ut.lmake( 'auto_mkdir.no.ok' , done=2-rc*2 , steady=0 , failed=rc , new=rc , resubmit=rc , rc=rc ) + ut.lmake( 'auto_mkdir.yes.ok' , done=2-rc*2 , steady=0 , failed=rc , new=rc , resubmit=rc , rc=rc ) + ut.lmake( 'cmd' , done=1-rc , steady=0 , failed=rc , new=0 , rc=rc ) # for ad in autodeps : ut.lmake( f'autodep.{ad}.ok' , done=2-rc*2 , steady=0 , failed=rc , new=rc , resubmit=rc , rc=rc ) diff --git a/unit_tests/wide.py b/unit_tests/wide.py index 8119f507..4282cd8b 100644 --- a/unit_tests/wide.py +++ b/unit_tests/wide.py @@ -29,4 +29,5 @@ def cmd() : n = 20 p = 40 d = ut.lmake( f'out_{p}_{n}' , may_rerun=... , rerun=... , was_done=... , done=... , steady=... ) - assert d['was_done']+d['done']+d['steady']==n*p+p+n+2,f'bad counts : {d}' + expected_done = n*p+p+n+2 + assert d['was_done']+d['done']+d['steady']==n*p+p+n+2,f'bad counts : {d} expected done : {expected_done}' diff --git a/unit_tests/wine.py b/unit_tests/wine.py index 26ed7116..ecf3845f 100644 --- a/unit_tests/wine.py +++ b/unit_tests/wine.py @@ -29,21 +29,21 @@ class Base(Rule) : class WineRule(Rule) : side_targets = { 'WINE' : ('.wine/{*:.*}','incremental') } environ_resources = { 'DISPLAY' : os.environ['DISPLAY'] } - timeout = 30 # actual time should be ~5s, but seems to block from time to time + timeout = 30 # actual time should be ~5s, but seems to block from time to time when host is loaded class WineInit(WineRule) : target = '.wine/init' - targets = { 'WINE' : '.wine/{*:.*}' } # for init wine env is not incremental + targets = { 'WINE' : '.wine/{*:.*}' } # for init wine env is not incremental side_targets = { 'WINE' : None } allow_stderr = True - cmd = 'wine64 cmd >$TMPDIR/out ; cat $TMPDIR/out' # do nothing, just to init support files (in targets), avoid waiting for stdout + cmd = 'wine64 cmd >$TMPDIR/out 2>$TMPDIR/err ; cat $TMPDIR/out ; cat $TMPDIR/err >&2' # do nothing, just to init support files (in targets), avoid waiting for stdout/stderr class Dut(Base,WineRule) : target = 'dut.{Method}' deps = { 'WINE_INIT' : '.wine/init' } autodep = '{Method}' - allow_stderr = True # in some systems, there are fixme messages - cmd = f'wine64 {hostname_exe} > $TMPDIR/out ; cat $TMPDIR/out' # avoid waiting for stdout + allow_stderr = True # in some systems, there are fixme messages + cmd = f'wine64 {hostname_exe} > $TMPDIR/out 2>$TMPDIR/err ; cat $TMPDIR/out ; cat $TMPDIR/err >&2' # avoid waiting for stdout/stderr class Chk(Base) : target = r'test.{Method}'