Skip to content

Commit

Permalink
Use named locks to block concurrent jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
uralm1 committed Jun 17, 2021
1 parent b8522a4 commit 2754a0e
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
All notable changes to this project will be documented in this file.

## [Unreleased]
- Взаимоблокировка заданий через named guard locks.
- Исправлен баг с тем, что запуск заданий не всегда сразу отображается в gui, прогрессбар отображается после перезагрузки страницы.

## [1.18] - 2021-06-17
- Синхронизация с 1С ЗУП.
Expand Down
6 changes: 5 additions & 1 deletion lib/Adup.pm
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use Adup::Command::sync;
use Adup::Command::merge;
use Adup::Command::resettasks;
use Adup::Command::smbload;
use Adup::Command::zupload;
use Adup::Command::cron;

our $VERSION = '1.18';
our $VERSION = '1.19';

# This method will run once at server start
sub startup {
Expand Down Expand Up @@ -52,6 +53,9 @@ sub startup {
# update database
$self->migrate_database;

# Reset locks
$self->minion->on(worker => sub { shift->reset({locks=>1}) });

# Router authentication routine
$self->hook(before_dispatch => sub {
my $c = shift;
Expand Down
25 changes: 16 additions & 9 deletions lib/Adup/Task/Merge.pm
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,32 @@ sub register {
# internal
sub _merge {
my ($job, $remote_user) = @_;
my $app = $job->app;

$job->app->log->info("Start merge $$: ".$job->id);
my $db_adup = $job->app->mysql_adup->db;
my $guard = $job->minion->guard('sync_job_guard', 3600);
unless ($guard) {
$app->log->error("Exited merge $$: ".$job->id.'. Other concurrent job is active.');
return $job->finish('Other concurrent job is active');
}

$app->log->info("Start merge $$: ".$job->id);
my $db_adup = $app->mysql_adup->db;

my $log = Adup::Ural::Dblog->new($db_adup, login=>$remote_user, state=>90);

my $ldap = Net::LDAP->new($job->app->config->{ldap_servers}, port => 389, timeout => 10, version => 3);
my $ldap = Net::LDAP->new($app->config->{ldap_servers}, port => 389, timeout => 10, version => 3);
unless ($ldap) {
$log->l(state=>91, info=>"Произошла ошибка подключения к глобальному каталогу");
return $job->fail("LDAP creation error $@");
}

my $mesg = $ldap->bind($job->app->config->{ldap_user}, password => $job->app->config->{ldap_pass});
my $mesg = $ldap->bind($app->config->{ldap_user}, password => $app->config->{ldap_pass});
if ($mesg->code) {
$log->l(state=>91, info=>"Произошла ошибка авторизации при подключении к глобальному каталогу");
return $job->fail("LDAP bind error ".$mesg->error);
}

$job->app->set_task_state($db_adup, $TASK_ID, $job->id);
$app->set_task_state($db_adup, $TASK_ID, $job->id);

# merging changes for types ... in sequence
# see type_robotic field in change objects
Expand Down Expand Up @@ -100,7 +107,7 @@ sub _merge {
ORDER BY $m_order_tmpl id ASC", $mt);
};
unless (defined $e) {
$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Merge - database fatal error');
}
my $changes_count = 0;
Expand All @@ -111,7 +118,7 @@ sub _merge {
while (my $next = $res->hash) {
my $c = Adup::Ural::ChangeFactory->fromdb(id=>$next->{id}, json=>$next->{c});
if ($c->approved) {
if ($c->merge(author=>$remote_user, db=>$db_adup, ldap=>$ldap, config=>$job->app->config, log=>$log)) {
if ($c->merge(author=>$remote_user, db=>$db_adup, ldap=>$ldap, config=>$app->config, log=>$log)) {
$changes_processed_count++;
} else {
$log->l(state=>91, info=>"Изменение-$seq_el->{desc} не применено. Возникла ошибка при применении изменения-$seq_el->{desc}: $next->{name}.");
Expand Down Expand Up @@ -146,10 +153,10 @@ sub _merge {

$log->l(info => 'Отчёт о применении изменений. '.$log_buf) if $log_buf;

$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
$ldap->unbind;

$job->app->log->info("Finish $$: ".$job->id);
$app->log->info("Finish $$: ".$job->id);
$job->finish;
}

Expand Down
31 changes: 19 additions & 12 deletions lib/Adup/Task/Preprocess.pm
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,25 @@ sub register {
# internal
sub _process_dbf {
my ($job, $remote_user) = @_;
my $app = $job->app;

$job->app->log->info("Start preprocess $$: ".$job->id);
my $db_adup = $job->app->mysql_adup->db;
my $guard = $job->minion->guard('upload_job_guard', 3600);
unless ($guard) {
$app->log->error("Exited preprocess $$: ".$job->id.'. Other concurrent job is active.');
return $job->finish('Other concurrent job is active');
}

$app->log->info("Start preprocess $$: ".$job->id);
my $db_adup = $app->mysql_adup->db;

my $log = Adup::Ural::Dblog->new($db_adup, login=>$remote_user, state=>0);

$job->app->set_task_state($db_adup, $TASK_ID, $job->id);
$app->set_task_state($db_adup, $TASK_ID, $job->id);

my $dbf = eval { new XBase($job->app->config('galdb_temporary_file')) };
my $dbf = eval { new XBase($app->config('galdb_temporary_file')) };
unless (defined $dbf) {
$log->l(state => 1, info => "Произошла ошибка обработки файла выгрузки");
$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('XBase object creation failed');
}

Expand All @@ -47,7 +54,7 @@ sub _process_dbf {
$db_adup->query("DELETE FROM flatdepts");
};
unless (defined $e) {
$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Tables cleanup error');
}

Expand Down Expand Up @@ -133,7 +140,7 @@ sub _process_dbf {
};
unless (defined $e) {
$log->l(state => 1, info => "Произошла ошибка записи таблицы persons, $loaded_cnt сотрудников обработано");
$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Mysql insert to table persons error');
}
$loaded_cnt++;
Expand Down Expand Up @@ -161,7 +168,7 @@ sub _process_dbf {
};
unless (defined $e) {
$log->l(state => 1, info => "Произошла ошибка обновления дубликатов в таблице persons, $loaded_cnt сотрудников обработано");
$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Mysql update dublicates in table persons error');
}
}
Expand Down Expand Up @@ -192,7 +199,7 @@ sub _process_dbf {
};
unless (defined $e) {
$log->l(state => 1, info => "Произошла ошибка записи таблицы подразделений");
$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Mysql insert to table depts error');
}
$dept_loaded_cnt++;
Expand All @@ -219,7 +226,7 @@ sub _process_dbf {
};
unless (defined $e) {
$log->l(state => 1, info => "Произошла ошибка записи подразделений в плоском формате");
$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Mysql insert to table flatdepts error');
}
$flatdept_loaded_cnt++;
Expand All @@ -230,9 +237,9 @@ sub _process_dbf {

$log->l(info => "Загружен шаблон ИС \"Галактика\" с информацией по $loaded_cnt сотрудникам и выполнен разбор оргструктуры по $dept_loaded_cnt/$flatdept_loaded_cnt подразделениям");

$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);

$job->app->log->info("Finish $$: ".$job->id);
$app->log->info("Finish $$: ".$job->id);
$job->finish;
}

Expand Down
6 changes: 6 additions & 0 deletions lib/Adup/Task/SmbLoad.pm
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ sub _smbload {
my $job = shift;
my $app = $job->app;

my $guard = $job->minion->guard('upload_job_guard', 3600);
unless ($guard) {
$app->log->error("Exited smbload $$: ".$job->id.'. Other concurrent job is active.');
return $job->finish('Other concurrent job is active');
}

$app->log->info("Start smbload $$: ".$job->id);

# download file via smbclient
Expand Down
30 changes: 19 additions & 11 deletions lib/Adup/Task/Sync.pm
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,39 @@ sub register {
# internal
sub _sync {
my ($job, $remote_user) = @_;
my $app = $job->app;

$job->app->log->info("Start sync $$: ".$job->id);
my $db_adup = $job->app->mysql_adup->db;
my $guard = $job->minion->guard('upload_job_guard', 3600);
my $guard1 = $job->minion->guard('sync_job_guard', 3600);
unless ($guard && $guard1) {
$app->log->error("Exited sync $$: ".$job->id.'. Other concurrent job is active.');
return $job->finish('Other concurrent job is active');
}

$app->log->info("Start sync $$: ".$job->id);
my $db_adup = $app->mysql_adup->db;

my $log = Adup::Ural::Dblog->new($db_adup, login=>$remote_user, state=>10);

my $ldap = Net::LDAP->new($job->app->config->{ldap_servers}, port => 389, timeout => 10, version => 3);
my $ldap = Net::LDAP->new($app->config->{ldap_servers}, port => 389, timeout => 10, version => 3);
unless ($ldap) {
$log->l(state=>11, info=>"Произошла ошибка подключения к глобальному каталогу");
return $job->fail("LDAP creation error $@");
}

my $mesg = $ldap->bind($job->app->config->{ldap_user}, password => $job->app->config->{ldap_pass});
my $mesg = $ldap->bind($app->config->{ldap_user}, password => $app->config->{ldap_pass});
if ($mesg->code) {
$log->l(state=>11, info=>"Произошла ошибка авторизации при подключении к глобальному каталогу");
return $job->fail("LDAP bind error ".$mesg->error);
}

$job->app->set_task_state($db_adup, $TASK_ID, $job->id);
$app->set_task_state($db_adup, $TASK_ID, $job->id);

my $e = eval {
$db_adup->query("DELETE FROM changes");
};
unless (defined $e) {
$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
$ldap->unbind;
return $job->fail('Changes table cleanup error');
}
Expand All @@ -87,7 +95,7 @@ sub _sync {
pos => $idx
);
unless (defined $c) {
$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
$ldap->unbind;
return $job->fail("$seq_el->{name} fatal error");
}
Expand All @@ -98,9 +106,9 @@ sub _sync {
$check ||= $SYNC_SEQUENCE[$_]->{_result} > 0 for @{$seq_el->{pre_stop}};
if ($check) {
$log->l(info => $seq_el->{pre_stop_msg});
$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
$ldap->unbind;
$job->app->log->info("Pre-finish $$: ".$job->id);
$app->log->info("Pre-finish $$: ".$job->id);
return $job->finish;
}
}
Expand All @@ -109,10 +117,10 @@ sub _sync {
}
# done

$job->app->reset_task_state($db_adup, $TASK_ID);
$app->reset_task_state($db_adup, $TASK_ID);
$ldap->unbind;

$job->app->log->info("Finish $$: ".$job->id);
$app->log->info("Finish $$: ".$job->id);
$job->finish;
}

Expand Down
13 changes: 7 additions & 6 deletions lib/Adup/Task/Zupprocess.pm
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ sub _load_zup {
my ($job, $remote_user) = @_;
my $app = $job->app;

my $guard = $job->minion->guard('upload_job_guard', 3600);
unless ($guard) {
$app->log->error("Exited zupprocess $$: ".$job->id.'. Other concurrent job is active.');
return $job->finish('Other concurrent job is active');
}

$app->log->info("Start zupprocess $$: ".$job->id);
my $db_adup = $app->mysql_adup->db;

Expand All @@ -33,7 +39,7 @@ sub _load_zup {
$app->set_task_state($db_adup, $TASK_ID, $job->id);

my $loader = eval {
Adup::Ural::ZupLoader->new($job->app, $db_adup,
Adup::Ural::ZupLoader->new($app, $db_adup,
sub { $job->note(progress => shift, info => shift) }
)
};
Expand All @@ -43,11 +49,6 @@ sub _load_zup {
return $job->finish('Failed: bad server url');
}

#$job->note(
# progress => 0,
# info => "0% Соединение с сервером..."
#);

my @load_results;
eval {
$loader->upload_data;
Expand Down

0 comments on commit 2754a0e

Please sign in to comment.