diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java index daec1c0694e..22a54638962 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java @@ -88,7 +88,6 @@ public void open(Configuration configuration) throws Exception { converter = FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType()); fieldGetters = FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType()); - log.info("inlong sdk dirty options={}", options); // init sender dirtySender = InlongSdkDirtySender.builder() .inlongManagerAddr(options.getInlongManagerAddr()) diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java index 053aa583648..970775459d3 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java @@ -19,8 +19,10 @@ import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory; +import org.apache.inlong.sort.base.dirty.utils.AESUtils; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Base64; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; @@ -28,12 +30,14 @@ import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.FactoryUtil; +import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.Set; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS; +import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LABELS; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE; import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES; @@ -86,11 +90,16 @@ public class InlongSdkDirtySinkFactory implements DirtySinkFactory { @Override public DirtySink createDirtySink(DynamicTableFactory.Context context) { - ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions()); - FactoryUtil.validateFactoryOptions(this, config); - InlongSdkDirtyOptions options = getOptions(config); - return new InlongSdkDirtySink<>(options, - context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType()); + try { + ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions()); + FactoryUtil.validateFactoryOptions(this, config); + InlongSdkDirtyOptions options = getOptions(config); + return new InlongSdkDirtySink<>(options, + context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType()); + } catch (Throwable t) { + log.warn("failed to create dirty sink", t); + return null; + } } private InlongSdkDirtyOptions getOptions(ReadableConfig config) { @@ -100,7 +109,9 @@ private InlongSdkDirtyOptions getOptions(ReadableConfig config) { .sendToGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP)) .sendToStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM)) .csvFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER)) - .inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY)) + .inlongManagerAuthKey( + decrypt(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY), + config.get(DIRTY_SIDE_OUTPUT_LABELS))) .inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID)) .ignoreSideOutputErrors(config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS)) .retryTimes(config.get(DIRTY_SIDE_OUTPUT_RETRIES)) @@ -109,6 +120,20 @@ private InlongSdkDirtyOptions getOptions(ReadableConfig config) { .build(); } + private String decrypt(String encrypted, String key) { + String decrypted = null; + + try { + byte[] bytes = AESUtils.parseHexStr2Byte(encrypted); + bytes = AESUtils.decrypt(bytes, key.trim().getBytes(StandardCharsets.UTF_8)); + decrypted = new String(Base64.decodeBase64(bytes), StandardCharsets.UTF_8); + } catch (Throwable t) { + log.warn("failed to decrypt {} with key {}", encrypted, key, t); + throw new RuntimeException(t); + } + return decrypted; + } + @Override public String factoryIdentifier() { return IDENTIFIER; diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/AESUtils.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/AESUtils.java new file mode 100644 index 00000000000..e86422ba8f9 --- /dev/null +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/AESUtils.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.dirty.utils; + +import lombok.extern.slf4j.Slf4j; + +import javax.crypto.Cipher; +import javax.crypto.KeyGenerator; +import javax.crypto.SecretKey; + +import java.security.SecureRandom; + +/** + * AES encryption and decryption utils. + */ +@Slf4j +public class AESUtils { + + private static final int KEY_SIZE = 128; + private static final String ALGORITHM = "AES"; + private static final String RNG_ALGORITHM = "SHA1PRNG"; + + /** + * Generate key + */ + private static SecretKey generateKey(byte[] aesKey) throws Exception { + SecureRandom random = SecureRandom.getInstance(RNG_ALGORITHM); + random.setSeed(aesKey); + KeyGenerator gen = KeyGenerator.getInstance(ALGORITHM); + gen.init(KEY_SIZE, random); + return gen.generateKey(); + } + + /** + * Encrypt by key + */ + public static byte[] encrypt(byte[] plainBytes, byte[] key) throws Exception { + SecretKey secKey = generateKey(key); + Cipher cipher = Cipher.getInstance(ALGORITHM); + cipher.init(Cipher.ENCRYPT_MODE, secKey); + return cipher.doFinal(plainBytes); + } + + /** + * Decrypt by key and specified version + */ + public static byte[] decrypt(byte[] cipherBytes, byte[] key) throws Exception { + SecretKey secKey = generateKey(key); + Cipher cipher = Cipher.getInstance(ALGORITHM); + cipher.init(Cipher.DECRYPT_MODE, secKey); + return cipher.doFinal(cipherBytes); + } + + /** + * Parse byte to String in Hex type + */ + public static String parseByte2HexStr(byte[] buf) { + StringBuilder strBuf = new StringBuilder(); + for (byte b : buf) { + String hex = Integer.toHexString(b & 0xFF); + if (hex.length() == 1) { + hex = '0' + hex; + } + strBuf.append(hex.toUpperCase()); + } + return strBuf.toString(); + } + + /** + * Parse String to byte as Hex type + */ + public static byte[] parseHexStr2Byte(String hexStr) { + if (hexStr.length() < 1) { + return null; + } + byte[] result = new byte[hexStr.length() / 2]; + for (int i = 0; i < hexStr.length() / 2; i++) { + int high = Integer.parseInt(hexStr.substring(i * 2, i * 2 + 1), 16); + int low = Integer.parseInt(hexStr.substring(i * 2 + 1, i * 2 + 2), 16); + result[i] = (byte) (high * 16 + low); + } + return result; + } +}