Skip to content

Commit

Permalink
chore: wirering TaskRepository in common mode #1626
Browse files Browse the repository at this point in the history
  • Loading branch information
bamthomas committed Jan 17, 2025
1 parent f9aceee commit bc52d03
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.asynctasks.TaskModifier;
import org.icij.datashare.asynctasks.TaskRepository;
import org.icij.datashare.cli.TaskRepositoryType;
import org.icij.datashare.db.JooqTaskRepository;
import org.icij.datashare.tasks.TaskRepositoryRedis;
import org.icij.datashare.asynctasks.TaskSupplier;
import org.icij.datashare.batch.BatchSearchRepository;
Expand Down Expand Up @@ -79,6 +81,7 @@
import static org.icij.datashare.cli.DatashareCliOptions.BATCH_QUEUE_TYPE_OPT;
import static org.icij.datashare.cli.DatashareCliOptions.MODE_OPT;
import static org.icij.datashare.cli.DatashareCliOptions.QUEUE_TYPE_OPT;
import static org.icij.datashare.cli.DatashareCliOptions.TASK_REPOSITORY_OPT;
import static org.icij.datashare.text.indexing.elasticsearch.ElasticsearchConfiguration.createESClient;

public abstract class CommonMode extends AbstractModule implements Closeable {
Expand Down Expand Up @@ -140,13 +143,11 @@ protected void configure() {
QueueType batchQueueType = getQueueType(propertiesProvider, BATCH_QUEUE_TYPE_OPT, QueueType.MEMORY);
switch ( batchQueueType ) {
case REDIS:
bind(TaskRepository.class).to(TaskRepositoryRedis.class);
bind(TaskManager.class).to(TaskManagerRedis.class);
bind(TaskModifier.class).to(TaskSupplierRedis.class);
bind(TaskSupplier.class).to(TaskSupplierRedis.class);
break;
case AMQP:
bind(TaskRepository.class).to(TaskRepositoryRedis.class);
bind(TaskManager.class).to(TaskManagerAmqp.class);
bind(TaskSupplier.class).to(TaskSupplierAmqp.class);
bind(TaskModifier.class).to(TaskSupplierAmqp.class);
Expand Down Expand Up @@ -268,6 +269,16 @@ void configurePersistence() {
bind(Repository.class).toInstance(repositoryFactory.createRepository());
bind(ApiKeyRepository.class).toInstance(repositoryFactory.createApiKeyRepository());
bind(BatchSearchRepository.class).toInstance(repositoryFactory.createBatchSearchRepository());

TaskRepositoryType taskRepositoryType = TaskRepositoryType.valueOf(propertiesProvider.get(TASK_REPOSITORY_OPT).orElse("REDIS"));
switch ( taskRepositoryType ) {
case REDIS -> {
bind(TaskRepository.class).to(TaskRepositoryRedis.class);
}
case DATABASE -> {
bind(TaskRepository.class).toInstance(new JooqTaskRepository(repositoryFactory.getDataSource(), repositoryFactory.guessSqlDialect()));
}
}
repositoryFactory.initDatabase();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ OptionParser createParser() {
DatashareCliOptions.taskRoutingStrategy(parser);
DatashareCliOptions.taskRoutingKey(parser);
DatashareCliOptions.pollingInterval(parser);
DatashareCliOptions.taskRepositoryType(parser);
return parser;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public final class DatashareCliOptions {
public static final String TASK_ROUTING_KEY_OPT = "taskRoutingKey";
public static final String OAUTH_USER_PROJECTS_KEY_OPT = "oauthUserProjectsAttribute";
public static final String POLLING_INTERVAL_OPT = "pollingInterval";

public static final String TASK_REPOSITORY_OPT = "taskRepositoryType";

private static final Path DEFAULT_DATASHARE_HOME = Paths.get(System.getProperty("user.home"), ".local/share/datashare");
private static final Integer DEFAULT_NLP_PARALLELISM = 1;
Expand Down Expand Up @@ -836,6 +836,15 @@ public static void pollingInterval(OptionParser parser) {
.ofType(String.class).defaultsTo("60");
}

public static void taskRepositoryType(OptionParser parser) {
parser.acceptsAll(
singletonList(TASK_REPOSITORY_OPT), "type of task repository")
.withRequiredArg()
.ofType( TaskRepositoryType.class )
.defaultsTo(TaskRepositoryType.REDIS);
}


public static ValueConverter<String> toAbsolute() {
return new ValueConverter<String>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.icij.datashare.cli;

public enum TaskRepositoryType {
REDIS, DATABASE
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class JooqTaskRepository implements TaskRepository {
private final DataSource connectionProvider;
private final SQLDialect dialect;

JooqTaskRepository(final DataSource connectionProvider, final SQLDialect dialect) {
public JooqTaskRepository(final DataSource connectionProvider, final SQLDialect dialect) {
this.connectionProvider = connectionProvider;
this.dialect = dialect;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private <T> T createRepository(BiFunction<DataSource, SQLDialect, T> constructor
return constructor.apply(dataSource, guessSqlDialectFrom(getDataSourceUrl()));
}

static SQLDialect guessSqlDialectFrom(String dataSourceUrl) {
public static SQLDialect guessSqlDialectFrom(String dataSourceUrl) {
for (SQLDialect dialect: SQLDialect.values()) {
if (dataSourceUrl.contains(dialect.name().toLowerCase())) {
return dialect;
Expand All @@ -87,6 +87,9 @@ static SQLDialect guessSqlDialectFrom(String dataSourceUrl) {
throw new IllegalArgumentException("unknown SQL dialect for datasource : " + dataSourceUrl);
}

public DataSource getDataSource() {return dataSource;}
public SQLDialect guessSqlDialect() {return guessSqlDialectFrom(getDataSourceUrl());}

DataSource createDatasource() {
HikariConfig config = new HikariConfig();
String dataSourceUrl = getDataSourceUrl();
Expand Down

0 comments on commit bc52d03

Please sign in to comment.