diff --git a/debezium-server-http/src/main/java/io/debezium/server/http/HttpChangeConsumer.java b/debezium-server-http/src/main/java/io/debezium/server/http/HttpChangeConsumer.java index 823aeb74..8eed9865 100644 --- a/debezium-server-http/src/main/java/io/debezium/server/http/HttpChangeConsumer.java +++ b/debezium-server-http/src/main/java/io/debezium/server/http/HttpChangeConsumer.java @@ -49,6 +49,7 @@ public class HttpChangeConsumer extends BaseChangeConsumer implements DebeziumEn private static final String PROP_CLIENT_TIMEOUT = "timeout.ms"; private static final String PROP_RETRIES = "retries"; private static final String PROP_RETRY_INTERVAL = "retry.interval.ms"; + private static final String PROP_AUTORIZATION_HEADER = "authorization"; private static final Long HTTP_TIMEOUT = Integer.toUnsignedLong(60000); // Default to 60s private static final int DEFAULT_RETRIES = 5; @@ -67,6 +68,7 @@ public class HttpChangeConsumer extends BaseChangeConsumer implements DebeziumEn void connect() throws URISyntaxException { String sinkUrl; String contentType; + String authorizationHeader; client = HttpClient.newHttpClient(); final Config config = ConfigProvider.getConfig(); @@ -90,6 +92,10 @@ void connect() throws URISyntaxException { config.getOptionalValue(PROP_PREFIX + PROP_RETRY_INTERVAL, String.class) .ifPresent(t -> retryInterval = Duration.ofMillis(Long.parseLong(t))); + + if (config.getOptionalValue(PROP_PREFIX + PROP_AUTORIZATION_HEADER, String.class) != null) { + authorizationHeader = config.getOptionalValue(PROP_PREFIX + PROP_AUTORIZATION_HEADER, String.class); + } switch (config.getValue("debezium.format.value", String.class)) { case "avro": @@ -107,6 +113,10 @@ void connect() throws URISyntaxException { LOGGER.info("Using sink URL: {}", sinkUrl); requestBuilder = HttpRequest.newBuilder(new URI(sinkUrl)).timeout(timeoutDuration); requestBuilder.setHeader("content-type", contentType); + if (authorizationHeader != null) { + LOGGER.info("Using authorization header {}", authorizationHeader); + requestBuilder.setHeader("authorization",authorizationHeader); + } } @Override