From 2722ac5f5b379e8818547faeef75ba017b3db069 Mon Sep 17 00:00:00 2001 From: Cesar Douady Date: Thu, 25 Apr 2024 14:09:42 +0200 Subject: [PATCH] improve lmake-lmakeserver connection loss management --- src/lmakeserver/backend.cc | 6 +++--- src/lmakeserver/main.cc | 7 ++++--- src/process.hh | 3 +-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/lmakeserver/backend.cc b/src/lmakeserver/backend.cc index 7c8c7183..247dfab1 100644 --- a/src/lmakeserver/backend.cc +++ b/src/lmakeserver/backend.cc @@ -612,9 +612,9 @@ namespace Backends { void Backend::s_config( ::array> const& config , bool dynamic ) { static ::jthread heartbeat_thread { _s_heartbeat_thread_func } ; - static JobThread job_start_thread {'S',_s_handle_job_start ,1000/*backlog*/} ; _s_job_start_thread = &job_start_thread ; - static JobMngtThread job_mngt_thread {'M',_s_handle_job_mngt ,1000/*backlog*/} ; _s_job_mngt_thread = &job_mngt_thread ; - static JobThread job_end_thread {'E',_s_handle_job_end ,1000/*backlog*/} ; _s_job_end_thread = &job_end_thread ; + static JobThread job_start_thread {'S',_s_handle_job_start ,4096/*backlog*/} ; _s_job_start_thread = &job_start_thread ; // 4096 : max usual value as set in ... + static JobMngtThread job_mngt_thread {'M',_s_handle_job_mngt ,4096/*backlog*/} ; _s_job_mngt_thread = &job_mngt_thread ; // ... /proc/sys/net/core/somaxconn + static JobThread job_end_thread {'E',_s_handle_job_end ,4096/*backlog*/} ; _s_job_end_thread = &job_end_thread ; // . static DeferredThread deferred_report_thread{'R',_s_handle_deferred_report } ; _s_deferred_report_thread = &deferred_report_thread ; static DeferredThread deferred_wakeup_thread{'W',_s_handle_deferred_wakeup } ; _s_deferred_wakeup_thread = &deferred_wakeup_thread ; Trace trace(BeChnl,"s_config",STR(dynamic)) ; diff --git a/src/lmakeserver/main.cc b/src/lmakeserver/main.cc index 59342ed3..c030e02b 100644 --- a/src/lmakeserver/main.cc +++ b/src/lmakeserver/main.cc @@ -150,6 +150,7 @@ void reqs_thread_func( ::stop_token stop , Fd in_fd , Fd out_fd ) { for( Epoll::Event event : events ) { EventKind kind = event.data() ; Fd fd = event.fd() ; + trace("event",kind,fd) ; switch (kind) { // it may be that in a single poll, we get the end of a previous run and a request for a new one // problem lies in this sequence : @@ -167,7 +168,6 @@ void reqs_thread_func( ::stop_token stop , Fd in_fd , Fd out_fd ) { trace("stop_requested") ; goto Done ; } - trace("int",kind) ; switch (kind) { case EventKind::Int : { struct signalfd_siginfo event ; ssize_t cnt = ::read(_g_int_fd ,&event,sizeof(event)) ; SWEAR( cnt==sizeof(event) , cnt ) ; } break ; case EventKind::Watch : { struct inotify_event event ; ssize_t cnt = ::read(_g_watch_fd,&event,sizeof(event)) ; SWEAR( cnt==sizeof(event) , cnt ) ; } break ; @@ -186,7 +186,7 @@ void reqs_thread_func( ::stop_token stop , Fd in_fd , Fd out_fd ) { try { if (!in_tab.at(fd).first.receive_step(fd,rrr)) continue ; } catch (...) { rrr.proc = ReqProc::None ; } Fd ofd = kind==EventKind::Std ? out_fd : fd ; - trace("req",kind,fd,rrr) ; + trace("req",rrr) ; switch (rrr.proc) { case ReqProc::Make : { Req r{New} ; @@ -200,7 +200,7 @@ void reqs_thread_func( ::stop_token stop , Fd in_fd , Fd out_fd ) { case ReqProc::Forget : case ReqProc::Mark : case ReqProc::Show : - epoll.del(fd) ; trace("stop_fd",rrr.proc,fd) ; // must precede close(fd) which may occur as soon as we push to g_engine_queue + epoll.del(fd) ; trace("del_fd",rrr.proc,fd) ; // must precede close(fd) which may occur as soon as we push to g_engine_queue in_tab.erase(fd) ; //vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv g_engine_queue.emplace( rrr.proc , fd , ofd , rrr.files , rrr.options ) ; @@ -424,6 +424,7 @@ int main( int argc , char** argv ) { else g_startup_dir_s = new ::string ; // _g_int_fd = open_sig_fd({SIGINT,SIGHUP}) ; // must be done before app_init so that all threads block the signal + set_sig({SIGPIPE},true/*block*/) ; // vvvvvvvvvvvvvvv Persistent::writable = true ; Codec ::writable = true ; diff --git a/src/process.hh b/src/process.hh index d1ebf58f..f1a2ca19 100644 --- a/src/process.hh +++ b/src/process.hh @@ -51,8 +51,7 @@ inline bool/*all_done*/ set_sig( ::vector const& sigs , bool block ) { inline Fd open_sig_fd(::vector const& sigs) { swear_prod(set_sig(sigs,true/*block*/),"some of signals",sigs,"are already blocked") ; // - sigset_t mask ; - ::sigemptyset(&mask) ; + sigset_t mask ; ::sigemptyset(&mask) ; for( int s : sigs) ::sigaddset(&mask,s) ; // return ::signalfd( -1 , &mask , SFD_CLOEXEC ) ;