From 2754a0e1fe42376ea9ca161da7eb110677cb16bd Mon Sep 17 00:00:00 2001 From: uralm1 Date: Thu, 17 Jun 2021 22:34:00 +0500 Subject: [PATCH] Use named locks to block concurrent jobs --- CHANGELOG.md | 2 ++ lib/Adup.pm | 6 +++++- lib/Adup/Task/Merge.pm | 25 ++++++++++++++++--------- lib/Adup/Task/Preprocess.pm | 31 +++++++++++++++++++------------ lib/Adup/Task/SmbLoad.pm | 6 ++++++ lib/Adup/Task/Sync.pm | 30 +++++++++++++++++++----------- lib/Adup/Task/Zupprocess.pm | 13 +++++++------ 7 files changed, 74 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b2587a..e908a92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +- Взаимоблокировка заданий через named guard locks. +- Исправлен баг с тем, что запуск заданий не всегда сразу отображается в gui, прогрессбар отображается после перезагрузки страницы. ## [1.18] - 2021-06-17 - Синхронизация с 1С ЗУП. diff --git a/lib/Adup.pm b/lib/Adup.pm index 0e9fb91..33c6feb 100644 --- a/lib/Adup.pm +++ b/lib/Adup.pm @@ -6,9 +6,10 @@ use Adup::Command::sync; use Adup::Command::merge; use Adup::Command::resettasks; use Adup::Command::smbload; +use Adup::Command::zupload; use Adup::Command::cron; -our $VERSION = '1.18'; +our $VERSION = '1.19'; # This method will run once at server start sub startup { @@ -52,6 +53,9 @@ sub startup { # update database $self->migrate_database; + # Reset locks + $self->minion->on(worker => sub { shift->reset({locks=>1}) }); + # Router authentication routine $self->hook(before_dispatch => sub { my $c = shift; diff --git a/lib/Adup/Task/Merge.pm b/lib/Adup/Task/Merge.pm index 2a1e662..f16d952 100644 --- a/lib/Adup/Task/Merge.pm +++ b/lib/Adup/Task/Merge.pm @@ -38,25 +38,32 @@ sub register { # internal sub _merge { my ($job, $remote_user) = @_; + my $app = $job->app; - $job->app->log->info("Start merge $$: ".$job->id); - my $db_adup = $job->app->mysql_adup->db; + my $guard = $job->minion->guard('sync_job_guard', 3600); + unless ($guard) { + $app->log->error("Exited merge $$: ".$job->id.'. Other concurrent job is active.'); + return $job->finish('Other concurrent job is active'); + } + + $app->log->info("Start merge $$: ".$job->id); + my $db_adup = $app->mysql_adup->db; my $log = Adup::Ural::Dblog->new($db_adup, login=>$remote_user, state=>90); - my $ldap = Net::LDAP->new($job->app->config->{ldap_servers}, port => 389, timeout => 10, version => 3); + my $ldap = Net::LDAP->new($app->config->{ldap_servers}, port => 389, timeout => 10, version => 3); unless ($ldap) { $log->l(state=>91, info=>"Произошла ошибка подключения к глобальному каталогу"); return $job->fail("LDAP creation error $@"); } - my $mesg = $ldap->bind($job->app->config->{ldap_user}, password => $job->app->config->{ldap_pass}); + my $mesg = $ldap->bind($app->config->{ldap_user}, password => $app->config->{ldap_pass}); if ($mesg->code) { $log->l(state=>91, info=>"Произошла ошибка авторизации при подключении к глобальному каталогу"); return $job->fail("LDAP bind error ".$mesg->error); } - $job->app->set_task_state($db_adup, $TASK_ID, $job->id); + $app->set_task_state($db_adup, $TASK_ID, $job->id); # merging changes for types ... in sequence # see type_robotic field in change objects @@ -100,7 +107,7 @@ sub _merge { ORDER BY $m_order_tmpl id ASC", $mt); }; unless (defined $e) { - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); return $job->fail('Merge - database fatal error'); } my $changes_count = 0; @@ -111,7 +118,7 @@ sub _merge { while (my $next = $res->hash) { my $c = Adup::Ural::ChangeFactory->fromdb(id=>$next->{id}, json=>$next->{c}); if ($c->approved) { - if ($c->merge(author=>$remote_user, db=>$db_adup, ldap=>$ldap, config=>$job->app->config, log=>$log)) { + if ($c->merge(author=>$remote_user, db=>$db_adup, ldap=>$ldap, config=>$app->config, log=>$log)) { $changes_processed_count++; } else { $log->l(state=>91, info=>"Изменение-$seq_el->{desc} не применено. Возникла ошибка при применении изменения-$seq_el->{desc}: $next->{name}."); @@ -146,10 +153,10 @@ sub _merge { $log->l(info => 'Отчёт о применении изменений. '.$log_buf) if $log_buf; - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); $ldap->unbind; - $job->app->log->info("Finish $$: ".$job->id); + $app->log->info("Finish $$: ".$job->id); $job->finish; } diff --git a/lib/Adup/Task/Preprocess.pm b/lib/Adup/Task/Preprocess.pm index d93ff99..ab789f8 100644 --- a/lib/Adup/Task/Preprocess.pm +++ b/lib/Adup/Task/Preprocess.pm @@ -26,18 +26,25 @@ sub register { # internal sub _process_dbf { my ($job, $remote_user) = @_; + my $app = $job->app; - $job->app->log->info("Start preprocess $$: ".$job->id); - my $db_adup = $job->app->mysql_adup->db; + my $guard = $job->minion->guard('upload_job_guard', 3600); + unless ($guard) { + $app->log->error("Exited preprocess $$: ".$job->id.'. Other concurrent job is active.'); + return $job->finish('Other concurrent job is active'); + } + + $app->log->info("Start preprocess $$: ".$job->id); + my $db_adup = $app->mysql_adup->db; my $log = Adup::Ural::Dblog->new($db_adup, login=>$remote_user, state=>0); - $job->app->set_task_state($db_adup, $TASK_ID, $job->id); + $app->set_task_state($db_adup, $TASK_ID, $job->id); - my $dbf = eval { new XBase($job->app->config('galdb_temporary_file')) }; + my $dbf = eval { new XBase($app->config('galdb_temporary_file')) }; unless (defined $dbf) { $log->l(state => 1, info => "Произошла ошибка обработки файла выгрузки"); - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); return $job->fail('XBase object creation failed'); } @@ -47,7 +54,7 @@ sub _process_dbf { $db_adup->query("DELETE FROM flatdepts"); }; unless (defined $e) { - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); return $job->fail('Tables cleanup error'); } @@ -133,7 +140,7 @@ sub _process_dbf { }; unless (defined $e) { $log->l(state => 1, info => "Произошла ошибка записи таблицы persons, $loaded_cnt сотрудников обработано"); - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); return $job->fail('Mysql insert to table persons error'); } $loaded_cnt++; @@ -161,7 +168,7 @@ sub _process_dbf { }; unless (defined $e) { $log->l(state => 1, info => "Произошла ошибка обновления дубликатов в таблице persons, $loaded_cnt сотрудников обработано"); - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); return $job->fail('Mysql update dublicates in table persons error'); } } @@ -192,7 +199,7 @@ sub _process_dbf { }; unless (defined $e) { $log->l(state => 1, info => "Произошла ошибка записи таблицы подразделений"); - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); return $job->fail('Mysql insert to table depts error'); } $dept_loaded_cnt++; @@ -219,7 +226,7 @@ sub _process_dbf { }; unless (defined $e) { $log->l(state => 1, info => "Произошла ошибка записи подразделений в плоском формате"); - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); return $job->fail('Mysql insert to table flatdepts error'); } $flatdept_loaded_cnt++; @@ -230,9 +237,9 @@ sub _process_dbf { $log->l(info => "Загружен шаблон ИС \"Галактика\" с информацией по $loaded_cnt сотрудникам и выполнен разбор оргструктуры по $dept_loaded_cnt/$flatdept_loaded_cnt подразделениям"); - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); - $job->app->log->info("Finish $$: ".$job->id); + $app->log->info("Finish $$: ".$job->id); $job->finish; } diff --git a/lib/Adup/Task/SmbLoad.pm b/lib/Adup/Task/SmbLoad.pm index 3088ad0..f12d52a 100644 --- a/lib/Adup/Task/SmbLoad.pm +++ b/lib/Adup/Task/SmbLoad.pm @@ -15,6 +15,12 @@ sub _smbload { my $job = shift; my $app = $job->app; + my $guard = $job->minion->guard('upload_job_guard', 3600); + unless ($guard) { + $app->log->error("Exited smbload $$: ".$job->id.'. Other concurrent job is active.'); + return $job->finish('Other concurrent job is active'); + } + $app->log->info("Start smbload $$: ".$job->id); # download file via smbclient diff --git a/lib/Adup/Task/Sync.pm b/lib/Adup/Task/Sync.pm index 8911786..5363772 100644 --- a/lib/Adup/Task/Sync.pm +++ b/lib/Adup/Task/Sync.pm @@ -46,31 +46,39 @@ sub register { # internal sub _sync { my ($job, $remote_user) = @_; + my $app = $job->app; - $job->app->log->info("Start sync $$: ".$job->id); - my $db_adup = $job->app->mysql_adup->db; + my $guard = $job->minion->guard('upload_job_guard', 3600); + my $guard1 = $job->minion->guard('sync_job_guard', 3600); + unless ($guard && $guard1) { + $app->log->error("Exited sync $$: ".$job->id.'. Other concurrent job is active.'); + return $job->finish('Other concurrent job is active'); + } + + $app->log->info("Start sync $$: ".$job->id); + my $db_adup = $app->mysql_adup->db; my $log = Adup::Ural::Dblog->new($db_adup, login=>$remote_user, state=>10); - my $ldap = Net::LDAP->new($job->app->config->{ldap_servers}, port => 389, timeout => 10, version => 3); + my $ldap = Net::LDAP->new($app->config->{ldap_servers}, port => 389, timeout => 10, version => 3); unless ($ldap) { $log->l(state=>11, info=>"Произошла ошибка подключения к глобальному каталогу"); return $job->fail("LDAP creation error $@"); } - my $mesg = $ldap->bind($job->app->config->{ldap_user}, password => $job->app->config->{ldap_pass}); + my $mesg = $ldap->bind($app->config->{ldap_user}, password => $app->config->{ldap_pass}); if ($mesg->code) { $log->l(state=>11, info=>"Произошла ошибка авторизации при подключении к глобальному каталогу"); return $job->fail("LDAP bind error ".$mesg->error); } - $job->app->set_task_state($db_adup, $TASK_ID, $job->id); + $app->set_task_state($db_adup, $TASK_ID, $job->id); my $e = eval { $db_adup->query("DELETE FROM changes"); }; unless (defined $e) { - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); $ldap->unbind; return $job->fail('Changes table cleanup error'); } @@ -87,7 +95,7 @@ sub _sync { pos => $idx ); unless (defined $c) { - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); $ldap->unbind; return $job->fail("$seq_el->{name} fatal error"); } @@ -98,9 +106,9 @@ sub _sync { $check ||= $SYNC_SEQUENCE[$_]->{_result} > 0 for @{$seq_el->{pre_stop}}; if ($check) { $log->l(info => $seq_el->{pre_stop_msg}); - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); $ldap->unbind; - $job->app->log->info("Pre-finish $$: ".$job->id); + $app->log->info("Pre-finish $$: ".$job->id); return $job->finish; } } @@ -109,10 +117,10 @@ sub _sync { } # done - $job->app->reset_task_state($db_adup, $TASK_ID); + $app->reset_task_state($db_adup, $TASK_ID); $ldap->unbind; - $job->app->log->info("Finish $$: ".$job->id); + $app->log->info("Finish $$: ".$job->id); $job->finish; } diff --git a/lib/Adup/Task/Zupprocess.pm b/lib/Adup/Task/Zupprocess.pm index 45922a5..ddd7483 100644 --- a/lib/Adup/Task/Zupprocess.pm +++ b/lib/Adup/Task/Zupprocess.pm @@ -25,6 +25,12 @@ sub _load_zup { my ($job, $remote_user) = @_; my $app = $job->app; + my $guard = $job->minion->guard('upload_job_guard', 3600); + unless ($guard) { + $app->log->error("Exited zupprocess $$: ".$job->id.'. Other concurrent job is active.'); + return $job->finish('Other concurrent job is active'); + } + $app->log->info("Start zupprocess $$: ".$job->id); my $db_adup = $app->mysql_adup->db; @@ -33,7 +39,7 @@ sub _load_zup { $app->set_task_state($db_adup, $TASK_ID, $job->id); my $loader = eval { - Adup::Ural::ZupLoader->new($job->app, $db_adup, + Adup::Ural::ZupLoader->new($app, $db_adup, sub { $job->note(progress => shift, info => shift) } ) }; @@ -43,11 +49,6 @@ sub _load_zup { return $job->finish('Failed: bad server url'); } - #$job->note( - # progress => 0, - # info => "0% Соединение с сервером..." - #); - my @load_results; eval { $loader->upload_data;