From 6f22f748d10a0c9d94bda2556b839171eca3ea66 Mon Sep 17 00:00:00 2001
From: Emmanuel Nyachoke
Date: Tue, 24 Oct 2023 09:11:16 +0300
Subject: [PATCH] OZ-323: Updates to support the refactored Flink Jobs (#10)

---
 README.md                                     |  69 +++++--
 docker/.env                                   |  24 ++-
 docker/docker-compose-batch-etl.yaml          |  29 ++-
 ...ocker-compose-data-pipelines-external.yaml | 124 ++++++------
 docker/docker-compose-data-pipelines.yaml     | 182 +++++++++---------
 docker/docker-compose-db.yaml                 |   4 +
 docker/docker-compose-export.yaml             |  16 +-
 docker/docker-compose-kowl.yaml               |  27 +++
 docker/docker-compose-migration.yaml          |  25 +++
 docker/liquibase/odoo/db.changelog-master.xml |  19 ++
 .../odoo/sql/function_replicaIdentity.sql     |  17 ++
 .../odoo/sql/update_replicaIdentity.sql       |   1 +
 docker/sqls/postgresql/create_odoo_db.sh      |  16 ++
 ...rset-setup.sh => create_superset-setup.sh} |   0
 14 files changed, 364 insertions(+), 189 deletions(-)
 create mode 100644 docker/docker-compose-kowl.yaml
 create mode 100644 docker/docker-compose-migration.yaml
 create mode 100644 docker/liquibase/odoo/db.changelog-master.xml
 create mode 100644 docker/liquibase/odoo/sql/function_replicaIdentity.sql
 create mode 100644 docker/liquibase/odoo/sql/update_replicaIdentity.sql
 create mode 100755 docker/sqls/postgresql/create_odoo_db.sh
 rename docker/sqls/postgresql/{superset-setup.sh => create_superset-setup.sh} (100%)

diff --git a/README.md b/README.md
index f2a2d6e..203f707 100644
--- a/README.md
+++ b/README.md
@@ -34,11 +34,42 @@ The services have been split into multiple files this allows you to start only t
 
 `cd ozone-analytics/docker `
 
+### Export environment variables
+```bash
+export DISTRO_PATH=
+```
+
+```bash
+export ANALYTICS_QUERIES_PATH=$DISTRO_PATH/analytics_config/dsl/flattening/queries/;
+export ANALYTICS_DESTINATION_TABLES_MIGRATIONS_PATH=$DISTRO_PATH/analytics_config/liquibase/analytics/
+
+export ANALYTICS_DB_HOST=gateway.docker.internal; \
+export ANALYTICS_DB_PORT=5432; \
+export CONNECT_MYSQL_HOSTNAME=gateway.docker.internal; \
+export CONNECT_MYSQL_PORT=3307; \
+export CONNECT_MYSQL_USER=root; \
+export CONNECT_MYSQL_PASSWORD=3cY8Kve4lGey; \
+export CONNECT_ODOO_DB_HOSTNAME=gateway.docker.internal; \
+export CONNECT_ODOO_DB_PORT=5432; \
+export CONNECT_ODOO_DB_NAME=odoo; \
+export CONNECT_ODOO_DB_USER=postgres; \
+export CONNECT_ODOO_DB_PASSWORD=password
+
+export EXPORT_DESTINATION_TABLES_PATH=$DISTRO_PATH/analytics_config/dsl/export/tables/; \
+export EXPORT_SOURCE_QUERIES_PATH=$DISTRO_PATH/analytics_config/dsl/export/queries; \
+export EXPORT_OUTPUT_PATH=$(pwd)/data/parquet/; \
+export EXPORT_OUTPUT_TAG=h1;
+```
+
+**Note**: `gateway.docker.internal` is a special DNS name that resolves to the host machine from within containers. It is only available on Mac and Windows. On Linux, use the Docker host IP, which is `172.17.0.1` by default.
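+
+If you are unsure of the host IP on Linux, you can print the bridge gateway address with, for example (a sketch, assuming the default `bridge` network):
+
+```bash
+docker network inspect bridge --format '{{(index .IPAM.Config 0).Gateway}}'
+```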
 
 ### Streaming and flattening pipelines only (without Superset)
 
 In cases where you don't need to start Superset (for example when you will use the Parquet export job to create Parquet files to later upload onto Minio or S3, or if you want to plug your own BI tool), you can start only the streaming and flattening data pipelines by running:
 
-`docker compose -f docker-compose-db.yaml -f docker-compose-data-pipelines.yaml up -d --build`
+```bash
+docker compose -f docker-compose-migration.yaml -f docker-compose-db.yaml -f docker-compose-data-pipelines.yaml up -d --build
+```
 
 Which will start:
@@ -55,7 +86,9 @@ Which will start:
 
 To start the complete streaming and flattening suite, including Superset as the BI tool, run:
 
-`docker compose -f docker-compose-db.yaml -f docker-compose-data-pipelines.yaml -f docker-compose-superset.yaml up -d --build`
+```bash
+docker compose -f docker-compose-db.yaml -f docker-compose-data-pipelines.yaml -f docker-compose-superset.yaml up -d --build
+```
 
 This will start the following services:
@@ -66,6 +99,15 @@ This will start the following services:
 
 > NOTE: The streaming jobs may fail for a while during the initial start up as Flink discovers data partitions from Kafka. You can wait for this to sort itself out or you can try to restart the `jobmanager` and `taskmanager` services with `docker compose -f docker-compose-data-pipelines.yaml restart jobmanager taskmanager`
 
+### Streaming and flattening pipelines only against an existing deployment
+When you have an existing deployment of Ozone and you want to run the streaming and flattening pipelines against it, you can use the following command:
+
+```bash
+docker compose -f docker-compose-data-pipelines-external.yaml -f docker-compose-migration.yaml up -d
+```
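+
+Once the services are up, a quick sanity check is to list the Debezium connectors registered with Kafka Connect (a sketch; the external compose file publishes Kafka Connect on host port 8383):
+
+```bash
+curl -s http://localhost:8383/connectors
+```
+
+Both the OpenMRS (MySQL) and Odoo (PostgreSQL) connectors should appear in the returned list.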
+
 ### Drill-backed analytics server
 
 In cases where you have multiple instances of Ozone deployed in remote locations, you may want to process data onsite with the streaming and flattening pipelines but ship the data to a central repository for analytics.
 This provides a solution that uses:
@@ -94,36 +136,27 @@ To start this stack, run:
 - cd `docker/` and run the following commands
 - Start database services
-```
+```bash
 docker compose -f docker-compose-db.yaml up -d
 ```
 - Run the batch ETL job to transform the data
-```
+```bash
 docker compose -f docker-compose-migration.yaml -f docker-compose-batch-etl.yaml up
 ```
 - Export data in a Parquet format
-```
-export LOCATION_TAG=
+```bash
 docker compose -f docker-compose-export.yaml up
 ```
 :bulb: The data folder should be found at `./docker/data/parquet`
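+
+To quickly verify the export, you can list the generated files (assuming the default output directory):
+
+```bash
+find ./docker/data/parquet -name '*.parquet'
+```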
 
 ### Parquet export against an existing production deployment
-- Set the variables
-```
-export ANALYTICS_DB_HOST=
-export OPENMRS_DB_HOST=
-export LOCATION_TAG=
-```
-**Note**: if the host of the database is your localhost use `host.docker.internal`
 - Run the batch ETL job to transform the data
-```
-docker compose -f docker-compose-batch-etl.yaml up
+```bash
+docker compose -f docker-compose-migration.yaml -f docker-compose-batch-etl.yaml up
 ```
 - Export data in a Parquet format
-```
+```bash
 docker compose -f docker-compose-export.yaml up
 ```
 :bulb: The data folder should be found at `./docker/data/parquet`
diff --git a/docker/.env b/docker/.env
index cfea505..9dade46 100644
--- a/docker/.env
+++ b/docker/.env
@@ -13,9 +13,21 @@ OPENMRS_DB_HOST=mysql
 OPENMRS_DB_PORT=3306
 OPENMRS_DB_NAME=openmrs
 
-CONNECT_MYSQL_USERNAME=root
-CONNECT_MYSQL_PASSWORD=3cY8Kve4lGey
+# Kafka Connect
+CONNECT_MYSQL_HOSTNAME=mysql
 CONNECT_MYSQL_PORT=3306
+CONNECT_MYSQL_USER=root
+CONNECT_MYSQL_PASSWORD=3cY8Kve4lGey
+
+CONNECT_ODOO_DB_HOSTNAME=postgresql
+CONNECT_ODOO_DB_PORT=5432
+CONNECT_ODOO_DB_NAME=odoo
+CONNECT_ODOO_DB_USER=postgres
+CONNECT_ODOO_DB_PASSWORD=password
+
+# Odoo
+ODOO_DB_USER=odoo
+ODOO_DB_PASSWORD=password
 
 # Superset
 # The secret key is used by Superset for encryption. You should generate one with something like `openssl rand -base64 48` and replace the sample secret below.
@@ -36,9 +48,15 @@ ANALYTICS_DB_PASSWORD=password
 ANALYTICS_DB_HOST=postgresql
 ANALYTICS_DB_PORT=5432
 ANALYTICS_DB_NAME=analytics
+ANALYTICS_SOURCE_TABLES_PATH=
+ANALYTICS_QUERIES_PATH=
+ANALYTICS_DESTINATION_TABLES_MIGRATIONS_PATH=
+CHANGELOG_FILE=db.changelog-master.xml
+ODOO_ANALYTICS_TABLES='databasechangelog,account_account,product_category,sale_order'
+ANALYTICS_KAFKA_URL=kafka:9092
 
 # Kafka
-TOPICS=appointment_service,appointment_service_type,care_setting,concept,concept_name,concept_reference_map,concept_reference_source,concept_reference_term,conditions,encounter,encounter_diagnosis,encounter_type,location,form,obs,order_type,orders,patient,patient_appointment,patient_appointment_provider,patient_identifier,patient_identifier_type,patient_program,program,person,person_name,person_address,visit_type,visit
+CREATE_TOPICS=openmrs.openmrs.appointment_service,openmrs.openmrs.appointment_service_type,openmrs.openmrs.care_setting,openmrs.openmrs.concept,openmrs.openmrs.concept_name,openmrs.openmrs.concept_reference_map,openmrs.openmrs.concept_reference_source,openmrs.openmrs.concept_reference_term,openmrs.openmrs.conditions,openmrs.openmrs.encounter,openmrs.openmrs.encounter_diagnosis,openmrs.openmrs.encounter_type,openmrs.openmrs.location,openmrs.openmrs.form,openmrs.openmrs.obs,openmrs.openmrs.order_type,openmrs.openmrs.orders,openmrs.openmrs.patient,openmrs.openmrs.patient_appointment,openmrs.openmrs.patient_appointment_provider,openmrs.openmrs.patient_identifier,openmrs.openmrs.patient_identifier_type,openmrs.openmrs.patient_program,openmrs.openmrs.program,openmrs.openmrs.person,openmrs.openmrs.person_name,openmrs.openmrs.person_address,openmrs.openmrs.visit_type,openmrs.openmrs.visit,odoo.public.sale_order
 
 # Postgres
 POSTGRES_USER=postgres
diff --git a/docker/docker-compose-batch-etl.yaml b/docker/docker-compose-batch-etl.yaml
index 523c0c3..50c9bb9 100644
--- a/docker/docker-compose-batch-etl.yaml
+++ b/docker/docker-compose-batch-etl.yaml
@@ -1,24 +1,12 @@
 version: '3.8'
 services:
-  migrate:
-    networks:
-      ozone-analytics:
-    restart: on-failure
-    image: mekomsolutions/ozone-flink-jobs:1.0.0-alpha1
-    command: /run.sh
-    environment:
-      - FLINK_JOB_POSTGRES_USER=${ANALYTICS_DB_USER}
-      - FLINK_JOB_POSTGRES_PASSWORD=${ANALYTICS_DB_PASSWORD}
-      - FLINK_JOB_POSTGRES_URL=jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME}
-      - FLINK_JOB_PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
-    extra_hosts:
-      - "host.docker.internal:host-gateway"
   batch-etl:
     networks:
       ozone-analytics:
-    image: mekomsolutions/ozone-flink-jobs-batch:1.0.0-alpha2
+    image: mekomsolutions/ozone-flink-jobs-batch
    depends_on:
-      - migrate
+      analytics-migration:
+        condition: service_completed_successfully
     environment:
       SOURCE_JDBC_URL: jdbc:mysql://${OPENMRS_DB_HOST}:${OPENMRS_DB_PORT}/${OPENMRS_DB_NAME}
       SOURCE_JDBC_USERNAME: root
@@ -29,10 +17,19 @@ services:
       PROPERTIES_FILE: '/opt/flink/usrlib/job.properties'
       FLINK_CONF_DIR: '/etc/flink'
       JAVA_OPTS: '-XX:MaxRAMPercentage=80.0'
+      ANALYTICS_QUERIES_PATH: /analytics/queries
+      ANALYTICS_SOURCE_TABLES_PATH: /analytics/source-tables
+      ANALYTICS_DB_NAME: ${ANALYTICS_DB_NAME}
+      ANALYTICS_DB_USER: ${ANALYTICS_DB_USER}
+      ANALYTICS_DB_PASSWORD: ${ANALYTICS_DB_PASSWORD}
+      ANALYTICS_DB_HOST: ${ANALYTICS_DB_HOST}
+      ANALYTICS_DB_PORT: ${ANALYTICS_DB_PORT}
     extra_hosts:
       - "host.docker.internal:host-gateway"
     volumes:
       - ./flink/pipelines/job.properties:/opt/flink/usrlib/job.properties
       - ./flink/config/flink-conf.yaml:/etc/flink/flink-conf.yaml
+      - ${ANALYTICS_QUERIES_PATH}:/analytics/queries
+      - ${ANALYTICS_SOURCE_TABLES_PATH}:/analytics/source-tables
 networks:
-  ozone-analytics:
\ No newline at end of file
+  ozone-analytics:
diff --git a/docker/docker-compose-data-pipelines-external.yaml b/docker/docker-compose-data-pipelines-external.yaml
index 57dba9f..ffef92f 100644
--- a/docker/docker-compose-data-pipelines-external.yaml
+++ b/docker/docker-compose-data-pipelines-external.yaml
@@ -15,21 +15,30 @@ services:
     labels:
       kompose.service.type: clusterip
   kafka:
-    networks:
-      ozone-analytics:
     restart: on-failure
     image: debezium/kafka:${DEBEZIUM_VERSION}
+    networks:
+      ozone-analytics:
     ports:
-      - 9092:9092
-
+      - 9092:9092
+      - 29092:29092
     environment:
-      - CLUSTER_ID=5Yr1SIgYQz-b-dgRabWx4g
-      - BROKER_ID=1
-      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
+      - CLUSTER_ID=5Yr1SIgYQz-b-dgRabWx4g
+      - BROKER_ID=1
+      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
+      - KAFKA_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093,PLAINTEXT_HOST://0.0.0.0:29092
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://kafka:29092
     volumes:
       - kafka-data:/kafka/data
     healthcheck:
-      test: ["CMD-SHELL","/bin/bash", "-c", "./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"]
+      test:
+        [
+          "CMD-SHELL",
+          "/bin/bash",
+          "-c",
+          "./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"
+        ]
   kafka-setup:
     networks:
       ozone-analytics:
@@ -43,72 +52,63 @@ services:
       for topic in $${topics[@]}
       do
         echo "Creating topic $$topic..."
-        ./bin/kafka-topics.sh --create --topic openmrs.openmrs.$$topic --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:9092
-        ./bin/kafka-configs.sh --bootstrap-server=kafka:9092 --entity-type topics --entity-name openmrs.openmrs.$$topic --alter --add-config retention.ms=31556736000
+        ./bin/kafka-topics.sh --create --topic $$topic --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:9092
+        ./bin/kafka-configs.sh --bootstrap-server=kafka:9092 --entity-type topics --entity-name $$topic --alter --add-config retention.ms=31556736000
       done
     environment:
-      - TOPICS=${TOPICS}
+      - TOPICS=${CREATE_TOPICS}
     depends_on:
       - kafka
   connect:
-    networks:
-      ozone-analytics:
     restart: on-failure
     image: debezium/connect:${DEBEZIUM_VERSION}
+    networks:
+      ozone-analytics:
     ports:
-      - 8083:8083
-    depends_on:
-      - kafka-setup
-    environment:
-      - BOOTSTRAP_SERVERS=kafka:9092
-      - GROUP_ID=1
-      - CONFIG_STORAGE_TOPIC=my_connect_configs
-      - OFFSET_STORAGE_TOPIC=my_connect_offsets
-      - STATUS_STORAGE_TOPIC=my_connect_statuses
-      - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
-      - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
-      - CONNECT_CONFIG_PROVIDERS=file
-      - CONNECT_CONFIG_PROVIDERS_FILE_CLASS=org.apache.kafka.common.config.provider.FileConfigProvider
-      - CONNECT_MYSQL_HOSTNAME=${CONNECT_MYSQL_HOSTNAME:-mysql}
-      - CONNECT_MYSQL_USERNAME=${CONNECT_MYSQL_USERNAME:-root}
-      - CONNECT_MYSQL_PASSWORD=${CONNECT_MYSQL_PASSWORD:-${MYSQL_ROOT_PASSWORD}}
-      - CONNECT_MYSQL_PORT=${CONNECT_MYSQL_PORT:-${CONNECT_MYSQL_PORT}}
-      - CONNECT_MYSQL_SERVER_ID=${CONNECT_MYSQL_SERVER_ID:-184054}
-      - CONNECT_MYSQL_SERVER_NAME=openmrs
-      - CONNECT_MYSQL_INCLUDE_LIST=openmrs
-      - CONNECT_TABLE_EXCLUDE_LIST=openmrs.audit_log
-      - CONNECT_MYSQL_HISTROY_TOPIC=dbhistory.openmrs
-      - CONNECT_MYSQL_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - 8383:8083
     volumes:
       - ./debezium-connect/jars/TimestampConverter-1.2.4-SNAPSHOT.jar:/kafka/connect/debezium-connector-mysql/TimestampConverter-1.2.4-SNAPSHOT.jar
+    environment:
+      - BOOTSTRAP_SERVERS=kafka:9092
+      - GROUP_ID=1
+      - CONFIG_STORAGE_TOPIC=my_connect_configs
+      - OFFSET_STORAGE_TOPIC=my_connect_offsets
+      - STATUS_STORAGE_TOPIC=my_connect_statuses
+      - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
+      - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
+      - CONNECT_CONFIG_PROVIDERS=file
+      - CONNECT_CONFIG_PROVIDERS_FILE_CLASS=org.apache.kafka.common.config.provider.FileConfigProvider
+      - CONNECT_MYSQL_HOSTNAME=${CONNECT_MYSQL_HOSTNAME:-mysql}
+      - CONNECT_MYSQL_USERNAME=${CONNECT_MYSQL_USERNAME:-root}
+      - CONNECT_MYSQL_PASSWORD=${CONNECT_MYSQL_PASSWORD:-${CONNECT_MYSQL_PASSWORD}}
+      - CONNECT_MYSQL_PORT=${CONNECT_MYSQL_PORT:-${CONNECT_MYSQL_PORT}}
+      - CONNECT_MYSQL_SERVER_ID=37991
+      - CONNECT_MYSQL_SERVER_NAME=openmrs
+      - CONNECT_MYSQL_INCLUDE_LIST=openmrs
+      - CONNECT_TABLE_EXCLUDE_LIST=openmrs.audit_log
+      - CONNECT_MYSQL_HISTROY_TOPIC=dbhistory.openmrs
+      - CONNECT_MYSQL_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - CONNECT_ODOO_DB_HOSTNAME=${CONNECT_ODOO_DB_HOSTNAME:-postgresql}
+      - CONNECT_ODOO_DB_USERNAME=${CONNECT_ODOO_DB_USERNAME:-${POSTGRES_USER}}
+      - CONNECT_ODOO_DB_PASSWORD=${CONNECT_ODOO_DB_PASSWORD:-${POSTGRES_PASSWORD}}
+      - CONNECT_ODOO_DB_PORT=${CONNECT_ODOO_DB_PORT:-5432}
+      - CONNECT_ODOO_DB_SERVER_NAME=odoo
+      - CONNECT_ODOO_DB_INCLUDE_LIST=odoo
+  connect-setup:
+    restart: on-failure
+    image: mekomsolutions/ozone-analytics-setup-connect
     networks:
       ozone-analytics:
-    restart: on-failure
-    build: ./setup-connect
     depends_on:
-      - jobmanager
-      - taskmanager
       - connect
     environment:
       - CONNECT_HOST=connect
-      - ANALYTICS_DB_HOST=${ANALYTICS_DB_HOST}
       - SOURCE_DB_HOST=${CONNECT_MYSQL_HOSTNAME}
+      - SOURCE_DB_PORT=${CONNECT_MYSQL_PORT}
+      - ODOO_DB_HOST=${ODOO_DB_HOST:-${CONNECT_ODOO_DB_HOSTNAME}}
+      - ODOO_DB_PORT=${ODOO_DB_PORT:-${CONNECT_ODOO_DB_PORT}}
       - FLINK_JOBMANAGER_HOST=jobmanager
-  migrate:
-    networks:
-      ozone-analytics:
-    restart: on-failure
-    image: mekomsolutions/ozone-flink-jobs
-
-    command: /run.sh
-    environment:
-      - FLINK_JOB_POSTGRES_USER=${ANALYTICS_DB_USER}
-      - FLINK_JOB_POSTGRES_PASSWORD=${ANALYTICS_DB_PASSWORD}
-      - FLINK_JOB_POSTGRES_URL=jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME}
-      - FLINK_JOB_PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
-    labels:
-      - kompose.service.group=flink-manager
   jobmanager:
     networks:
       ozone-analytics:
@@ -118,8 +118,10 @@ services:
       - "8084:8081"
     command: standalone-job --job-classname com.ozonehis.data.pipelines.streaming.StreamingETLJob --job-id 00000000000000000000000000000000 --properties-file /opt/flink/usrlib/job.properties --sink-url jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME} --sink-username ${ANALYTICS_DB_USER} --sink-password ${ANALYTICS_DB_PASSWORD}
     depends_on:
-      migrate:
+      odoo-replica-identity-migration:
         condition: service_completed_successfully
+      analytics-migration:
+        condition: service_completed_successfully
       zookeeper:
         condition: service_started
       kafka:
@@ -151,10 +153,20 @@ services:
       - FLINK_JOB_POSTGRES_PASSWORD=password
       - FLINK_JOB_POSTGRES_URL=jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME}
       - FLINK_JOB_PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
+      - ANALYTICS_SOURCE_TABLES_PATH=/analytics/source-tables
+      - ANALYTICS_QUERIES_PATH=/analytics/queries
+      - ANALYTICS_DB_NAME=${ANALYTICS_DB_NAME}
+      - ANALYTICS_DB_USER=${ANALYTICS_DB_USER}
+      - ANALYTICS_DB_PASSWORD=${ANALYTICS_DB_PASSWORD}
+      - ANALYTICS_DB_HOST=${ANALYTICS_DB_HOST}
+      - ANALYTICS_DB_PORT=${ANALYTICS_DB_PORT}
+      - ANALYTICS_KAFKA_URL=${ANALYTICS_KAFKA_URL}
     volumes:
       - flink-shared-data:/tmp
       - ./flink/pipelines/job.properties:/opt/flink/usrlib/job.properties
       - ./data:/data
+      - ${ANALYTICS_SOURCE_TABLES_PATH}:/analytics/source-tables
+      - ${ANALYTICS_QUERIES_PATH}:/analytics/queries
   taskmanager:
     networks:
       ozone-analytics:
diff --git a/docker/docker-compose-data-pipelines.yaml b/docker/docker-compose-data-pipelines.yaml
index e35bf73..b2c721c 100644
--- a/docker/docker-compose-data-pipelines.yaml
+++ b/docker/docker-compose-data-pipelines.yaml
@@ -6,13 +6,13 @@ services:
     restart: on-failure
     image: debezium/zookeeper:${DEBEZIUM_VERSION}
     ports:
-      - 2181:2181
-      - 2888:2888
-      - 3888:3888
+      - 2181:2181
+      - 2888:2888
+      - 3888:3888
     volumes:
       - zookeeper-data:/zookeeper/data
       - zookeeper-txns:/zookeeper/txns
-    labels: 
+    labels:
       kompose.service.type: clusterip
   kafka:
     networks:
@@ -20,18 +20,27 @@ services:
     restart: on-failure
     image: debezium/kafka:${DEBEZIUM_VERSION}
     ports:
-      - 9092:9092
+      - 9092:9092
     environment:
-      - CLUSTER_ID=5Yr1SIgYQz-b-dgRabWx4g
-      - BROKER_ID=1
-      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
+      - CLUSTER_ID=5Yr1SIgYQz-b-dgRabWx4g
+      - BROKER_ID=1
+      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
+      - KAFKA_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093,PLAINTEXT_HOST://0.0.0.0:29092
+      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://kafka:29092
     volumes:
       - kafka-data:/kafka/data
     depends_on:
       - mysql
     healthcheck:
-      test: ["CMD-SHELL","/bin/bash", "-c", "./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"]
+      test:
+        [
+          "CMD-SHELL",
+          "/bin/bash",
+          "-c",
+          "./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"
+        ]
   kafka-setup:
     networks:
       ozone-analytics:
@@ -45,50 +54,56 @@ services:
       for topic in $${topics[@]}
       do
         echo "Creating topic $$topic..."
-        ./bin/kafka-topics.sh --create --topic openmrs.openmrs.$$topic --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:9092
-        ./bin/kafka-configs.sh --bootstrap-server=kafka:9092 --entity-type topics --entity-name openmrs.openmrs.$$topic --alter --add-config retention.ms=31556736000
+        ./bin/kafka-topics.sh --create --topic $$topic --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:9092
+        ./bin/kafka-configs.sh --bootstrap-server=kafka:9092 --entity-type topics --entity-name $$topic --alter --add-config retention.ms=31556736000
       done
     environment:
-      - TOPICS=${TOPICS}
+      - TOPICS=${CREATE_TOPICS}
     depends_on:
       - kafka
+
   connect:
     networks:
       ozone-analytics:
     restart: on-failure
     image: debezium/connect:${DEBEZIUM_VERSION}
     ports:
-      - 8083:8083
+      - 8083:8083
     depends_on:
-      - kafka-setup
       - mysql
     environment:
-      - BOOTSTRAP_SERVERS=kafka:9092
-      - GROUP_ID=1
-      - CONFIG_STORAGE_TOPIC=my_connect_configs
-      - OFFSET_STORAGE_TOPIC=my_connect_offsets
-      - STATUS_STORAGE_TOPIC=my_connect_statuses
-      - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
-      - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
-      - CONNECT_CONFIG_PROVIDERS=file
-      - CONNECT_CONFIG_PROVIDERS_FILE_CLASS=org.apache.kafka.common.config.provider.FileConfigProvider
-      - CONNECT_MYSQL_HOSTNAME=${CONNECT_MYSQL_HOSTNAME:-mysql}
-      - CONNECT_MYSQL_USERNAME=${CONNECT_MYSQL_USERNAME:-root}
-      - CONNECT_MYSQL_PASSWORD=${CONNECT_MYSQL_PASSWORD:-${MYSQL_ROOT_PASSWORD}}
-      - CONNECT_MYSQL_PORT=${CONNECT_MYSQL_PORT:-${CONNECT_MYSQL_PORT}}
-      - CONNECT_MYSQL_SERVER_ID=${CONNECT_MYSQL_SERVER_ID:-184054}
-      - CONNECT_MYSQL_SERVER_NAME=openmrs
-      - CONNECT_MYSQL_INCLUDE_LIST=openmrs
-      - CONNECT_TABLE_EXCLUDE_LIST=openmrs.audit_log
-      - CONNECT_MYSQL_HISTROY_TOPIC=dbhistory.openmrs
-      - CONNECT_MYSQL_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - BOOTSTRAP_SERVERS=kafka:9092
+      - GROUP_ID=1
+      - CONFIG_STORAGE_TOPIC=my_connect_configs
+      - OFFSET_STORAGE_TOPIC=my_connect_offsets
+      - STATUS_STORAGE_TOPIC=my_connect_statuses
+      - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
+      - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
+      - CONNECT_CONFIG_PROVIDERS=file
+      - CONNECT_CONFIG_PROVIDERS_FILE_CLASS=org.apache.kafka.common.config.provider.FileConfigProvider
+      - CONNECT_MYSQL_HOSTNAME=${CONNECT_MYSQL_HOSTNAME:-mysql}
+      - CONNECT_MYSQL_USERNAME=${CONNECT_MYSQL_USERNAME:-root}
+      - CONNECT_MYSQL_PASSWORD=${CONNECT_MYSQL_PASSWORD:-${MYSQL_ROOT_PASSWORD}}
+      - CONNECT_MYSQL_PORT=${CONNECT_MYSQL_PORT:-${CONNECT_MYSQL_PORT}}
+      - CONNECT_MYSQL_SERVER_ID=${CONNECT_MYSQL_SERVER_ID:-184054}
+      - CONNECT_MYSQL_SERVER_NAME=openmrs
+      - CONNECT_MYSQL_INCLUDE_LIST=openmrs
+      - CONNECT_TABLE_EXCLUDE_LIST=openmrs.audit_log
+      - CONNECT_MYSQL_HISTROY_TOPIC=dbhistory.openmrs
+      - CONNECT_MYSQL_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
+      - CONNECT_ODOO_DB_HOSTNAME=${CONNECT_ODOO_DB_HOSTNAME:-postgresql}
+      - CONNECT_ODOO_DB_USERNAME=${CONNECT_ODOO_DB_USERNAME:-${POSTGRES_USER}}
+      - CONNECT_ODOO_DB_PASSWORD=${CONNECT_ODOO_DB_PASSWORD:-${POSTGRES_PASSWORD}}
+      - CONNECT_ODOO_DB_PORT=${CONNECT_ODOO_DB_PORT:-5432}
+      - CONNECT_ODOO_DB_SERVER_NAME=odoo
+      - CONNECT_ODOO_DB_INCLUDE_LIST=odoo
     volumes:
       - ./debezium-connect/jars/TimestampConverter-1.2.4-SNAPSHOT.jar:/kafka/connect/debezium-connector-mysql/TimestampConverter-1.2.4-SNAPSHOT.jar
   connect-setup:
     networks:
       ozone-analytics:
     restart: on-failure
-    build: ./setup-connect
+    image: mekomsolutions/ozone-analytics-setup-connect
     depends_on:
       - postgresql
       - mysql
@@ -97,51 +112,12 @@ services:
       - connect
     environment:
       - CONNECT_HOST=connect
-      - ANALYTICS_DB_HOST=${ANALYTICS_DB_HOST}
       - SOURCE_DB_HOST=${CONNECT_MYSQL_HOSTNAME}
+      - SOURCE_DB_PORT=${CONNECT_MYSQL_PORT}
+      - ODOO_DB_HOST=${ODOO_DB_HOST:-${CONNECT_ODOO_DB_HOSTNAME}}
+      - ODOO_DB_PORT=${ODOO_DB_PORT:-${CONNECT_ODOO_DB_PORT}}
       - FLINK_JOBMANAGER_HOST=jobmanager
-  kowl:
-    networks:
-      ozone-analytics:
-    image: quay.io/cloudhut/kowl:master
-    container_name: "kowl"
-    restart: on-failure
-    entrypoint: /bin/sh
-    command: -c "echo \"$$KOWL_CONFIG_FILE\" > /tmp/config.yml; /app/kowl"
-    ports:
-      - "8282:8080"
-    environment:
-      CONFIG_FILEPATH: /tmp/config.yml
-      KOWL_CONFIG_FILE: |
-        kafka:
-          brokers: ["kafka:9092"]
-        connect:
-          enabled: true
-          clusters:
-            - name: openmrs
-              url: http://connect:8083
-    depends_on:
-      - "kafka"
-      - "connect"
-    labels:
-      kompose.service.type: clusterip
-
-  migrate:
-    networks:
-      ozone-analytics:
-    restart: on-failure
-    image: mekomsolutions/ozone-flink-jobs
-    command: /run.sh
-    depends_on:
-      - postgresql
-    environment:
-      - FLINK_JOB_POSTGRES_USER=${ANALYTICS_DB_USER}
-      - FLINK_JOB_POSTGRES_PASSWORD=${ANALYTICS_DB_PASSWORD}
-      - FLINK_JOB_POSTGRES_URL=jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME}
-      - FLINK_JOB_PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
-    labels:
-      - kompose.service.group=flink-manager
   jobmanager:
     networks:
       ozone-analytics:
@@ -151,16 +127,18 @@ services:
       - "8084:8081"
     command: standalone-job --job-classname com.ozonehis.data.pipelines.streaming.StreamingETLJob --job-id 00000000000000000000000000000000 --properties-file /opt/flink/usrlib/job.properties --sink-url jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME} --sink-username ${ANALYTICS_DB_USER} --sink-password ${ANALYTICS_DB_PASSWORD}
     depends_on:
-      migrate:
+      odoo-replica-identity-migration:
         condition: service_completed_successfully
-      zookeeper:
-        condition: service_started
-      kafka:
-        condition: service_started
-      kafka-setup:
+      analytics-migration:
+        condition: service_completed_successfully
+      zookeeper:
+        condition: service_started
+      kafka:
+        condition: service_started
+      postgresql:
+        condition: service_started
+      kafka-setup:
         condition: service_completed_successfully
-      postgresql:
-        condition: service_started
     environment:
       - |
        FLINK_PROPERTIES=
@@ -184,10 +162,20 @@ services:
       - FLINK_JOB_POSTGRES_PASSWORD=password
       - FLINK_JOB_POSTGRES_URL=jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME}
       - FLINK_JOB_PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
+      - ANALYTICS_SOURCE_TABLES_PATH=/analytics/source-tables
+      - ANALYTICS_QUERIES_PATH=/analytics/queries
+      - ANALYTICS_DB_NAME=${ANALYTICS_DB_NAME}
+      - ANALYTICS_DB_USER=${ANALYTICS_DB_USER}
+      - ANALYTICS_DB_PASSWORD=${ANALYTICS_DB_PASSWORD}
+      - ANALYTICS_DB_HOST=${ANALYTICS_DB_HOST}
+      - ANALYTICS_DB_PORT=${ANALYTICS_DB_PORT}
+      - ANALYTICS_KAFKA_URL=${ANALYTICS_KAFKA_URL}
     volumes:
       - flink-shared-data:/tmp
       - ./flink/pipelines/job.properties:/opt/flink/usrlib/job.properties
       - ./data:/data
+      - ${ANALYTICS_SOURCE_TABLES_PATH}:/analytics/source-tables
+      - ${ANALYTICS_QUERIES_PATH}:/analytics/queries
   taskmanager:
     networks:
       ozone-analytics:
@@ -219,15 +207,21 @@ services:
       - ./flink/pipelines/job.properties:/opt/flink/usrlib/job.properties
       - ./data:/data
     restart: on-failure
+  odoo-replica-identity-migration:
+    depends_on:
+      - postgresql
+  analytics-migration:
+    depends_on:
+      - postgresql
 volumes:
-  mysql-data:
-  flink-shared-data: ~
-  postgresql-data: ~
-  redis: ~
-  zookeeper-data: ~
-  kafka-data: ~
-  zookeeper-txns: ~
-  consul-data: ~
-  openmrs-referenceapplication-data: ~
+  mysql-data:
+  flink-shared-data: ~
+  postgresql-data: ~
+  redis: ~
+  zookeeper-data: ~
+  kafka-data: ~
+  zookeeper-txns: ~
+  consul-data: ~
+  openmrs-referenceapplication-data: ~
 networks:
-  ozone-analytics:
\ No newline at end of file
+  ozone-analytics:
diff --git a/docker/docker-compose-db.yaml b/docker/docker-compose-db.yaml
index 8d47915..61429a9 100644
--- a/docker/docker-compose-db.yaml
+++ b/docker/docker-compose-db.yaml
@@ -41,11 +41,15 @@ services:
       SUPERSET_DB: ${SUPERSET_DB}
       SUPERSET_DB_USER: ${SUPERSET_DB_USER}
       SUPERSET_DB_PASSWORD: ${SUPERSET_DB_PASSWORD}
+      ODOO_DB_USER: ${ODOO_DB_USER}
+      ODOO_DB_PASSWORD: ${ODOO_DB_PASSWORD}
+      ODOO_DB_NAME: ${CONNECT_ODOO_DB_NAME}
     volumes:
       - ${POSTGRES_DATADIR:-postgresql-data}:/var/lib/postgresql/data
       - "${SQL_SCRIPTS_PATH:-./sqls/postgresql}:/docker-entrypoint-initdb.d"
     ports:
       - "5432:5432"
+
 volumes:
   mysql-data:
diff --git a/docker/docker-compose-export.yaml b/docker/docker-compose-export.yaml
index 52a7ece..54e9168 100644
--- a/docker/docker-compose-export.yaml
+++ b/docker/docker-compose-export.yaml
@@ -3,17 +3,29 @@ services:
   parquet-export:
     networks:
       ozone-analytics:
-    image: mekomsolutions/ozone-flink-parquet-export:1.0.0-alpha1
+    image: mekomsolutions/ozone-flink-parquet-export
     environment:
       JDBC_URL: jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME}
       JDBC_USERNAME: ${ANALYTICS_DB_USER}
       JDBC_PASSWORD: ${ANALYTICS_DB_PASSWORD}
-      LOCATION_TAG: ${LOCATION_TAG}
+      LOCATION_TAG: ${LOCATION_TAG:-location1}
       FLINK_CONF_DIR: '/etc/flink'
       JAVA_OPTS: '-XX:MaxRAMPercentage=80.0'
+      EXPORT_SOURCE_QUERIES_PATH: /export/queries
+      EXPORT_DESTINATION_TABLES_PATH: /export/destination-tables
+      ANALYTICS_DB_NAME: ${ANALYTICS_DB_NAME}
+      ANALYTICS_DB_USER: ${ANALYTICS_DB_USER}
+      ANALYTICS_DB_PASSWORD: ${ANALYTICS_DB_PASSWORD}
+      ANALYTICS_DB_HOST: ${ANALYTICS_DB_HOST}
+      ANALYTICS_DB_PORT: ${ANALYTICS_DB_PORT}
+      EXPORT_OUTPUT_TAG: ${EXPORT_OUTPUT_TAG}
+      EXPORT_OUTPUT_PATH: /parquet
     volumes:
       - ./data/parquet:/parquet
       - ./flink/config/flink-conf.yaml:/etc/flink/flink-conf.yaml
+      - ${EXPORT_DESTINATION_TABLES_PATH}:/export/destination-tables
+      - ${EXPORT_SOURCE_QUERIES_PATH}:/export/queries
+      - ${EXPORT_OUTPUT_PATH}:/parquet
     extra_hosts:
       - "host.docker.internal:host-gateway"
 networks:
diff --git a/docker/docker-compose-kowl.yaml b/docker/docker-compose-kowl.yaml
new file mode 100644
index 0000000..18f1612
--- /dev/null
+++ b/docker/docker-compose-kowl.yaml
@@ -0,0 +1,27 @@
+version: '3.8'
+services:
+  kowl:
+    image: rsmnarts/kowl
+    networks:
+      - ozone-analytics
+    container_name: "kowl"
+    restart: on-failure
+    entrypoint: /bin/sh
+    command: -c "echo \"$$KOWL_CONFIG_FILE\" > /tmp/config.yml; /app/kowl"
+    ports:
+      - "8282:8080"
+    environment:
+      CONFIG_FILEPATH: /tmp/config.yml
+      KOWL_CONFIG_FILE: |
+        kafka:
+          brokers: ["kafka:9092"]
+        connect:
+          enabled: true
+          clusters:
+            - name: Ozone
+              url: http://connect:8083
+    depends_on:
+      - "kafka"
+      - "connect"
+networks:
+  ozone-analytics:
\ No newline at end of file
diff --git a/docker/docker-compose-migration.yaml b/docker/docker-compose-migration.yaml
new file mode 100644
index 0000000..13b9e61
--- /dev/null
+++ b/docker/docker-compose-migration.yaml
@@ -0,0 +1,25 @@
+version: '3.8'
+services:
+  odoo-replica-identity-migration:
+    image: liquibase/liquibase
+    networks:
+      ozone-analytics:
+    restart: on-failure
+    volumes:
+      - ./liquibase/odoo/:/liquibase/changelog/
+    command: update -Dtables='databasechangelog,account_account' --username=${CONNECT_ODOO_DB_USER} --password=${CONNECT_ODOO_DB_PASSWORD} --changeLogFile=${CHANGELOG_FILE} --url=jdbc:postgresql://${CONNECT_ODOO_DB_HOSTNAME}:${CONNECT_ODOO_DB_PORT}/${CONNECT_ODOO_DB_NAME}
+    environment:
+      - INSTALL_POSTGRESQL='true'
+      - TABLES=${ODOO_ANALYTICS_TABLES}
+  analytics-migration:
+    image: liquibase/liquibase
+    networks:
+      ozone-analytics:
+    restart: on-failure
+    volumes:
+      - ${ANALYTICS_DESTINATION_TABLES_MIGRATIONS_PATH:-analytics-migrations}:/liquibase/changelog/
+    command: update --username=${ANALYTICS_DB_USER} --password=${ANALYTICS_DB_PASSWORD} --changeLogFile=${CHANGELOG_FILE} --url=jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME}
+    environment:
+      - INSTALL_POSTGRESQL='true'
+networks:
+  ozone-analytics:
\ No newline at end of file
diff --git a/docker/liquibase/odoo/db.changelog-master.xml b/docker/liquibase/odoo/db.changelog-master.xml
new file mode 100644
index 0000000..400642f
--- /dev/null
+++ b/docker/liquibase/odoo/db.changelog-master.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<databaseChangeLog
+        xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
+        http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-latest.xsd">
+    <changeSet id="1" author="ozone">
+        <sqlFile path="sql/function_replicaIdentity.sql" splitStatements="true" endDelimiter="GO"/>
+    </changeSet>
+    <changeSet id="2" author="ozone">
+        <sql>
+            SELECT replicaIdentity(string_to_array('${tables}', ',')) s;
+        </sql>
+    </changeSet>
+</databaseChangeLog>
\ No newline at end of file
diff --git a/docker/liquibase/odoo/sql/function_replicaIdentity.sql b/docker/liquibase/odoo/sql/function_replicaIdentity.sql
new file mode 100644
index 0000000..7e13025
--- /dev/null
+++ b/docker/liquibase/odoo/sql/function_replicaIdentity.sql
@@ -0,0 +1,17 @@
+DROP FUNCTION IF EXISTS replicaIdentity;
+
+GO
+
+CREATE FUNCTION replicaIdentity(tables text []) RETURNS void AS $$
+    DECLARE m text;
+    BEGIN
+        FOREACH m IN ARRAY tables
+        LOOP
+            EXECUTE format($fmt$
+                ALTER TABLE %I REPLICA IDENTITY FULL;
+            $fmt$, m);
+        END LOOP;
+    END;
+$$ LANGUAGE plpgsql;
+
+GO
\ No newline at end of file
diff --git a/docker/liquibase/odoo/sql/update_replicaIdentity.sql b/docker/liquibase/odoo/sql/update_replicaIdentity.sql
new file mode 100644
index 0000000..b7dd190
--- /dev/null
+++ b/docker/liquibase/odoo/sql/update_replicaIdentity.sql
@@ -0,0 +1 @@
+SELECT replicaIdentity(string_to_array('${odooanalyticstables}', ',')) s;
\ No newline at end of file
diff --git a/docker/sqls/postgresql/create_odoo_db.sh b/docker/sqls/postgresql/create_odoo_db.sh
new file mode 100755
index 0000000..153dc43
--- /dev/null
+++ b/docker/sqls/postgresql/create_odoo_db.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+set -eu
+
+function create_user_and_database() {
+  local database=$1
+  local user=$2
+  local password=$3
+  echo "  Creating '$user' user and '$database' database..."
+  psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" $POSTGRES_DB <<-EOSQL
+    CREATE USER $user WITH PASSWORD '$password';
+    CREATE DATABASE $database;
+    GRANT ALL PRIVILEGES ON DATABASE $database TO $user;
+EOSQL
+}
+create_user_and_database ${ODOO_DB_NAME} ${ODOO_DB_USER} ${ODOO_DB_PASSWORD}
diff --git a/docker/sqls/postgresql/superset-setup.sh b/docker/sqls/postgresql/create_superset-setup.sh
similarity index 100%
rename from docker/sqls/postgresql/superset-setup.sh
rename to docker/sqls/postgresql/create_superset-setup.sh
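
After applying the Odoo replica-identity migration, you can confirm that the listed tables were switched to `REPLICA IDENTITY FULL` (a sketch, assuming the `odoo` database created by `create_odoo_db.sh` and the default `postgres` superuser; a `relreplident` value of `f` means FULL):

```bash
psql -h localhost -p 5432 -U postgres -d odoo \
  -c "SELECT relname, relreplident FROM pg_class WHERE relname IN ('account_account', 'sale_order');"
```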