Skip to content

Commit

Permalink
[INLONG-11544][Manager] Optimize the configuration of the Manager sch…
Browse files Browse the repository at this point in the history
…edule module
  • Loading branch information
ZKpLo committed Nov 26, 2024
1 parent ace3362 commit 5906bd5
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ private void initConnection() throws Exception {
new AirflowConnectionGetter(airflowConfig.getConnectionId()));
if (!response.isSuccess()) {
AirflowConnection newConn = new AirflowConnection(airflowConfig.getConnectionId(), "HTTP", "",
airflowConfig.getHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
airflowConfig.getPort(), airflowConfig.getInlongPassword(), "");
airflowConfig.getInlongManagerHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
airflowConfig.getInlongManagerPort(), airflowConfig.getInlongPassword(), "");
response = serverClient.sendRequest(new AirflowConnectionCreator(newConn));
LOGGER.info("AirflowConnection registration response: {}", response.toString());
if (!response.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,30 @@
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import okhttp3.OkHttpClient;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

import java.net.URL;

@Data
@Configuration
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public class AirflowConfig extends ClientConfiguration {

@Value("${schedule.engine.inlong.manager.host:127.0.0.1}")
private String host;
private static final Logger LOGGER = LoggerFactory.getLogger(AirflowConfig.class);
@Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083/}")
private String inlongManagerUrl;

@Value("${server.port:8083}")
private int port;
private String inlongManagerHost;
private int inlongManagerPort;

@Value("${default.admin.user:admin}")
private String inlongUsername;
Expand All @@ -68,6 +76,23 @@ public class AirflowConfig extends ClientConfiguration {
@Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}")
private String baseUrl;

@PostConstruct
public void init() {
try {
if (StringUtil.isNotBlank(inlongManagerUrl)) {
URL url = new URL(inlongManagerUrl);
this.inlongManagerHost = url.getHost();
this.inlongManagerPort = url.getPort();
if (this.inlongManagerPort == -1) {
this.inlongManagerPort = 8083;
}
}
LOGGER.info("Init AirflowConfig success for manager url ={}", this.inlongManagerUrl);
} catch (Exception e) {
LOGGER.error("Init AirflowConfig failed for manager url={}: ", this.inlongManagerUrl, e);
}
}

@Bean
public OkHttpClient okHttpClient() {
return new OkHttpClient.Builder()
Expand All @@ -79,6 +104,7 @@ public OkHttpClient okHttpClient() {
.retryOnConnectionFailure(true)
.build();
}

@Bean
public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig) {
return new AirflowServerClient(okHttpClient, airflowConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ public class DolphinScheduleEngine implements ScheduleEngine {

private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class);

@Value("${schedule.engine.inlong.manager.host:127.0.0.1}")
private String host;

@Value("${server.port:8083}")
private int port;
@Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083/}")
private String inlongManagerUrl;

@Value("${default.admin.user:admin}")
private String username;
Expand All @@ -86,10 +83,10 @@ public void init() {
this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
}

public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl,
public DolphinScheduleEngine(String inlongManagerUrl, String username, String password,
String dolphinUrl,
String token) {
this.host = host;
this.port = port;
this.inlongManagerUrl = inlongManagerUrl;
this.username = username;
this.password = password;
this.dolphinUrl = dolphinUrl;
Expand Down Expand Up @@ -161,8 +158,7 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) {
long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo);
processDefCode =
dolphinScheduleOperator.createProcessDef(processDefUrl, token, processName, processDesc, taskCode,
host, port,
username, password, offset, scheduleInfo.getInlongGroupId());
inlongManagerUrl, username, password, offset, scheduleInfo.getInlongGroupId());
LOGGER.info("Create process definition success, process definition code: {}", processDefCode);

if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public long genTaskCode(String url, String token) {
/**
* Creates a process definition in DolphinScheduler.
*/
public long createProcessDef(String url, String token, String name, String desc, long taskCode, String host,
int port, String username, String password, long offset, String groupId) {
public long createProcessDef(String url, String token, String name, String desc, long taskCode,
String inlongManagerUrl, String username, String password, long offset, String groupId) {
try {
return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, host,
port, username, password, offset, groupId);
return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, inlongManagerUrl, username,
password, offset, groupId);
} catch (Exception e) {
LOGGER.error("Unexpected wrong in creating process definition: ", e);
throw new DolphinScheduleException(UNEXPECTED_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,22 +282,21 @@ public static long genTaskCode(String url, String token) {
/**
* Creates a process definition in DolphinScheduler.
*
* @param url The base URL of the DolphinScheduler API.
* @param token The authentication token to be used in the request header.
* @param name The name of the process definition.
* @param desc The description of the process definition.
* @param taskCode The task code to be associated with this process definition.
* @param host The host where the process will run.
* @param port The port where the process will run.
* @param username The username for authentication.
* @param password The password for authentication.
* @param offset The offset for the scheduling.
* @param groupId The group ID of the process.
* @param url The base URL of the DolphinScheduler API.
* @param token The authentication token to be used in the request header.
* @param name The name of the process definition.
* @param desc The description of the process definition.
* @param taskCode The task code to be associated with this process definition.
* @param inlongManagerUrl The host where the process will run.
* @param username The username for authentication.
* @param password The password for authentication.
* @param offset The offset for the scheduling.
* @param groupId The group ID of the process.
* @return The process definition code (ID) if creation is successful, or 0 if an error occurs.
*/
public static long createProcessDef(String url, String token, String name, String desc,
long taskCode, String host,
int port, String username, String password, long offset, String groupId) throws Exception {
long taskCode, String inlongManagerUrl, String username, String password, long offset, String groupId)
throws Exception {
try {
Map<String, String> header = buildHeader(token);

Expand All @@ -306,7 +305,7 @@ public static long createProcessDef(String url, String token, String name, Strin
String taskRelationJson = MAPPER.writeValueAsString(Collections.singletonList(taskRelation));

DSTaskParams taskParams = new DSTaskParams();
taskParams.setRawScript(buildScript(host, port, username, password, offset, groupId));
taskParams.setRawScript(buildScript(inlongManagerUrl, username, password, offset, groupId));

DSTaskDefinition taskDefinition = new DSTaskDefinition();
taskDefinition.setCode(taskCode);
Expand Down Expand Up @@ -774,10 +773,10 @@ private static JsonObject executeHttpRequest(String url, String method, Map<Stri
* When process definition schedule run, the shell node run,
* Call back in inlong, sending a request with parameters required
*/
private static String buildScript(String host, int port, String username, String password, long offset,
private static String buildScript(String inlongManagerUrl, String username, String password, long offset,
String groupId) {
LOGGER.info("build script for host: {}, port: {}, username: {}, password: {}, offset: {}, groupId: {}", host,
port, username, password, offset, groupId);
LOGGER.info("build script for Inlong Manager Url: {}, username: {}, password: {}, offset: {}, groupId: {}",
inlongManagerUrl, username, password, offset, groupId);
return "#!/bin/bash\n\n" +

// Get current timestamp
Expand All @@ -789,7 +788,7 @@ private static String buildScript(String host, int port, String username, String

// Set URL
"# Set URL and HTTP method\n" +
"url=\"http://" + host + ":" + port + SHELL_REQUEST_API +
"url=\"" + inlongManagerUrl + SHELL_REQUEST_API +
"?username=" + username + "&password=" + password + "\"\n" +
"echo \"get url: ${url}\"\n" +

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg

# Please confirm it is the actual address of manager
schedule.engine.inlong.manager.host=
# Inlong Manager URL accessible by the scheduler
schedule.engine.inlong.manager.url=http://127.0.0.1:8083/

# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg

# Please confirm it is the actual address of manager
schedule.engine.inlong.manager.host=
# Inlong Manager URL accessible by the scheduler
schedule.engine.inlong.manager.url=http://127.0.0.1:8083/

# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ dirty.log.clean.interval.minutes=5
dirty.log.retention.minutes=10
dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg

# Please confirm it is the actual address of manager
schedule.engine.inlong.manager.host=
# Inlong Manager URL accessible by the scheduler
schedule.engine.inlong.manager.url=http://127.0.0.1:8083/

# DolphinScheduler related config
schedule.engine.dolphinscheduler.url=
Expand Down

0 comments on commit 5906bd5

Please sign in to comment.