Skip to content

Commit

Permalink
feat: get/put functions #1626
Browse files Browse the repository at this point in the history
  • Loading branch information
bamthomas committed Jan 15, 2025
1 parent f676863 commit 3be326d
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,27 @@
package org.icij.datashare.db;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.icij.datashare.asynctasks.Group;
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.asynctasks.TaskRepository;
import org.icij.datashare.db.tables.records.TaskRecord;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;

import javax.sql.DataSource;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Optional.ofNullable;
import static org.icij.datashare.asynctasks.bus.amqp.Event.MAX_RETRIES_LEFT;
import static org.icij.datashare.db.Tables.TASK;

public class JooqTaskRepository implements TaskRepository {
private final DataSource connectionProvider;
private final SQLDialect dialect;
Expand Down Expand Up @@ -41,12 +53,31 @@ public boolean containsValue(Object o) {

@Override
public Task<?> get(Object o) {
return null;
return createTaskFrom(DSL.using(connectionProvider, dialect).selectFrom(TASK).
where(TASK.ID.eq((String) o)).fetchOne());
}

@Override
public Task<?> put(String s, Task<?> task) {
return null;
try {
if (task == null || s == null || !s.equals(task.getId())) {
throw new IllegalArgumentException(String.format("task is null or its id (%s) is different than the key (%s)", ofNullable(task).map(Task::getId).orElse(null), s));
}
final int inserted = DSL.using(connectionProvider, dialect)
.insertInto(TASK).columns(
TASK.ID, TASK.NAME, TASK.STATE, TASK.USER_ID, TASK.GROUP_ID, TASK.PROGRESS,
TASK.CREATED_AT, TASK.RETRIES_LEFT, TASK.MAX_RETRIES, TASK.ARGS)
.values(task.id, task.name,
task.getState().name(),
ofNullable(task.getUser()).map(u -> u.id).orElse(null),
ofNullable(task.getGroup()).map(Group::id).orElse(null),
task.getProgress(),
new Timestamp(task.createdAt.getTime()).toLocalDateTime(), task.getRetriesLeft(),
MAX_RETRIES_LEFT, new ObjectMapper().writeValueAsString(task.args)).execute();
return inserted == 1 ? task : null;
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

@Override
Expand Down Expand Up @@ -78,4 +109,16 @@ public Collection<Task<?>> values() {
public Set<Entry<String, Task<?>>> entrySet() {
return Set.of();
}

private Task<?> createTaskFrom(TaskRecord taskRecord) {
return ofNullable(taskRecord).map(r ->
{
try {
return new Task<>(r.getId(), r.getName(), Task.State.valueOf(r.getState()),
r.getProgress(), null, new ObjectMapper().readValue(r.getArgs(), new TypeReference<HashMap<String, Object>>(){}));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}).orElse(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
databaseChangeLog:
- changeSet:
id: 36
author: bthomas
changes:
- createTable:
tableName: task
columns:
- column:
name: id
type: varchar(96)
constraints:
primaryKey: true
- column:
name: name
type: varchar(128)
constraints:
nullable: false
- column:
name: state
type: varchar(16)
constraints:
nullable: false
- column:
name: user_id
type: varchar(96)
- column:
name: group_id
type: varchar(128)
- column:
name: progress
type: float
defaultValue: 0
- column:
name: created_at
type: datetime
constraints:
nullable: false
- column:
name: completed_at
type: datetime
- column:
name: retries_left
type: int
- column:
name: max_retries
type: int
- column:
name: args
type: text

- createIndex:
indexName: task_name
tableName: task
columns:
- column:
name: name
type: varchar(128)
- createIndex:
indexName: task_created_at
tableName: task
columns:
- column:
name: created_at
type: datetime
- createIndex:
indexName: task_state
tableName: task
columns:
- column:
name: state
type: varchar(16)
- createIndex:
indexName: task_user_id
tableName: task
columns:
- column:
name: user_id
type: varchar(96)
- createIndex:
indexName: task_group
tableName: task
columns:
- column:
name: group_id
type: varchar(128)
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,7 @@ databaseChangeLog:
relativeToChangelogFile: true
- include:
file: changes/035-batch-search-rename-tag-column-to-query-template.yml
relativeToChangelogFile: true
- include:
file: changes/036-create_task.yml
relativeToChangelogFile: true
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import org.icij.datashare.asynctasks.Task;
import org.icij.datashare.test.DatashareTimeRule;
import org.icij.datashare.user.User;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -23,13 +22,29 @@ public class JooqTaskRepositoryTest {
public DbSetupRule dbRule;
private final JooqTaskRepository repository;

@Ignore
@Test(expected = IllegalArgumentException.class)
public void test_put_with_key_different_than_id() {
assertThat(repository.put("my_key", new Task<>("foo", User.local(), Map.of())));
}

@Test(expected = IllegalArgumentException.class)
public void test_put_with_id_null() {
assertThat(repository.put("my_key", new Task<>(null, "foo", User.local(), Map.of())));
}

@Test(expected = IllegalArgumentException.class)
public void test_put_with_null_key() {
assertThat(repository.put(null, new Task<>("foo", User.local(), Map.of())));
}

@Test
public void test_get_put() {
public void test_put_get() {
Task<Object> foo = new Task<>("foo", User.local(), Map.of());
repository.put("foo", foo);
assertThat(repository.get("foo")).isNotSameAs(foo); // not same instance
assertThat(repository.get("foo")).isEqualTo(foo); // but equals as defined by Task

assertThat(repository.put(foo.getId(), foo)).isSameAs(foo);

assertThat(repository.get(foo.getId())).isNotSameAs(foo); // not same instance
assertThat(repository.get(foo.getId())).isEqualTo(foo); // but equals as defined by Task
}

@Parameterized.Parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.icij.datashare.batch.WebQueryPagination;
import org.icij.datashare.user.User;

import java.io.Serial;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -29,7 +28,6 @@
import static java.util.Optional.ofNullable;
import static java.util.UUID.randomUUID;
import static org.icij.datashare.batch.WebQueryPagination.OrderDirection.ASC;
import static org.icij.datashare.batch.WebQueryPagination.OrderDirection.DESC;

@JsonInclude(JsonInclude.Include.NON_NULL)
public class Task<V> extends Event implements Entity, Comparable<Task<V>> {
Expand Down Expand Up @@ -75,7 +73,7 @@ public Task(String id, String name, User user, Map<String, Object> args) {
}

@JsonCreator
Task(@JsonProperty("id") String id,
public Task(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@JsonProperty("state") State state,
@JsonProperty("progress") double progress,
Expand Down Expand Up @@ -165,6 +163,8 @@ public State getState() {
return state;
}

public int getRetriesLeft() {return retriesLeft;}

@JsonIgnore
public boolean isFinished() {
return State.DONE.equals(state) || State.CANCELLED.equals(state) || State.ERROR.equals(state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,11 @@
import java.util.Map;

public interface TaskRepository extends Map<String, Task<?>> {
default Task<?> save(Task<?> task) {
return put(task.getId(), task);
}

default Task<?> get(String id) {
return get((Object)id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
@JsonSubTypes.Type(ErrorEvent.class)})
public class Event implements Serializable {
@Serial private static final long serialVersionUID = -2295266944323500399L;
protected int retriesLeft = 3;
public static final int MAX_RETRIES_LEFT = 3;
protected int retriesLeft = MAX_RETRIES_LEFT;
public final Date createdAt;

public Event() {
Expand Down

0 comments on commit 3be326d

Please sign in to comment.