Skip to content

Commit

Permalink
Use minion jobs api, state table dropped
Browse files Browse the repository at this point in the history
  • Loading branch information
uralm1 committed Jun 18, 2021
1 parent 3824fb6 commit df1f86e
Show file tree
Hide file tree
Showing 16 changed files with 45 additions and 102 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
All notable changes to this project will be documented in this file.

## [Unreleased]

## [1.19] - 2021-06-18
- Взаимоблокировка заданий через named guard locks.
- Рефакторинг управления заданиями.
- Рефакторинг управления заданиями. Больше не используется таблица states.
- Исправлен баг с тем, что запуск заданий не всегда сразу отображается в gui, прогрессбар отображается после перезагрузки страницы.

## [1.18] - 2021-06-17
Expand Down
1 change: 0 additions & 1 deletion MANIFEST
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ lib/Adup.pm
lib/Adup/Command/cron.pm
lib/Adup/Command/merge.pm
lib/Adup/Command/preprocess.pm
lib/Adup/Command/resettasks.pm
lib/Adup/Command/smbload.pm
lib/Adup/Command/sync.pm
lib/Adup/Command/zupload.pm
Expand Down
1 change: 0 additions & 1 deletion lib/Adup.pm
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use Mojo::Base 'Mojolicious';
use Adup::Command::preprocess;
use Adup::Command::sync;
use Adup::Command::merge;
use Adup::Command::resettasks;
use Adup::Command::smbload;
use Adup::Command::zupload;
use Adup::Command::cron;
Expand Down
21 changes: 0 additions & 21 deletions lib/Adup/Command/resettasks.pm

This file was deleted.

