diff --git a/datashare-db/src/main/java/org/icij/datashare/db/JooqTaskRepository.java b/datashare-db/src/main/java/org/icij/datashare/db/JooqTaskRepository.java index 87eee9379..1d7106a35 100644 --- a/datashare-db/src/main/java/org/icij/datashare/db/JooqTaskRepository.java +++ b/datashare-db/src/main/java/org/icij/datashare/db/JooqTaskRepository.java @@ -1,15 +1,27 @@ package org.icij.datashare.db; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.icij.datashare.asynctasks.Group; import org.icij.datashare.asynctasks.Task; import org.icij.datashare.asynctasks.TaskRepository; +import org.icij.datashare.db.tables.records.TaskRecord; import org.jooq.SQLDialect; +import org.jooq.impl.DSL; import javax.sql.DataSource; +import java.sql.Timestamp; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import static java.util.Optional.ofNullable; +import static org.icij.datashare.asynctasks.bus.amqp.Event.MAX_RETRIES_LEFT; +import static org.icij.datashare.db.Tables.TASK; + public class JooqTaskRepository implements TaskRepository { private final DataSource connectionProvider; private final SQLDialect dialect; @@ -41,12 +53,31 @@ public boolean containsValue(Object o) { @Override public Task get(Object o) { - return null; + return createTaskFrom(DSL.using(connectionProvider, dialect).selectFrom(TASK). + where(TASK.ID.eq((String) o)).fetchOne()); } @Override public Task put(String s, Task task) { - return null; + try { + if (task == null || s == null || !s.equals(task.getId())) { + throw new IllegalArgumentException(String.format("task is null or its id (%s) is different than the key (%s)", ofNullable(task).map(Task::getId).orElse(null), s)); + } + final int inserted = DSL.using(connectionProvider, dialect) + .insertInto(TASK).columns( + TASK.ID, TASK.NAME, TASK.STATE, TASK.USER_ID, TASK.GROUP_ID, TASK.PROGRESS, + TASK.CREATED_AT, TASK.RETRIES_LEFT, TASK.MAX_RETRIES, TASK.ARGS) + .values(task.id, task.name, + task.getState().name(), + ofNullable(task.getUser()).map(u -> u.id).orElse(null), + ofNullable(task.getGroup()).map(Group::id).orElse(null), + task.getProgress(), + new Timestamp(task.createdAt.getTime()).toLocalDateTime(), task.getRetriesLeft(), + MAX_RETRIES_LEFT, new ObjectMapper().writeValueAsString(task.args)).execute(); + return inserted == 1 ? task : null; + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } } @Override @@ -78,4 +109,16 @@ public Collection> values() { public Set>> entrySet() { return Set.of(); } + + private Task createTaskFrom(TaskRecord taskRecord) { + return ofNullable(taskRecord).map(r -> + { + try { + return new Task<>(r.getId(), r.getName(), Task.State.valueOf(r.getState()), + r.getProgress(), null, new ObjectMapper().readValue(r.getArgs(), new TypeReference>(){})); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }).orElse(null); + } } diff --git a/datashare-db/src/main/resources/liquibase/changelog/changes/036-create_task.yml b/datashare-db/src/main/resources/liquibase/changelog/changes/036-create_task.yml new file mode 100644 index 000000000..f9e02dedb --- /dev/null +++ b/datashare-db/src/main/resources/liquibase/changelog/changes/036-create_task.yml @@ -0,0 +1,86 @@ +databaseChangeLog: + - changeSet: + id: 36 + author: bthomas + changes: + - createTable: + tableName: task + columns: + - column: + name: id + type: varchar(96) + constraints: + primaryKey: true + - column: + name: name + type: varchar(128) + constraints: + nullable: false + - column: + name: state + type: varchar(16) + constraints: + nullable: false + - column: + name: user_id + type: varchar(96) + - column: + name: group_id + type: varchar(128) + - column: + name: progress + type: float + defaultValue: 0 + - column: + name: created_at + type: datetime + constraints: + nullable: false + - column: + name: completed_at + type: datetime + - column: + name: retries_left + type: int + - column: + name: max_retries + type: int + - column: + name: args + type: text + + - createIndex: + indexName: task_name + tableName: task + columns: + - column: + name: name + type: varchar(128) + - createIndex: + indexName: task_created_at + tableName: task + columns: + - column: + name: created_at + type: datetime + - createIndex: + indexName: task_state + tableName: task + columns: + - column: + name: state + type: varchar(16) + - createIndex: + indexName: task_user_id + tableName: task + columns: + - column: + name: user_id + type: varchar(96) + - createIndex: + indexName: task_group + tableName: task + columns: + - column: + name: group_id + type: varchar(128) \ No newline at end of file diff --git a/datashare-db/src/main/resources/liquibase/changelog/db.changelog.yml b/datashare-db/src/main/resources/liquibase/changelog/db.changelog.yml index ff2e016f3..a7420b43c 100644 --- a/datashare-db/src/main/resources/liquibase/changelog/db.changelog.yml +++ b/datashare-db/src/main/resources/liquibase/changelog/db.changelog.yml @@ -106,4 +106,7 @@ databaseChangeLog: relativeToChangelogFile: true - include: file: changes/035-batch-search-rename-tag-column-to-query-template.yml + relativeToChangelogFile: true + - include: + file: changes/036-create_task.yml relativeToChangelogFile: true \ No newline at end of file diff --git a/datashare-db/src/test/java/org/icij/datashare/db/JooqTaskRepositoryTest.java b/datashare-db/src/test/java/org/icij/datashare/db/JooqTaskRepositoryTest.java index 8a6e61e3a..6c25bb5b3 100644 --- a/datashare-db/src/test/java/org/icij/datashare/db/JooqTaskRepositoryTest.java +++ b/datashare-db/src/test/java/org/icij/datashare/db/JooqTaskRepositoryTest.java @@ -3,7 +3,6 @@ import org.icij.datashare.asynctasks.Task; import org.icij.datashare.test.DatashareTimeRule; import org.icij.datashare.user.User; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -23,13 +22,29 @@ public class JooqTaskRepositoryTest { public DbSetupRule dbRule; private final JooqTaskRepository repository; - @Ignore + @Test(expected = IllegalArgumentException.class) + public void test_put_with_key_different_than_id() { + assertThat(repository.put("my_key", new Task<>("foo", User.local(), Map.of()))); + } + + @Test(expected = IllegalArgumentException.class) + public void test_put_with_id_null() { + assertThat(repository.put("my_key", new Task<>(null, "foo", User.local(), Map.of()))); + } + + @Test(expected = IllegalArgumentException.class) + public void test_put_with_null_key() { + assertThat(repository.put(null, new Task<>("foo", User.local(), Map.of()))); + } + @Test - public void test_get_put() { + public void test_put_get() { Task foo = new Task<>("foo", User.local(), Map.of()); - repository.put("foo", foo); - assertThat(repository.get("foo")).isNotSameAs(foo); // not same instance - assertThat(repository.get("foo")).isEqualTo(foo); // but equals as defined by Task + + assertThat(repository.put(foo.getId(), foo)).isSameAs(foo); + + assertThat(repository.get(foo.getId())).isNotSameAs(foo); // not same instance + assertThat(repository.get(foo.getId())).isEqualTo(foo); // but equals as defined by Task } @Parameterized.Parameters diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/Task.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/Task.java index 1f0d2ec5a..733444c48 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/Task.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/Task.java @@ -14,7 +14,6 @@ import org.icij.datashare.batch.WebQueryPagination; import org.icij.datashare.user.User; -import java.io.Serial; import java.io.Serializable; import java.util.Collections; import java.util.HashMap; @@ -29,7 +28,6 @@ import static java.util.Optional.ofNullable; import static java.util.UUID.randomUUID; import static org.icij.datashare.batch.WebQueryPagination.OrderDirection.ASC; -import static org.icij.datashare.batch.WebQueryPagination.OrderDirection.DESC; @JsonInclude(JsonInclude.Include.NON_NULL) public class Task extends Event implements Entity, Comparable> { @@ -75,7 +73,7 @@ public Task(String id, String name, User user, Map args) { } @JsonCreator - Task(@JsonProperty("id") String id, + public Task(@JsonProperty("id") String id, @JsonProperty("name") String name, @JsonProperty("state") State state, @JsonProperty("progress") double progress, @@ -165,6 +163,8 @@ public State getState() { return state; } + public int getRetriesLeft() {return retriesLeft;} + @JsonIgnore public boolean isFinished() { return State.DONE.equals(state) || State.CANCELLED.equals(state) || State.ERROR.equals(state); diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskRepository.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskRepository.java index b0fc8b9d1..3784897f7 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskRepository.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/TaskRepository.java @@ -3,4 +3,11 @@ import java.util.Map; public interface TaskRepository extends Map> { + default Task save(Task task) { + return put(task.getId(), task); + } + + default Task get(String id) { + return get((Object)id); + } } diff --git a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/Event.java b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/Event.java index 1aacc76ed..b654124a8 100644 --- a/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/Event.java +++ b/datashare-tasks/src/main/java/org/icij/datashare/asynctasks/bus/amqp/Event.java @@ -24,7 +24,8 @@ @JsonSubTypes.Type(ErrorEvent.class)}) public class Event implements Serializable { @Serial private static final long serialVersionUID = -2295266944323500399L; - protected int retriesLeft = 3; + public static final int MAX_RETRIES_LEFT = 3; + protected int retriesLeft = MAX_RETRIES_LEFT; public final Date createdAt; public Event() {