Skip to content

Commit

Permalink
[INLONG-11624][Agent] Add SQL data source (#11625)
Browse files Browse the repository at this point in the history
* [INLONG-11624][Agent] Add SQL data source

* [INLONG-11624][Agent] Add code comments

* Update inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/SQLInstance.java

Co-authored-by: AloysZhang <[email protected]>

* Update inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java

Co-authored-by: AloysZhang <[email protected]>

* Update inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java

Co-authored-by: AloysZhang <[email protected]>

* Update inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/FileTask.java

Co-authored-by: AloysZhang <[email protected]>

* Update inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/FileTask.java

Co-authored-by: AloysZhang <[email protected]>

* [INLONG-11624][Agent] Modify code based on comments

* Update inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/SQLSource.java

Co-authored-by: fuweng11 <[email protected]>

* [INLONG-11624][Agent] Modify thread name

* [INLONG-11624][Agent] Rename variables to prevent misunderstandings

---------

Co-authored-by: AloysZhang <[email protected]>
Co-authored-by: fuweng11 <[email protected]>
  • Loading branch information
3 people authored Dec 30, 2024
1 parent cd2a0a2 commit b91a380
Show file tree
Hide file tree
Showing 20 changed files with 1,166 additions and 849 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,30 @@
*/
public class InstanceProfile extends AbstractConfiguration implements Comparable<InstanceProfile> {

public static final String DEFAULT_FILE_INSTANCE = "org.apache.inlong.agent.plugin.instance.FileInstance";
public static final String DEFAULT_COS_INSTANCE = "org.apache.inlong.agent.plugin.instance.COSInstance";
public static final String DEFAULT_KAFKA_INSTANCE = "org.apache.inlong.agent.plugin.instance.KafkaInstance";
public static final String DEFAULT_MONGODB_INSTANCE = "org.apache.inlong.agent.plugin.instance.MongoDBInstance";
public static final String DEFAULT_MQTT_INSTANCE = "org.apache.inlong.agent.plugin.instance.MqttInstance";
public static final String DEFAULT_ORACLE_INSTANCE = "org.apache.inlong.agent.plugin.instance.OracleInstance";
public static final String DEFAULT_POSTGRES_INSTANCE = "org.apache.inlong.agent.plugin.instance.PostgreSQLInstance";
public static final String DEFAULT_PULSAR_INSTANCE = "org.apache.inlong.agent.plugin.instance.PulsarInstance";
public static final String DEFAULT_REDIS_INSTANCE = "org.apache.inlong.agent.plugin.instance.RedisInstance";
public static final String DEFAULT_SQLSERVER_INSTANCE = "org.apache.inlong.agent.plugin.instance.SQLServerInstance";
public static final String FILE_INSTANCE = "org.apache.inlong.agent.plugin.instance.FileInstance";
public static final String COS_INSTANCE = "org.apache.inlong.agent.plugin.instance.COSInstance";
public static final String KAFKA_INSTANCE = "org.apache.inlong.agent.plugin.instance.KafkaInstance";
public static final String MONGODB_INSTANCE = "org.apache.inlong.agent.plugin.instance.MongoDBInstance";
public static final String MQTT_INSTANCE = "org.apache.inlong.agent.plugin.instance.MqttInstance";
public static final String ORACLE_INSTANCE = "org.apache.inlong.agent.plugin.instance.OracleInstance";
public static final String POSTGRES_INSTANCE = "org.apache.inlong.agent.plugin.instance.PostgreSQLInstance";
public static final String PULSAR_INSTANCE = "org.apache.inlong.agent.plugin.instance.PulsarInstance";
public static final String REDIS_INSTANCE = "org.apache.inlong.agent.plugin.instance.RedisInstance";
public static final String SQLSERVER_INSTANCE = "org.apache.inlong.agent.plugin.instance.SQLServerInstance";
public static final String SQL_INSTANCE = "org.apache.inlong.agent.plugin.instance.SQLInstance";

public static final String FILE_SOURCE = "org.apache.inlong.agent.plugin.sources.LogFileSource";
public static final String BINLOG_SOURCE = "org.apache.inlong.agent.plugin.sources.BinlogSource";
public static final String KAFKA_SOURCE = "org.apache.inlong.agent.plugin.sources.KafkaSource";
public static final String PULSAR_SOURCE = "org.apache.inlong.agent.plugin.sources.PulsarSource";
public static final String POSTGRESQL_SOURCE = "org.apache.inlong.agent.plugin.sources.PostgreSQLSource";
public static final String MONGO_SOURCE = "org.apache.inlong.agent.plugin.sources.MongoDBSource";
public static final String ORACLE_SOURCE = "org.apache.inlong.agent.plugin.sources.OracleSource";
public static final String REDIS_SOURCE = "org.apache.inlong.agent.plugin.sources.RedisSource";
public static final String MQTT_SOURCE = "org.apache.inlong.agent.plugin.sources.MqttSource";
public static final String SQLSERVER_SOURCE = "org.apache.inlong.agent.plugin.sources.SQLServerSource";
public static final String COS_SOURCE = "org.apache.inlong.agent.plugin.sources.COSSource";
public static final String SQL_SOURCE = "org.apache.inlong.agent.plugin.sources.SQLSource";

private static final Logger LOGGER = LoggerFactory.getLogger(InstanceProfile.class);
private static final Gson GSON = new Gson();
Expand Down Expand Up @@ -88,25 +102,27 @@ public static String getInstanceClassByTaskType(TaskTypeEnum taskType) {
}
switch (taskType) {
case FILE:
return DEFAULT_FILE_INSTANCE;
return FILE_INSTANCE;
case KAFKA:
return DEFAULT_KAFKA_INSTANCE;
return KAFKA_INSTANCE;
case PULSAR:
return DEFAULT_PULSAR_INSTANCE;
return PULSAR_INSTANCE;
case POSTGRES:
return DEFAULT_POSTGRES_INSTANCE;
return POSTGRES_INSTANCE;
case ORACLE:
return DEFAULT_ORACLE_INSTANCE;
return ORACLE_INSTANCE;
case SQLSERVER:
return DEFAULT_SQLSERVER_INSTANCE;
return SQLSERVER_INSTANCE;
case MONGODB:
return DEFAULT_MONGODB_INSTANCE;
return MONGODB_INSTANCE;
case REDIS:
return DEFAULT_REDIS_INSTANCE;
return REDIS_INSTANCE;
case MQTT:
return DEFAULT_MQTT_INSTANCE;
return MQTT_INSTANCE;
case COS:
return DEFAULT_COS_INSTANCE;
return COS_INSTANCE;
case SQL:
return SQL_INSTANCE;
default:
LOGGER.error("invalid task type {}", taskType);
return null;
Expand All @@ -126,7 +142,43 @@ public String getCycleUnit() {
}

public String getSourceClass() {
return get(TaskConstants.TASK_SOURCE);
TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE, TaskTypeEnum.FILE.getType()));
return getSourceClassByTaskType(taskType);
}

public static String getSourceClassByTaskType(TaskTypeEnum taskType) {
if (taskType == null) {
return null;
}
switch (taskType) {
case BINLOG:
return BINLOG_SOURCE;
case FILE:
return FILE_SOURCE;
case KAFKA:
return KAFKA_SOURCE;
case PULSAR:
return PULSAR_SOURCE;
case POSTGRES:
return POSTGRESQL_SOURCE;
case ORACLE:
return ORACLE_SOURCE;
case SQLSERVER:
return SQLSERVER_SOURCE;
case MONGODB:
return MONGO_SOURCE;
case REDIS:
return REDIS_SOURCE;
case MQTT:
return MQTT_SOURCE;
case COS:
return COS_SOURCE;
case SQL:
return SQL_SOURCE;
default:
LOGGER.error("invalid task type {}", taskType);
return null;
}
}

public String getSinkClass() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,18 @@
*/
public class TaskProfile extends AbstractConfiguration {

public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.logcollection.local.FileTask";
public static final String DEFAULT_COS_TASK = "org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask";
public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_ORACLE_TASK = "org.apache.inlong.agent.plugin.task.OracleTask";
public static final String DEFAULT_REDIS_TASK = "org.apache.inlong.agent.plugin.task.RedisTask";
public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String DEFAULT_MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask";
public static final String DEFAULT_SQLSERVER_TASK = "org.apache.inlong.agent.plugin.task.SQLServerTask";
public static final String DEFAULT_MOCK_TASK = "org.apache.inlong.agent.plugin.task.MockTask";
public static final String SQL_TASK = "org.apache.inlong.agent.plugin.task.logcollection.SQLTask";
public static final String FILE_TASK = "org.apache.inlong.agent.plugin.task.logcollection.local.FileTask";
public static final String COS_TASK = "org.apache.inlong.agent.plugin.task.logcollection.cos.COSTask";
public static final String KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String ORACLE_TASK = "org.apache.inlong.agent.plugin.task.OracleTask";
public static final String REDIS_TASK = "org.apache.inlong.agent.plugin.task.RedisTask";
public static final String POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask";
public static final String SQLSERVER_TASK = "org.apache.inlong.agent.plugin.task.SQLServerTask";
public static final String MOCK_TASK = "org.apache.inlong.agent.plugin.task.MockTask";

private static final Gson GSON = new Gson();
private static final Logger logger = LoggerFactory.getLogger(TaskProfile.class);
Expand All @@ -75,28 +76,30 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
public String getTaskClass() {
TaskTypeEnum taskType = TaskTypeEnum.getTaskType(getInt(TASK_TYPE, TaskTypeEnum.FILE.getType()));
switch (requireNonNull(taskType)) {
case SQL:
return SQL_TASK;
case FILE:
return DEFAULT_FILE_TASK;
return FILE_TASK;
case KAFKA:
return DEFAULT_KAFKA_TASK;
return KAFKA_TASK;
case PULSAR:
return DEFAULT_PULSAR_TASK;
return PULSAR_TASK;
case POSTGRES:
return DEFAULT_POSTGRESQL_TASK;
return POSTGRESQL_TASK;
case ORACLE:
return DEFAULT_ORACLE_TASK;
return ORACLE_TASK;
case SQLSERVER:
return DEFAULT_SQLSERVER_TASK;
return SQLSERVER_TASK;
case MONGODB:
return DEFAULT_MONGODB_TASK;
return MONGODB_TASK;
case REDIS:
return DEFAULT_REDIS_TASK;
return REDIS_TASK;
case MQTT:
return DEFAULT_MQTT_TASK;
return MQTT_TASK;
case COS:
return DEFAULT_COS_TASK;
return COS_TASK;
case MOCK:
return DEFAULT_MOCK_TASK;
return MOCK_TASK;
default:
logger.error("invalid task type {}", taskType);
return null;
Expand Down Expand Up @@ -153,7 +156,7 @@ public static TaskProfile parseJsonStr(String jsonStr) {
*/
@Override
public boolean allRequiredKeyExist() {
return hasKey(TaskConstants.TASK_ID) && hasKey(TaskConstants.TASK_SOURCE)
return hasKey(TaskConstants.TASK_ID)
&& hasKey(TaskConstants.TASK_SINK) && hasKey(TaskConstants.TASK_CHANNEL)
&& hasKey(TaskConstants.TASK_GROUP_ID) && hasKey(TaskConstants.TASK_STREAM_ID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_GROUP_ID = "task.groupId";
public static final String TASK_STREAM_ID = "task.streamId";
public static final String RESTORE_FROM_DB = "task.restoreFromDB";

public static final String TASK_SOURCE = "task.source";

public static final String TASK_CHANNEL = "task.channel";
public static final String TASK_TYPE = "task.taskType";
public static final String TASK_FILE_TRIGGER = "task.fileTask.trigger";
Expand Down Expand Up @@ -75,7 +72,7 @@ public class TaskConstants extends CommonConstants {
public static final String COS_CONTENT_STYLE = "task.cosTask.contentStyle";
public static final String COS_MAX_NUM = "task.cosTask.maxFileCount";
public static final String COS_TASK_PATTERN = "task.cosTask.pattern";
public static final String TASK_COS_TIME_OFFSET = "task.cosTask.timeOffset";
public static final String COS_TIME_OFFSET = "task.cosTask.timeOffset";
public static final String COS_TASK_RETRY = "task.cosTask.retry";
public static final String COS_TASK_TIME_FROM = "task.cosTask.dataTimeFrom";
public static final String COS_TASK_TIME_TO = "task.cosTask.dataTimeTo";
Expand All @@ -86,6 +83,20 @@ public class TaskConstants extends CommonConstants {
public static final String COS_DATA_SEPARATOR = "task.cosTask.dataSeparator";
public static final String COS_FILTER_STREAMS = "task.cosTask.filterStreams";

// SQL task
public static final String SQL_TASK_CYCLE_UNIT = "task.sqlTask.cycleUnit";
public static final String SQL_MAX_NUM = "task.sqlTask.maxInstanceCount";
public static final String SQL_TASK_SQL = "task.sqlTask.sql";
public static final String SQL_TIME_OFFSET = "task.sqlTask.timeOffset";
public static final String SQL_TASK_RETRY = "task.sqlTask.retry";
public static final String SQL_TASK_TIME_FROM = "task.sqlTask.dataTimeFrom";
public static final String SQL_TASK_TIME_TO = "task.sqlTask.dataTimeTo";
public static final String SQL_TASK_JDBC_URL = "task.sqlTask.jdbcUrl";
public static final String SQL_TASK_USERNAME = "task.sqlTask.username";
public static final String SQL_TASK_PASSWORD = "task.sqlTask.jdbcPassword";
public static final String SQL_TASK_DATA_SEPARATOR = "task.sqlTask.dataSeparator";
public static final String SQL_TASK_FETCH_SIZE = "task.sqlTask.fetchSize";

/**
* delimiter to split offset for different task
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.pojo;

import lombok.Data;

@Data
public class SQLTask {

private Integer id;
private String sql;
private String cycleUnit;
private Boolean retry;
private String dataTimeFrom;
private String dataTimeTo;
private String timeOffset;
// The number of instances that can run simultaneously for this task to
// prevent other problems caused by running too many instances simultaneously
private Integer maxInstanceCount;
private String jdbcUrl;
private String username;
private String jdbcPassword;
private String dataSeparator;
// The number of rows collected from the data source in each batch
private Integer fetchSize;

@Data
public static class SQLTaskConfig {

private String sql;
private String cycleUnit;
private Boolean retry;
private String dataTimeFrom;
private String dataTimeTo;
// '1m' means one minute after, '-1m' means one minute before
// '1h' means one hour after, '-1h' means one hour before
// '1d' means one day after, '-1d' means one day before
// Null means from current timestamp
private String timeOffset;
private Integer maxInstanceCount;
private String jdbcUrl;
private String username;
private String jdbcPassword;
private String dataSeparator;
private Integer fetchSize;
}
}
Loading

0 comments on commit b91a380

Please sign in to comment.