6 changes: 3 additions & 3 deletions lib/Adup/Controller/Sync.pm
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ sub index {
return unless $self->exists_and_number($log_active_page);

# check sync task is running
my $sync_task_id = $self->check_task_in_progress('sync_id', 'stid');
my $sync_task_id = $self->check_task_in_progress('sync', 'stid');
# check merge task is running
my $merge_task_id = $self->check_task_in_progress('merge_id', 'mtid');
my $merge_task_id = $self->check_task_in_progress('merge', 'mtid');

my $db = $self->mysql_adup->db;
# get last upload
Expand Down Expand Up @@ -97,7 +97,7 @@ sub check {
return undef unless $self->authorize({admin=>1});

# check sync task progress
my $task_id = $self->db_task_id('sync_id');
my $task_id = $self->task_id('sync');
my $progress = 0;
my $info = '';
if ($task_id == 0) {
Expand Down
2 changes: 1 addition & 1 deletion lib/Adup/Controller/Syncmerge.pm
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ sub check {
return undef unless $self->authorize({admin=>1});

# check sync task progress
my $task_id = $self->db_task_id('merge_id');
my $task_id = $self->task_id('merge');
my $progress = 0;
my $info = '';
if ($task_id == 0) {
Expand Down
4 changes: 2 additions & 2 deletions lib/Adup/Controller/Upload.pm
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ sub index {
return unless $self->exists_and_number($log_active_page);

# check dbf processing in progress
my $upload_task_id = $self->check_task_in_progress('preprocess_id', 'utid');
my $upload_task_id = $self->check_task_in_progress('preprocess', 'utid');

# paginated log
my $lines_on_page = $self->config('log_lines_on_page');
Expand Down Expand Up @@ -113,7 +113,7 @@ sub check {
return undef unless $self->authorize({admin=>1, gala=>1});

# check dbf processing in progress
my $task_id = $self->db_task_id('preprocess_id');
my $task_id = $self->task_id('preprocess');
my $progress = 0;
my $info = '';
if ($task_id == 0) {
Expand Down
4 changes: 2 additions & 2 deletions lib/Adup/Controller/Zupload.pm
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ sub index {
return unless $self->exists_and_number($log_active_page);

# check zup loading in progress
my $zupprocess_task_id = $self->check_task_in_progress('zupprocess_id', 'ztid');
my $zupprocess_task_id = $self->check_task_in_progress('zupprocess', 'ztid');

# paginated log
my $lines_on_page = $self->config('log_lines_on_page');
Expand Down Expand Up @@ -74,7 +74,7 @@ sub check {
return undef unless $self->authorize({admin=>1, zup1c=>1});

# check zup loading in progress
my $task_id = $self->db_task_id('zupprocess_id');
my $task_id = $self->task_id('zupprocess');
my $progress = 0;
my $info = '';
if ($task_id == 0) {
Expand Down
16 changes: 16 additions & 0 deletions lib/Adup/Plugin/Migrations.pm
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,19 @@ ALTER TABLE `persons` CHANGE `gal_id` `gal_id` VARCHAR(36) NOT NULL;
-- 5 down
ALTER TABLE `persons` CHANGE `gal_id` `gal_id` VARCHAR(20) NOT NULL;
-- 6 up
DROP TABLE IF EXISTS `state`;
-- 6 down
CREATE TABLE IF NOT EXISTS `state` (
`key` varchar(20) NOT NULL,
`value` int(11) NOT NULL,
PRIMARY KEY (`key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `state` (`key`, `value`) VALUES
('merge_id', 0),
('preprocess_id', 0),
('zupprocess_id', 0),
('sync_id', 0);
54 changes: 16 additions & 38 deletions lib/Adup/Plugin/Utils.pm
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,18 @@ sub register {
});


# my $task_id = $self->db_task_id('preprocess_id')
$app->helper(db_task_id => sub {
my ($self, $k) = @_;
my $task_id = 0;
eval {
my $r = $self->mysql_adup->db->query("SELECT value FROM state WHERE `key` = ?", $k);
$task_id = $r->array->[0];
$r->finish;
};
return $task_id;
# my $job_id = $self->task_id('preprocess')
$app->helper(task_id => sub {
my ($self, $task) = @_;
my $j = $self->minion->jobs(
{ tasks => [$task],
states => ['inactive', 'active']
}
);
return ($j->total > 0) ? $j->next->{id} : 0;
});


# 1/undef = $self->can_start_task()
# 1/undef = $self->can_start_task(sub { say $_ })
$app->helper(can_start_task => sub {
Expand All @@ -89,7 +89,8 @@ sub register {
my $j = $self->minion->jobs(
{ tasks => ['preprocess', 'zupprocess', 'sync', 'merge'],
states => ['inactive', 'active']
});
}
);
if ($j->total > 0) {
$_ = $j->next->{task};
$reporting_cb->($_) if $reporting_cb;
Expand All @@ -100,50 +101,27 @@ sub register {
});


# my $task_id = $self->check_task_in_progress('preprocess_id', 'utid')
# my $task_id = $self->check_task_in_progress('preprocess', 'utid')
$app->helper(check_task_in_progress => sub {
my ($self, $k, $sesk) = @_;
my $task_id = $self->db_task_id($k);
my ($self, $task, $sesk) = @_;
my $task_id = $self->task_id($task);
my $s_id = $self->session($sesk);
if ($task_id == 0) {
unless (defined $s_id) {
$task_id = undef; # start
} else {
$task_id = $s_id if $s_id > 0; # wait after task start
# finished
}
} else {
# task_id > 0, we should always wait
if (defined $s_id and $s_id == 0) {
$self->session($sesk => $task_id); #some erratic behavior
$self->session($sesk => $task_id); # some erratic behavior
}
}
return $task_id;
});


# $job->app->set_task_state($db, 'preprocess_id', 0)
$app->helper(set_task_state => sub {
my ($self, $db, $key, $s) = @_;

my $e = eval {
$db->query("UPDATE state SET value = ? WHERE `key` = ?", $s, $key);
};
unless (defined $e) {
# we have to fix croak here
carp "Set task state failed";
return undef;
}
1;
});

# $job->app->reset_task_state($db, 'preprocess_id')
$app->helper(reset_task_state => sub {
my ($self, $db, $key) = @_;
$self->set_task_state($db, $key, 0);
});


# html_or_undef = check_newversion
$app->helper(check_newversion => sub {
my $c = shift;
Expand Down
5 changes: 0 additions & 5 deletions lib/Adup/Task/Merge.pm
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use Adup::Ural::ChangeFlatGroupModify;
use Adup::Ural::ChangeError;
use Adup::Ural::Dblog;

my $TASK_ID = 'merge_id';
# $TASK_LOG_STATE_SUCCESS = 90;
# $TASK_LOG_STATE_ERROR = 91;

Expand Down Expand Up @@ -63,8 +62,6 @@ sub _merge {
return $job->fail("LDAP bind error ".$mesg->error);
}

$app->set_task_state($db_adup, $TASK_ID, $job->id);

# merging changes for types ... in sequence
# see type_robotic field in change objects
my @merge_sequence = (
Expand Down Expand Up @@ -107,7 +104,6 @@ sub _merge {
ORDER BY $m_order_tmpl id ASC", $mt);
};
unless (defined $e) {
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Merge - database fatal error');
}
my $changes_count = 0;
Expand Down Expand Up @@ -153,7 +149,6 @@ sub _merge {

$log->l(info => 'Отчёт о применении изменений. '.$log_buf) if $log_buf;

$app->reset_task_state($db_adup, $TASK_ID);
$ldap->unbind;

$app->log->info("Finish $$: ".$job->id);
Expand Down
11 changes: 0 additions & 11 deletions lib/Adup/Task/Preprocess.pm
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use Digest::SHA qw(sha1_hex);
use Adup::Ural::Dblog;
use Adup::Ural::FlatGroupNamingAI qw(flatgroup_ai);

my $TASK_ID = 'preprocess_id';
# $TASK_LOG_STATE_SUCCESS = 0;
# $TASK_LOG_STATE_ERROR = 1;

Expand All @@ -39,12 +38,9 @@ sub _process_dbf {

my $log = Adup::Ural::Dblog->new($db_adup, login=>$remote_user, state=>0);

$app->set_task_state($db_adup, $TASK_ID, $job->id);

my $dbf = eval { new XBase($app->config('galdb_temporary_file')) };
unless (defined $dbf) {
$log->l(state => 1, info => "Произошла ошибка обработки файла выгрузки");
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('XBase object creation failed');
}

Expand All @@ -54,7 +50,6 @@ sub _process_dbf {
$db_adup->query("DELETE FROM flatdepts");
};
unless (defined $e) {
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Tables cleanup error');
}

Expand Down Expand Up @@ -140,7 +135,6 @@ sub _process_dbf {
};
unless (defined $e) {
$log->l(state => 1, info => "Произошла ошибка записи таблицы persons, $loaded_cnt сотрудников обработано");
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Mysql insert to table persons error');
}
$loaded_cnt++;
Expand Down Expand Up @@ -168,7 +162,6 @@ sub _process_dbf {
};
unless (defined $e) {
$log->l(state => 1, info => "Произошла ошибка обновления дубликатов в таблице persons, $loaded_cnt сотрудников обработано");
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Mysql update dublicates in table persons error');
}
}
Expand Down Expand Up @@ -199,7 +192,6 @@ sub _process_dbf {
};
unless (defined $e) {
$log->l(state => 1, info => "Произошла ошибка записи таблицы подразделений");
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Mysql insert to table depts error');
}
$dept_loaded_cnt++;
Expand All @@ -226,7 +218,6 @@ sub _process_dbf {
};
unless (defined $e) {
$log->l(state => 1, info => "Произошла ошибка записи подразделений в плоском формате");
$app->reset_task_state($db_adup, $TASK_ID);
return $job->fail('Mysql insert to table flatdepts error');
}
$flatdept_loaded_cnt++;
Expand All @@ -237,8 +228,6 @@ sub _process_dbf {

$log->l(info => "Загружен шаблон ИС \"Галактика\" с информацией по $loaded_cnt сотрудникам и выполнен разбор оргструктуры по $dept_loaded_cnt/$flatdept_loaded_cnt подразделениям");

$app->reset_task_state($db_adup, $TASK_ID);

$app->log->info("Finish $$: ".$job->id);
$job->finish;
}
Expand Down
7 changes: 0 additions & 7 deletions lib/Adup/Task/Sync.pm
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use Adup::Ural::SyncDeleteFlatGroups;
use Adup::Ural::SyncDeleteOUs;
use Adup::Ural::SyncDisableDismissed;

my $TASK_ID = 'sync_id';
# $TASK_LOG_STATE_SUCCESS = 10;
# $TASK_LOG_STATE_ERROR = 11;

Expand Down Expand Up @@ -72,13 +71,10 @@ sub _sync {
return $job->fail("LDAP bind error ".$mesg->error);
}

$app->set_task_state($db_adup, $TASK_ID, $job->id);

my $e = eval {
$db_adup->query("DELETE FROM changes");
};
unless (defined $e) {
$app->reset_task_state($db_adup, $TASK_ID);
$ldap->unbind;
return $job->fail('Changes table cleanup error');
}
Expand All @@ -95,7 +91,6 @@ sub _sync {
pos => $idx
);
unless (defined $c) {
$app->reset_task_state($db_adup, $TASK_ID);
$ldap->unbind;
return $job->fail("$seq_el->{name} fatal error");
}
Expand All @@ -106,7 +101,6 @@ sub _sync {
$check ||= $SYNC_SEQUENCE[$_]->{_result} > 0 for @{$seq_el->{pre_stop}};
if ($check) {
$log->l(info => $seq_el->{pre_stop_msg});
$app->reset_task_state($db_adup, $TASK_ID);
$ldap->unbind;
$app->log->info("Pre-finish $$: ".$job->id);
return $job->finish;
Expand All @@ -117,7 +111,6 @@ sub _sync {
}
# done

$app->reset_task_state($db_adup, $TASK_ID);
$ldap->unbind;

$app->log->info("Finish $$: ".$job->id);
Expand Down
Loading

0 comments on commit df1f86e

Please sign in to comment.