Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improve unresolved task deletion job #594

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* text eol=lf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static java.util.stream.Collectors.toList;

import com.github.kagkarlsson.scheduler.SchedulerState.SettableSchedulerState;
import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask;
import com.github.kagkarlsson.scheduler.event.ExecutionInterceptor;
import com.github.kagkarlsson.scheduler.event.SchedulerListener;
import com.github.kagkarlsson.scheduler.event.SchedulerListener.SchedulerEventType;
Expand All @@ -34,7 +35,9 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -354,8 +357,31 @@ public List<CurrentlyExecuting> getCurrentlyExecutingWithStaleHeartbeat() {
@SuppressWarnings({"rawtypes", "unchecked"})
protected void detectDeadExecutions() {
LOG.debug("Deleting executions with unresolved tasks.");
taskResolver
.getUnresolvedTaskNames(deleteUnresolvedAfter)

Map<String, Instant> taskToNewestExecutionTime = new HashMap<>();
schedulerTaskRepository.getScheduledExecutions(
ScheduledExecutionsFilter.all(),
execution ->
taskToNewestExecutionTime.merge(
execution.taskInstance.getTaskName(),
execution.executionTime,
(oldValue, newValue) -> oldValue.isAfter(newValue) ? oldValue : newValue));

taskResolver.getUnresolved().stream()
.map(UnresolvedTask::getTaskName)
.filter(
taskName -> {
Instant newestExecution = taskToNewestExecutionTime.get(taskName);

if (newestExecution == null) {
// probably deleted by other node
return true;
}

Duration age = Duration.between(newestExecution, clock.now());

return age.compareTo(deleteUnresolvedAfter) >= 0;
})
.forEach(
taskName -> {
LOG.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.Task;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -79,16 +78,6 @@ public List<UnresolvedTask> getUnresolved() {
return new ArrayList<>(unresolvedTasks.values());
}

/**
 * Collects the names of unresolved tasks that have been unresolved for longer than the given
 * duration.
 *
 * <p>A task qualifies when the time between its {@code firstUnresolved} instant and {@code
 * clock.now()}, truncated to whole milliseconds, is strictly greater than {@code unresolvedFor}
 * in milliseconds.
 *
 * @param unresolvedFor minimum time a task must have been unresolved (exclusive bound)
 * @return names of the qualifying unresolved tasks; empty when none qualify
 */
public List<String> getUnresolvedTaskNames(Duration unresolvedFor) {
  List<String> staleTaskNames = new ArrayList<>();
  for (UnresolvedTask candidate : unresolvedTasks.values()) {
    // The clock is read once per candidate, like the per-element predicate it replaces.
    long unresolvedMillis = Duration.between(candidate.firstUnresolved, clock.now()).toMillis();
    if (unresolvedMillis > unresolvedFor.toMillis()) {
      staleTaskNames.add(candidate.getTaskName());
    }
  }
  return staleTaskNames;
}

/**
 * Removes the entry stored under {@code taskName} from the collection of unresolved tasks, so
 * the name is no longer reported as unresolved.
 *
 * @param taskName name of the task whose unresolved entry should be dropped
 */
public void clearUnresolved(String taskName) {
unresolvedTasks.remove(taskName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.github.kagkarlsson.scheduler.stats.StatsRegistryAdapter;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.Task;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -70,6 +71,11 @@ public ManualSchedulerBuilder pollingStrategy(PollingStrategyConfig pollingStrat
return this;
}

/**
 * Stores the given duration in the {@code deleteUnresolvedAfter} field inherited from the parent
 * builder.
 *
 * <p>NOTE(review): presumably the age after which executions of unresolved tasks become eligible
 * for deletion; confirm against the scheduler's dead-execution detection.
 *
 * @param deleteUnresolvedAfter duration to assign to the inherited field
 * @return this builder, to allow call chaining
 */
public ManualSchedulerBuilder deleteUnresolvedAfter(Duration deleteUnresolvedAfter) {
super.deleteUnresolvedAfter = deleteUnresolvedAfter;
return this;
}

public ManualScheduler build() {
final TaskResolver taskResolver = new TaskResolver(statsRegistry, clock, knownTasks);
final JdbcTaskRepository schedulerTaskRepository =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -41,28 +42,36 @@ public void setUp() {
@Test
public void should_delete_executions_with_old_unresolved_tasknames() {

OneTimeTask<Void> onetime = Tasks.oneTime("onetime").execute(TestTasks.DO_NOTHING);
OneTimeTask<Void> first = Tasks.oneTime("onetime_first").execute(TestTasks.DO_NOTHING);
OneTimeTask<Void> second = Tasks.oneTime("onetime_second").execute(TestTasks.DO_NOTHING);

TestableRegistry testableRegistry = new TestableRegistry(false, Collections.emptyList());
// Missing task with name 'onetime'
// Missing tasks with name 'onetime_first' and 'onetime_second'
ManualScheduler scheduler =
TestHelper.createManualScheduler(postgres.getDataSource())
.clock(clock)
.statsRegistry(testableRegistry)
.deleteUnresolvedAfter(Duration.ofDays(5))
.build();

scheduler.schedule(onetime.instance("id1"), clock.now());
scheduler.schedule(first.instance("id_f"), clock.now());
scheduler.schedule(second.instance("id_s"), clock.now().minus(1, ChronoUnit.DAYS));
scheduler.schedule(second.instance("id_s_2"), clock.now().minus(2, ChronoUnit.DAYS));
assertEquals(0, testableRegistry.getCount(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK));

scheduler.runAnyDueExecutions();
assertEquals(1, testableRegistry.getCount(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK));
assertEquals(3, testableRegistry.getCount(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK));

assertEquals(1, DbUtils.countExecutions(postgres.getDataSource()));
assertEquals(3, DbUtils.countExecutions(postgres.getDataSource()));

scheduler.runDeadExecutionDetection();
assertEquals(3, DbUtils.countExecutions(postgres.getDataSource()));

clock.tick(Duration.ofDays(4));
scheduler.runDeadExecutionDetection();
assertEquals(1, DbUtils.countExecutions(postgres.getDataSource()));

clock.set(clock.now().plus(Duration.ofDays(30)));
clock.tick(Duration.ofDays(1));
scheduler.runDeadExecutionDetection();
assertEquals(0, DbUtils.countExecutions(postgres.getDataSource()));

Expand Down
Loading