fixed target clash fuzzy detection + implemented dep compression
cesar-douady committed Apr 2, 2024
1 parent 97ab7d8 commit bc9f384
Showing 22 changed files with 643 additions and 470 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -805,7 +805,7 @@ $(LMAKE_ENV)/stamp : $(LMAKE_ALL_FILES) $(LMAKE_ENV)/Manifest $(patsubst %,$(LMA
@touch $@
@echo init $(LMAKE_ENV)-cache
$(LMAKE_ENV)/tok : $(LMAKE_ENV)/stamp $(LMAKE_ENV)/Lmakefile.py
@set -e ; cd $(LMAKE_ENV) ; export CC=$(CC) ; $(ROOT_DIR)/bin/lmake lmake.tar.gz -Vn & sleep 1 ; $(ROOT_DIR)/bin/lmake lmake.tar.gz >$(@F) ; wait $$! ; touch $(@F)
@set -e ; cd $(LMAKE_ENV) ; export CC=$(CC) ; $(ROOT_DIR)/bin/lmake lmake.tar.gz -Vn & sleep 1 ; $(ROOT_DIR)/bin/lmake lmake.tar.gz >$(@F) || rm -f $(@F) ; wait $$! || rm -f $(@F)

#
# archive
11 changes: 2 additions & 9 deletions TO_DO
@@ -29,14 +29,12 @@ items :

* improve lshow -i
- generate info on nodes
* use Pdate (end of job) rather than Ddate (target date) to detect clash
- Ddate are not reliable
- ok to detect manual, nothing else
* manage 32 bits executables
- compile ld_audit.so (and co) in both 32 & 64 bits
- put adequate $PLATFORM in LD_PRELOAD
* generate meaningful message in case of I/O error such as disk full
! before erasing a dir, check for phony targets, not only files on disk
- then dir markers for gcc include dirs could be phony, which is more elegant
? mimic slurm killing procedure
- use PR_SET_CHILD_SUBREAPER (cf man 2 prctl) to ensure we get all sub-processes
- follow process hierarchy
@@ -108,10 +106,9 @@ items :
- much like faketree : https://github.com/enfabrica/enkit/tree/master/faketree
- generalize tmp mapping
* implement cache v2 (copy & link) :
- put typeid(StartInfo,EndInfo,...) as version tag to ensure no inter-version clashes
- 2 levels : disk level, global level
- use link instead of copy
- warning : only when mtime is used instead of ctime
- for disk level
! support direct rebuild of deps
- specify a target flag 'direct' for use by pattern
- specify a dep flag 'direct' for dynamic use (through ldepend)
@@ -184,10 +181,6 @@ items :
- most importantly suffixes
? maybe a pre-pass searching for infix is advisable
* no req_info for non-buildable nodes
* make deps be a sequence of non-buildable nodes followed by a buildable node
- invent a new concept
- deps chain : Job -> NodeSeq -> Node -> Job
- store in a prefix file for best sharing and easy retrieval
* there can be 1 backend thread per backend
- with one master socket
- and everything replicated per backend (including mutexes, tables, etc.)
4 changes: 2 additions & 2 deletions src/autodep/gather.cc
@@ -299,8 +299,8 @@ Status Gather::exec_child( ::vector_s const& args , Fd cstdin , Fd cstdout , Fd
return false ;
} ;
auto set_status = [&]( Status status_ , ::string const& msg_={} )->void {
if (status==Status::New) status = status_ ; // else there is already another reason
if (+msg_ ) { set_nl(msg) ; msg += msg_ ; }
if ( status==Status::New || status==Status::Ok ) status = status_ ; // else there is already another reason
if ( +msg_ ) append_line_to_string(msg,msg_) ;
} ;
//
SWEAR(!slaves) ;
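
The widened guard lets a real failure overwrite a provisional Ok, while the first real reason still wins over anything later. A minimal standalone sketch of this latching behavior (the Status values besides New and Ok are illustrative, not the actual enum) :

#include <cassert>
enum class Status { New , Ok , Err , EarlyLostErr } ;
struct StatusLatch {
	Status status = Status::New ;
	void set(Status status_) {
		if ( status==Status::New || status==Status::Ok ) status = status_ ; // else there is already another reason
	}
} ;
int main() {
	StatusLatch l ;
	l.set(Status::Ok ) ;            // provisional success
	l.set(Status::Err) ;            // a real reason may still override Ok
	l.set(Status::Ok ) ;            // ... but nothing overrides a real reason
	assert(l.status==Status::Err) ;
	return 0 ;
}
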
4 changes: 4 additions & 0 deletions src/autodep/record.cc
@@ -173,6 +173,10 @@ Record::Mkdir::Mkdir( Record& r , Path&& path , ::string&& c ) : Solve{r,::move(
if (file_loc==FileLoc::Repo) r._report_guard( ::move(real) , ::move(c) ) ;
}

// note : in case the file is opened O_WRONLY w/o O_TRUNC, it is true that the final content depends on the initial content.
// However :
// - if it is an official target, it is not a dep, whether you declare reading it or not
// - else, we do not compute a CRC on it and its actual content is not guaranteed. What is important in this case is that the execution of the job does not see the content.
static bool _do_stat (int flags) { return flags&O_PATH ; }
static bool _do_read (int flags) { return !_do_stat(flags) && (flags&O_ACCMODE)!=O_WRONLY && !(flags&O_TRUNC) ; }
static bool _do_write(int flags) { return !_do_stat(flags) && (flags&O_ACCMODE)!=O_RDONLY ; }
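
The note above drives the predicate definitions : O_PATH means no I/O at all, and a write-only or truncating open never reads the initial content. A standalone sanity check of how common open() modes are classified (the do_* functions mirror the _do_* helpers above ; Linux-only because of O_PATH) :

#ifndef _GNU_SOURCE
#define _GNU_SOURCE // for O_PATH
#endif
#include <fcntl.h>
#include <cassert>
static bool do_stat (int flags) { return flags&O_PATH ; }
static bool do_read (int flags) { return !do_stat(flags) && (flags&O_ACCMODE)!=O_WRONLY && !(flags&O_TRUNC) ; }
static bool do_write(int flags) { return !do_stat(flags) && (flags&O_ACCMODE)!=O_RDONLY ; }
int main() {
	assert(  do_read (O_RDONLY       ) ) ; // plain read : reports a read
	assert(  do_read (O_RDWR         ) ) ; // read-write w/o O_TRUNC : initial content is seen
	assert( !do_read (O_WRONLY       ) ) ; // write-only : per the note above, not reported as a read
	assert( !do_read (O_RDWR|O_TRUNC ) ) ; // truncating open : initial content can never be seen
	assert(  do_write(O_WRONLY       ) ) ; // a write
	assert( !do_write(O_PATH|O_WRONLY) ) ; // O_PATH : no I/O possible, only a stat
	return 0 ;
}
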
24 changes: 10 additions & 14 deletions src/ldump_job.cc
@@ -96,19 +96,15 @@ int main( int argc , char* argv[] ) {
if (argc!=2) exit(Rc::Usage,"usage : ldump_job file") ;
app_init() ;
//
IFStream job_stream{argv[1]} ;
try {
auto report_start = deserialize<JobInfoStart>(job_stream) ;
::cout << "eta : " << report_start.eta <<'\n' ;
::cout << "host : " << SockFd::s_host(report_start.host) <<'\n' ;
print_submit_attrs(report_start.submit_attrs) ;
::cout << "rsrcs :\n" ; _print_map(report_start.rsrcs) ;
print_pre_start (report_start.pre_start ) ;
print_start (report_start.start ) ;
} catch(...) {}
try {
auto report_end = deserialize<JobInfoEnd>(job_stream) ;
print_end(report_end.end) ;
} catch(...) {}
JobInfo job_info { argv[1] } ;
//
::cout << "eta : " << job_info.start.eta <<'\n' ;
::cout << "host : " << SockFd::s_host(job_info.start.host) <<'\n' ;
print_submit_attrs(job_info.start.submit_attrs) ;
::cout << "rsrcs :\n" ; _print_map(job_info.start.rsrcs) ;
print_pre_start (job_info.start.pre_start ) ;
print_start (job_info.start.start ) ;
//
print_end(job_info.end.end) ;
return 0 ;
}
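
The two try blocks reading JobInfoStart then JobInfoEnd are now folded into the JobInfo constructor. Its real definition is not shown on this page ; a plausible sketch, assuming JobInfo simply bundles the two records and tolerates a truncated ancillary file :

struct JobInfo {
	JobInfoStart start ;
	JobInfoEnd   end   ;
	JobInfo() = default ;
	JobInfo(::string const& filename) {
		IFStream is { filename } ;
		try { start = deserialize<JobInfoStart>(is) ; } catch(...) { return ; } // no start record
		try { end   = deserialize<JobInfoEnd  >(is) ; } catch(...) {          } // job may not have ended yet
	}
	void write(::string const& filename) const ; // see the sketch after the backend.cc hunks
} ;
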
41 changes: 19 additions & 22 deletions src/lmakeserver/backend.cc
@@ -336,18 +336,19 @@ namespace Backends {
, .stderr = start_msg_err.second
} ;
trace("early",digest) ;
{ OFStream ofs { dir_guard(jaf) } ;
serialize( ofs , JobInfoStart({
JobInfo ji {
{
.eta = eta
, .submit_attrs = ::move(submit_attrs)
, .rsrcs = rsrcs
, .host = reply.addr
, .pre_start = jrr
, .start = ::move(reply)
, .stderr = start_msg_err.second
}) ) ;
serialize( ofs , JobInfoEnd{ JobRpcReq{JobProc::End,jrr.seq_id,jrr.job,JobDigest(digest)} } ) ;
}
}
, { { JobProc::End , jrr.seq_id , jrr.job , ::copy(digest) } }
} ;
ji.write(jaf) ;
job_exec = { job , reply.addr , file_date(jaf) , New } ; // job starts and ends
//vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
g_engine_queue.emplace( JobProc::Start , ::copy(job_exec) , false/*report_now*/ , ::move(pre_actions.second) , ""s , ::move(jrr.msg ) ) ;
@@ -462,7 +463,6 @@ namespace Backends {
dep.acquire_crc() ;
dd.crc_date(dep) ;
}
for( auto& [tn,td] : jrr.digest.targets ) if (td.extra_tflags[ExtraTflag::Wash]) td.date = je.start_date.d ; // adjust wash date as start_date was not available in job
::string jaf = job->ancillary_file() ;
serialize( OFStream(jaf,::ios::app) , JobInfoEnd{jrr} ) ; // /!\ _s_starting_job ensures ancillary file is written by _s_handle_job_start before we append to it
job->end_exec() ;
@@ -560,25 +560,22 @@ namespace Backends {
status = _s_release_start_entry(it,hbs) ;
trace("handle_job",job,entry,status) ;
}
{ ::string jaf = Job(job)->ancillary_file() ;
JobDigest jd { .status=status } ;
{ Job j { job } ;
JobDigest jd { .status=status } ;
if (status==Status::EarlyLostErr) { // if we do not retry, record run info
JobInfoStart jis {
.eta = eta
, .submit_attrs = submit_attrs
, .rsrcs = rsrcs
, .host = conn.host
, .pre_start { JobProc::None , conn.seq_id , job }
, .start { JobProc::None }
} ;
JobInfoEnd jie {
.end { JobProc::End , conn.seq_id , job , ::copy(jd) , ::copy(lost_report.first) }
JobInfo ji {
{ .eta = eta
, .submit_attrs = submit_attrs
, .rsrcs = rsrcs
, .host = conn.host
, .pre_start { JobProc::None , conn.seq_id , job }
, .start { JobProc::None }
}
, { .end { JobProc::End , conn.seq_id , job , ::copy(jd) , ::copy(lost_report.first) } }
} ;
OFStream os { dir_guard(jaf) } ;
serialize( os , jis ) ;
serialize( os , jie ) ;
j->write_job_info(ji) ;
}
JobExec je { job , file_date(jaf) , New } ; // job starts and ends, no host
JobExec je { j , file_date(j->ancillary_file()) , New } ; // job starts and ends, no host
//vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
g_engine_queue.emplace( JobProc::Start , ::copy(je) , false/*report_now*/ ) ;
g_engine_queue.emplace( JobProc::End , ::move(je) , ::move(rsrcs) , ::move(jd) , ::move(lost_report.first) ) ;
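
ji.write(jaf) and j->write_job_info(ji) replace the back-to-back serialize calls on an OFStream. A plausible sketch of the writer, mirroring what the removed code did (assumed ; the definition is not shown on this page) :

void JobInfo::write(::string const& filename) const {
	OFStream ofs { dir_guard(filename) } ; // create enclosing dirs, as the removed code did
	serialize( ofs , start ) ;             // JobInfoStart record first ...
	serialize( ofs , end   ) ;             // ... then JobInfoEnd, matching the reader order
}
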
70 changes: 32 additions & 38 deletions src/lmakeserver/caches/dir_cache.cc
@@ -272,28 +272,24 @@ namespace Caches {
::vector_s copied ;
Trace trace("DirCache::download",job,id,jn) ;
try {
JobInfoStart report_start ;
JobInfoEnd report_end ;
{ LockedFd lock { dfd , false/*exclusive*/ } ; // because we read the data , shared is ok
IFStream is { to_string(dir,'/',jn,"/data") } ;
deserialize(is,report_start) ;
deserialize(is,report_end ) ;
JobInfo job_info ;
{ LockedFd lock { dfd , false/*exclusive*/ } ; // because we read the data , shared is ok
job_info = { to_string(dir,'/',jn,"/data") } ;
// update some info
report_start.pre_start.job = +job ; // id is not stored in cache
report_start.submit_attrs.reason = reason ;
job_info.start.pre_start.job = +job ; // id is not stored in cache
job_info.start.submit_attrs.reason = reason ;
//
for( NodeIdx ti=0 ; ti<report_end.end.digest.targets.size() ; ti++ ) {
auto& entry = report_end.end.digest.targets[ti] ;
::string const& tn = entry.first ;
for( NodeIdx ti=0 ; ti<job_info.end.end.digest.targets.size() ; ti++ ) {
auto& entry = job_info.end.end.digest.targets[ti] ;
::string const& tn = entry.first ;
copied.push_back(tn) ;
nfs_guard.change(tn) ;
_copy( dfd , to_string(ti) , tn , true/*unlnk_dst*/ , false/*mk_read_only*/ ) ;
entry.second.date = file_date(tn) ; // target date is not stored in cache
}
job_info.end.end.digest.end_date = New ; // date must be after files are copied
copied.push_back(job->ancillary_file()) ;
OFStream os { dir_guard(copied.back()) } ;
serialize(os,report_start) ;
serialize(os,report_end ) ;
job_info.write(dir_guard(copied.back())) ;
}
// ensure we take a single lock at a time to avoid deadlocks
// upload is the only one to take several locks
@@ -302,7 +298,7 @@ namespace Caches {
_lru_first(jn,sz_) ;
trace("done",sz_) ;
}
return report_end.end.digest ;
return job_info.end.end.digest ;
} catch(::string const& e) {
for( ::string const& f : copied ) unlnk(f) ; // clean up partial job
trace("failed") ;
@@ -314,38 +310,34 @@
::string jn = _unique_name(job,repo) ;
Trace trace("DirCache::upload",job,jn) ;
//
JobInfoStart report_start ;
JobInfoEnd report_end ;
try {
IFStream is { job->ancillary_file() } ;
deserialize(is,report_start) ;
deserialize(is,report_end ) ;
} catch (::string const& e) {
trace("no_ancillary_file",e) ;
JobInfo job_info = job->job_info() ;
if (!job_info.end.end.proc) { // we need a full report to cache job
trace("no_ancillary_file") ;
return false/*ok*/ ;
}
// remove useless info
report_start.pre_start.seq_id = 0 ; // no seq_id since no execution
report_start.start .small_id = 0 ; // no small_id since no execution
report_start.pre_start.job = 0 ; // job_id may not be the same in the destination repo
report_start.eta = {} ; // dont care about timing info in cache
report_start.submit_attrs.reason = {} ; // cache does not care about original reason
report_start.rsrcs.clear() ; // caching resources is meaningless as they have no impact on content
for( auto& [tn,td] : report_end.end.digest.targets ) {
SWEAR(!td.polluted) ; // cannot be a candidate for upload as this must have failed
job_info.start.pre_start.seq_id = 0 ; // no seq_id since no execution
job_info.start.start .small_id = 0 ; // no small_id since no execution
job_info.start.pre_start.job = 0 ; // job_id may not be the same in the destination repo
job_info.start.eta = {} ; // dont care about timing info in cache
job_info.start.submit_attrs.reason = {} ; // cache does not care about original reason
job_info.start.rsrcs.clear() ; // caching resources is meaningless as they have no impact on content
for( auto& [tn,td] : job_info.end.end.digest.targets ) {
SWEAR(!td.polluted) ; // cannot be a candidate for upload as this must have failed
td.date.clear() ;
td.extra_tflags = {} ;
}
job_info.end.end.digest.end_date = {} ;
// check deps
for( auto const& [dn,dd] : report_end.end.digest.deps ) if (dd.is_date) return false/*ok*/ ;
for( auto const& [dn,dd] : job_info.end.end.digest.deps ) if (dd.is_date) return false/*ok*/ ;
//
mkdir(dir_fd,jn) ;
AutoCloseFd dfd = open_read(dir_fd,jn) ;
//
// upload is the only one to take several locks and it starts with the global lock
// this way, we are sure to avoid deadlocks
LockedFd lock2{ dir_fd , true/*exclusive*/ } ; // because we manipulate LRU and because we take several locks, need exclusive
LockedFd lock { dfd , true/*exclusive*/ } ; // because we write the data , need exclusive
LockedFd lock2{ dir_fd , true/*exclusive*/ } ; // because we manipulate LRU and because we take several locks, need exclusive
LockedFd lock { dfd , true/*exclusive*/ } ; // because we write the data , need exclusive
//
Sz old_sz = _lru_remove(jn) ;
Sz new_sz = 0 ;
Expand All @@ -356,8 +348,10 @@ namespace Caches {
// store meta-data
::string data_file = to_string(dir,'/',jn,"/data") ;
::string deps_file = to_string(dir,'/',jn,"/deps") ;
{ OFStream os { data_file } ; serialize(os,report_start) ; serialize(os,report_end) ; }
{ OFStream os { deps_file } ; serialize(os,report_end.end.digest.deps) ; } // store deps in a compact format so that matching is fast
//
job_info.write(data_file) ;
serialize(OFStream(deps_file),job_info.end.end.digest.deps) ; // store deps in a compact format so that matching is fast
//
/**/ new_sz += FileInfo(data_file ).sz ;
/**/ new_sz += FileInfo(deps_file ).sz ;
for( auto const& [tn,_] : digest.targets ) new_sz += FileInfo(nfs_guard.access(tn)).sz ;
Expand All @@ -367,8 +361,8 @@ namespace Caches {
_copy( digest.targets[ti].first , dfd , to_string(ti) , false/*unlnk_dst*/ , true/*mk_read_only*/ ) ;
} catch (::string const& e) {
trace("failed",e) ;
unlnk_inside(dfd) ; // clean up in case of partial execution
_mk_room( made_room?new_sz:old_sz , 0 ) ; // finally, we did not populate the entry
unlnk_inside(dfd) ; // clean up in case of partial execution
_mk_room( made_room?new_sz:old_sz , 0 ) ; // finally, we did not populate the entry
return false/*ok*/ ;
}
_lru_first(jn,new_sz) ;
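
The comments above state the deadlock-avoidance rule : download takes a single shared per-entry lock, while upload, the only path needing two locks, always takes the global lock first. Sketched with the types used above (function names are illustrative) :

// any path taking both locks takes dir_fd's lock first, so acquisition order is
// globally consistent and no wait cycle can form
void read_entry( Fd dfd ) {
	LockedFd lock { dfd , false/*exclusive*/ } ;    // single shared lock : no deadlock possible
	// ... read entry data ...
}
void write_entry( Fd dir_fd , Fd dfd ) {
	LockedFd lock2 { dir_fd , true/*exclusive*/ } ; // global lock first (LRU is updated)
	LockedFd lock  { dfd    , true/*exclusive*/ } ; // then the per-entry lock
	// ... update LRU and write entry data ...
}
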