Skip to content

Commit

Permalink
[INLONG-11621][Sort] Support auth key encryption when use inlong sdk …
Browse files Browse the repository at this point in the history
…dirty sink (#11653)

* [INLONG-11621][Sort] Support auth key encryption when using inlong sdk dirty sink
  • Loading branch information
vernedeng authored Jan 6, 2025
1 parent 91d0b03 commit db5619d
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public void open(Configuration configuration) throws Exception {
converter = FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType());
fieldGetters = FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());

log.info("inlong sdk dirty options={}", options);
// init sender
dirtySender = InlongSdkDirtySender.builder()
.inlongManagerAddr(options.getInlongManagerAddr())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,25 @@

import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
import org.apache.inlong.sort.base.dirty.utils.AESUtils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;

import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LABELS;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES;

Expand Down Expand Up @@ -86,11 +90,16 @@ public class InlongSdkDirtySinkFactory implements DirtySinkFactory {

@Override
public <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context context) {
ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
FactoryUtil.validateFactoryOptions(this, config);
InlongSdkDirtyOptions options = getOptions(config);
return new InlongSdkDirtySink<>(options,
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
try {
ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
FactoryUtil.validateFactoryOptions(this, config);
InlongSdkDirtyOptions options = getOptions(config);
return new InlongSdkDirtySink<>(options,
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
} catch (Throwable t) {
log.warn("failed to create dirty sink", t);
return null;
}
}

private InlongSdkDirtyOptions getOptions(ReadableConfig config) {
Expand All @@ -100,7 +109,9 @@ private InlongSdkDirtyOptions getOptions(ReadableConfig config) {
.sendToGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
.sendToStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
.csvFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER))
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
.inlongManagerAuthKey(
decrypt(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY),
config.get(DIRTY_SIDE_OUTPUT_LABELS)))
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
.ignoreSideOutputErrors(config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS))
.retryTimes(config.get(DIRTY_SIDE_OUTPUT_RETRIES))
Expand All @@ -109,6 +120,20 @@ private InlongSdkDirtyOptions getOptions(ReadableConfig config) {
.build();
}

private String decrypt(String encrypted, String key) {
String decrypted = null;

try {
byte[] bytes = AESUtils.parseHexStr2Byte(encrypted);
bytes = AESUtils.decrypt(bytes, key.trim().getBytes(StandardCharsets.UTF_8));
decrypted = new String(Base64.decodeBase64(bytes), StandardCharsets.UTF_8);
} catch (Throwable t) {
log.warn("failed to decrypt {} with key {}", encrypted, key, t);
throw new RuntimeException(t);
}
return decrypted;
}

@Override
public String factoryIdentifier() {
return IDENTIFIER;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.base.dirty.utils;

import lombok.extern.slf4j.Slf4j;

import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

import java.security.SecureRandom;

/**
* AES encryption and decryption utils.
*/
@Slf4j
public class AESUtils {

private static final int KEY_SIZE = 128;
private static final String ALGORITHM = "AES";
private static final String RNG_ALGORITHM = "SHA1PRNG";

/**
* Generate key
*/
private static SecretKey generateKey(byte[] aesKey) throws Exception {
SecureRandom random = SecureRandom.getInstance(RNG_ALGORITHM);
random.setSeed(aesKey);
KeyGenerator gen = KeyGenerator.getInstance(ALGORITHM);
gen.init(KEY_SIZE, random);
return gen.generateKey();
}

/**
* Encrypt by key
*/
public static byte[] encrypt(byte[] plainBytes, byte[] key) throws Exception {
SecretKey secKey = generateKey(key);
Cipher cipher = Cipher.getInstance(ALGORITHM);
cipher.init(Cipher.ENCRYPT_MODE, secKey);
return cipher.doFinal(plainBytes);
}

/**
* Decrypt by key and specified version
*/
public static byte[] decrypt(byte[] cipherBytes, byte[] key) throws Exception {
SecretKey secKey = generateKey(key);
Cipher cipher = Cipher.getInstance(ALGORITHM);
cipher.init(Cipher.DECRYPT_MODE, secKey);
return cipher.doFinal(cipherBytes);
}

/**
* Parse byte to String in Hex type
*/
public static String parseByte2HexStr(byte[] buf) {
StringBuilder strBuf = new StringBuilder();
for (byte b : buf) {
String hex = Integer.toHexString(b & 0xFF);
if (hex.length() == 1) {
hex = '0' + hex;
}
strBuf.append(hex.toUpperCase());
}
return strBuf.toString();
}

/**
* Parse String to byte as Hex type
*/
public static byte[] parseHexStr2Byte(String hexStr) {
if (hexStr.length() < 1) {
return null;
}
byte[] result = new byte[hexStr.length() / 2];
for (int i = 0; i < hexStr.length() / 2; i++) {
int high = Integer.parseInt(hexStr.substring(i * 2, i * 2 + 1), 16);
int low = Integer.parseInt(hexStr.substring(i * 2 + 1, i * 2 + 2), 16);
result[i] = (byte) (high * 16 + low);
}
return result;
}
}

0 comments on commit db5619d

Please sign in to comment.