Skip to content

Commit

Permalink
[INLONG-11695][SDK] MessageSender related interfaces abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
gosonzhang committed Jan 21, 2025
1 parent b10872e commit 9a7b106
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.network;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;

import java.util.concurrent.ConcurrentHashMap;

/**
* Client Manager interface
*
* Used to Manager network client
*/
public interface ClientMgr {

/**
* Start network client manager
*
* @param procResult the start result, return detail error infos if sending fails
* @return true if successful, false return indicates failure.
*/
boolean start(ProcessResult procResult);

/**
* Stop network client manager
*
*/
void stop();

/**
* Get the number of proxy nodes currently in use
*
* @return Number of nodes in use
*/
int getActiveNodeCnt();

/**
* Get the number of in-flight messages
*
* @return Number of in-flight messages
*/
int getInflightMsgCnt();

/**
* Update cached proxy nodes
*
* @param nodeChanged whether the updated node has changed
* @param hostInfoMap the new proxy nodes
*/
void updateProxyInfoList(boolean nodeChanged, ConcurrentHashMap<String, HostInfo> hostInfoMap);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.sender;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;

/**
* Message Sender interface
*
* Used to define the sender common methods
*/
public interface MessageSender {

/**
* Start sender when the sender is built
*
* <p>Attention:
* if return false, the caller need to handle it based on the error code and
* error information returned by procResult, such as:
* prompting the user, retrying after some time, etc.
* </p>
*
* @param procResult the start result, return detail error infos if sending fails
* @return true if successful, false return indicates that the sender fails to start.
*/
boolean start(ProcessResult procResult);

/**
* Close the sender when need to stop the sender's sending service.
*/
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.sender;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;

/**
* Message Send Callback interface
*
* Used to define the send callback methods
*/
public interface MsgSendCallback {

/**
* Invoked when a message is confirmed by DataProxy
*
* @param result The event process result, include detail error infos if sending fails
*/
void onMessageAck(ProcessResult result);

/**
* Invoked when a message transportation interrupted by an exception
*
* @param ex The exception info
*/
void onException(Throwable ex);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.sender.http;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.sender.MessageSender;
import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;

/**
* HTTP Message Sender interface
*
* Used to define the HTTP message sender methods
*/
public interface HttpMsgSender extends MessageSender {

/**
* Synchronously send message and wait for the final sending result
*
* <p>Attention: if return false, the caller can choose to wait for a period of time before trying again, or
* discard the event after multiple retries and failures.</p>
*
* @param eventInfo the event information need to send
* @param procResult The send result, including the detail error infos if failed
* @return true if successful, false if failed for some reason.
*/
boolean syncSendMessage(HttpEventInfo eventInfo, ProcessResult procResult);

/**
* Asynchronously send message
*
* <p>Attention: if return false, the caller can choose to wait for a period of time before trying again, or
* discard the event after multiple retries and failures.</p>
*
* @param eventInfo the event information need to send
* @param callback the callback that returns the response from DataProxy or
* an exception that occurred while waiting for the response.
* @param procResult The send result, including the detail error infos if the event not accepted
* @return true if successful, false if the event not accepted for some reason.
*/
boolean asyncSendMessage(HttpEventInfo eventInfo, MsgSendCallback callback, ProcessResult procResult);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.sender.tcp;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.sender.MessageSender;
import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;

/**
* TCP Message Sender interface
*
* Used to define the TCP message sender methods
*/
public interface TcpMsgSender extends MessageSender {

/**
* Send message without response
*
* <p>Attention:
* 1. if return false, the caller can choose to wait for a period of time before trying again, or
* discard the event after multiple retries and failures.
* 2. this method may lose messages. It is suitable for situations where the reporting volume is very large,
* the business does not pay attention to the final reporting results, and
* the message loss is tolerated in the event of an exception.
* </p>
*
* @param eventInfo the event information need to send
* @param procResult The send result, include the detail error infos if the eventInfo is not accepted
* @return true if successful, false return indicates that the eventInfo was not accepted for some reason.
*/
boolean sendMessageWithoutAck(TcpEventInfo eventInfo, ProcessResult procResult);

/**
* Synchronously send message and wait for the final sending result
*
* <p>Attention:
* 1. if return false, the caller can choose to wait for a period of time before trying again, or
* discard the event after multiple retries and failures.
* 2. this method, with sendInB2B = true, tries to ensure that messages are delivered, but there
* may be duplicate messages or message loss scenarios. It is suitable for scenarios with
* a very large number of reports, very low reporting time requirements, and
* the need to return the sending results.
* 3. this method, with sendInB2B = false, ensures that the message is delivered only once and
* will not be repeated. It is suitable for businesses with a small amount of reports and
* no requirements on the reporting time, but require DataProxy to forward messages with high reliability.
* </p>
*
* @param sendInB2B indicates the DataProxy message service mode, true indicates DataProxy returns
* as soon as it receives the request and forwards the message in B2B mode until it succeeds;
* false indicates DataProxy returns after receiving the request and forwarding it successfully,
* and DataProxy does not retry on failure
* @param eventInfo the event information need to send
* @param procResult The send result, including the detail error infos if failed
* @return true if successful, false if failed for some reason.
*/
boolean syncSendMessage(boolean sendInB2B,
TcpEventInfo eventInfo, ProcessResult procResult);

/**
* Asynchronously send message
*
* <p>Attention:
* 1. if return false, the caller can choose to wait for a period of time before trying again, or
* discard the event after multiple retries and failures.
* 2. this method, with sendInB2B = true, tries to ensure that messages are delivered, but there
* may be duplicate messages or message loss scenarios. It is suitable for scenarios with
* a very large number of reports, very low reporting time requirements, and
* the need to return the sending results.
* 3. this method, with sendInB2B = false, ensures that the message is delivered only once and
* will not be repeated. It is suitable for businesses with a small amount of reports and
* no requirements on the reporting time, but require DataProxy to forward messages with high reliability.
* </p>
*
* @param sendInB2B indicates the DataProxy message service mode, true indicates DataProxy returns
* as soon as it receives the request and forwards the message in B2B mode until it succeeds;
* false indicates DataProxy returns after receiving the request and forwarding it successfully,
* and DataProxy does not retry on failure
* @param eventInfo the event information need to send
* @param callback the callback that returns the response from DataProxy or
* an exception that occurred while waiting for the response.
* @param procResult The send result, including the detail error infos if the event not accepted
* @return true if successful, false if the event not accepted for some reason.
*/
boolean asyncSendMessage(boolean sendInB2B,
TcpEventInfo eventInfo, MsgSendCallback callback, ProcessResult procResult);
}

0 comments on commit 9a7b106

Please sign in to comment.