OZ-323: Updates to support the refactored Flink Jobs (#10)
enyachoke authored Oct 24, 2023
1 parent 9667af6 commit 6f22f74
Showing 14 changed files with 364 additions and 189 deletions.
69 changes: 51 additions & 18 deletions README.md
@@ -34,11 +34,42 @@ The services have been split into multiple files; this allows you to start only t

`cd ozone-analytics/docker `

### Export environment variables
```bash
export DISTRO_PATH=<path_to_ozonepro_distro>
```

```bash
export ANALYTICS_QUERIES_PATH=$DISTRO_PATH/analytics_config/dsl/flattening/queries/;
export ANALYTICS_DESTINATION_TABLES_MIGRATIONS_PATH=$DISTRO_PATH/analytics_config/liquibase/analytics/

export ANALYTICS_DB_HOST=gateway.docker.internal; \
export ANALYTICS_DB_PORT=5432; \
export CONNECT_MYSQL_HOSTNAME=gateway.docker.internal; \
export CONNECT_MYSQL_PORT=3307; \
export CONNECT_MYSQL_USER=root; \
export CONNECT_MYSQL_PASSWORD=3cY8Kve4lGey; \
export CONNECT_ODOO_DB_HOSTNAME=gateway.docker.internal; \
export CONNECT_ODOO_DB_PORT=5432; \
export CONNECT_ODOO_DB_NAME=odoo; \
export CONNECT_ODOO_DB_USER=postgres; \
export CONNECT_ODOO_DB_PASSWORD=password

export EXPORT_DESTINATION_TABLES_PATH=$DISTRO_PATH/analytics_config/dsl/export/tables/;\
export EXPORT_SOURCE_QUERIES_PATH=$DISTRO_PATH/analytics_config/dsl/export/queries;\
export EXPORT_OUTPUT_PATH=$(pwd)/data/parquet/;\
export EXPORT_OUTPUT_TAG=h1;
```

**Note**: `gateway.docker.internal` is a special DNS name that resolves to the host machine from within containers. It is only available on Docker Desktop for Mac and Windows. On Linux, use the Docker host IP instead, which is `172.17.0.1` by default.
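
On Linux, for example, the same exports can point at the Docker bridge IP instead (a minimal sketch with assumed values; adjust to your network):

```bash
# Linux: containers reach the host through the docker0 bridge IP (172.17.0.1 by default)
export ANALYTICS_DB_HOST=172.17.0.1
export CONNECT_MYSQL_HOSTNAME=172.17.0.1
export CONNECT_ODOO_DB_HOSTNAME=172.17.0.1
```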

### Streaming and flattening pipelines only (without Superset)

In cases where you don't need to start Superset (for example, when you will use the Parquet export job to create Parquet files for later upload to MinIO or S3, or when you want to plug in your own BI tool), you can start only the streaming and flattening data pipelines by running:

`docker compose -f docker-compose-db.yaml -f docker-compose-data-pipelines.yaml up -d --build`
```bash
docker compose -f docker-compose-migration.yaml -f docker-compose-db.yaml -f docker-compose-data-pipelines.yaml up -d --build
```

Which will start:

@@ -55,7 +86,9 @@ Which will start:

To start the complete streaming and flattening suite, including Superset as the BI tool, run:

`docker compose -f docker-compose-db.yaml -f docker-compose-data-pipelines.yaml -f docker-compose-superset.yaml up -d --build`
```bash
docker compose -f docker-compose-db.yaml -f docker-compose-data-pipelines.yaml -f docker-compose-superset.yaml up -d --build
```

This will start the following services:

@@ -66,6 +99,15 @@ This will start the following services:

> NOTE: The streaming jobs may fail for a while during the initial start-up while Flink discovers data partitions from Kafka. You can wait for this to resolve on its own, or restart the `jobmanager` and `taskmanager` services with `docker compose -f docker-compose-data-pipelines.yaml restart jobmanager taskmanager`.
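
To watch the jobs while they recover, you can follow the Flink service logs (a quick sanity check, not part of the original steps):

```bash
# Follow the job manager and task manager logs until the streaming job stabilises
docker compose -f docker-compose-data-pipelines.yaml logs -f jobmanager taskmanager
```
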
### Streaming and flattening pipelines only against an existing deployment

When you have an existing deployment of Ozone and want to run the streaming and flattening pipelines against it, use the following command:

```bash
docker compose -f docker-compose-data-pipelines-external.yaml -f docker-compose-migration.yaml up -d
```
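
Once the containers are up, the Flink web UI and REST API are exposed on the job manager's mapped port (8084, per `docker-compose-data-pipelines-external.yaml`), which gives a quick way to confirm the streaming job is running. A minimal check, assuming the default port mapping:

```bash
# List the jobs known to the Flink job manager (host port 8084 maps to Flink's 8081)
curl -fsS http://localhost:8084/jobs
```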



### Drill-backed analytics server

In cases where you have multiple instances of Ozone deployed in remote locations, you may want to process data on site with the streaming and flattening pipelines but ship the data to a central repository for analytics. This provides a solution that uses:
@@ -94,36 +136,27 @@ To start this stack, run:
- `cd docker/` and run the following commands:

- Start database services
```
```bash
docker compose -f docker-compose-db.yaml up -d
```
- Run the batch ETL job to transform the data
```
docker compose -f docker-compose-batch-etl.yaml up
```bash
docker compose -f docker-compose-migration.yaml -f docker-compose-batch-etl.yaml up
```
- Export data in a Parquet format
```
export LOCATION_TAG=<location_id>
```bash
docker compose -f docker-compose-export.yaml up
```
:bulb: The exported data can be found in `./docker/data/parquet`

### Parquet export against an existing production deployment

- Set the variables
```
export ANALYTICS_DB_HOST=<postgres_host_ip>
export OPENMRS_DB_HOST=<mysql_host_ip>
export LOCATION_TAG=<location_id>
```
**Note**: if the database host is your localhost, use `host.docker.internal`.

- Run the batch ETL job to transform the data
```
docker compose -f docker-compose-batch-etl.yaml up
```bash
docker compose -f docker-compose-migration.yaml -f docker-compose-batch-etl.yaml up
```
- Export data in a Parquet format
```
```bash
docker compose -f docker-compose-export.yaml up
```
:bulb: The exported data can be found in `./docker/data/parquet`
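
Once the export job completes, the output can be inspected from the host, for example (a simple check, not part of the original steps):

```bash
# List the exported Parquet files and their sizes
find ./docker/data/parquet -name '*.parquet' -exec ls -lh {} +
```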
24 changes: 21 additions & 3 deletions docker/.env
@@ -13,9 +13,21 @@ OPENMRS_DB_HOST=mysql
OPENMRS_DB_PORT=3306
OPENMRS_DB_NAME=openmrs

CONNECT_MYSQL_USERNAME=root
CONNECT_MYSQL_PASSWORD=3cY8Kve4lGey
# Kafka Connect
CONNECT_MYSQL_HOSTNAME=mysql
CONNECT_MYSQL_PORT=3306
CONNECT_MYSQL_USER=root
CONNECT_MYSQL_PASSWORD=3cY8Kve4lGey

CONNECT_ODOO_DB_HOSTNAME=postgresql
CONNECT_ODOO_DB_PORT=5432
CONNECT_ODOO_DB_NAME=odoo
CONNECT_ODOO_DB_USER=postgres
CONNECT_ODOO_DB_PASSWORD=password

# Odoo
ODOO_DB_USER=odoo
ODOO_DB_PASSWORD=password

# Superset
# The secret key is used by Superset for encryption. You should generate one with something like `openssl rand -base64 48` and replace the sample secret below.
@@ -36,9 +48,15 @@ ANALYTICS_DB_PASSWORD=password
ANALYTICS_DB_HOST=postgresql
ANALYTICS_DB_PORT=5432
ANALYTICS_DB_NAME=analytics
ANALYTICS_SOURCE_TABLES_PATH=
ANALYTICS_QUERIES_PATH=
ANALYTICS_DESTINATION_TABLES_MIGRATIONS_PATH=
CHANGELOG_FILE=db.changelog-master.xml
ODOO_ANALYTICS_TABLES='databasechangelog,account_account,product_category,sale_order'
ANALYTICS_KAFKA_URL=kafka:9092

# Kafka
TOPICS=appointment_service,appointment_service_type,care_setting,concept,concept_name,concept_reference_map,concept_reference_source,concept_reference_term,conditions,encounter,encounter_diagnosis,encounter_type,location,form,obs,order_type,orders,patient,patient_appointment,patient_appointment_provider,patient_identifier,patient_identifier_type,patient_program,program,person,person_name,person_address,visit_type,visit
CREATE_TOPICS=openmrs.openmrs.appointment_service,openmrs.openmrs.appointment_service_type,openmrs.openmrs.care_setting,openmrs.openmrs.concept,openmrs.openmrs.concept_name,openmrs.openmrs.concept_reference_map,openmrs.openmrs.concept_reference_source,openmrs.openmrs.concept_reference_term,openmrs.openmrs.conditions,openmrs.openmrs.encounter,openmrs.openmrs.encounter_diagnosis,openmrs.openmrs.encounter_type,openmrs.openmrs.location,openmrs.openmrs.form,openmrs.openmrs.obs,openmrs.openmrs.order_type,openmrs.openmrs.orders,openmrs.openmrs.patient,openmrs.openmrs.patient_appointment,openmrs.openmrs.patient_appointment_provider,openmrs.openmrs.patient_identifier,openmrs.openmrs.patient_identifier_type,openmrs.openmrs.patient_program,openmrs.openmrs.program,openmrs.openmrs.person,openmrs.openmrs.person_name,openmrs.openmrs.person_address,openmrs.openmrs.visit_type,openmrs.openmrs.visit,odoo.public.sale_order

# Postgres
POSTGRES_USER=postgres
29 changes: 13 additions & 16 deletions docker/docker-compose-batch-etl.yaml
@@ -1,24 +1,12 @@
version: '3.8'
services:
migrate:
networks:
ozone-analytics:
restart: on-failure
image: mekomsolutions/ozone-flink-jobs:1.0.0-alpha1
command: /run.sh
environment:
- FLINK_JOB_POSTGRES_USER=${ANALYTICS_DB_USER}
- FLINK_JOB_POSTGRES_PASSWORD=${ANALYTICS_DB_PASSWORD}
- FLINK_JOB_POSTGRES_URL=jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME}
- FLINK_JOB_PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
extra_hosts:
- "host.docker.internal:host-gateway"
batch-etl:
networks:
ozone-analytics:
image: mekomsolutions/ozone-flink-jobs-batch:1.0.0-alpha2
image: mekomsolutions/ozone-flink-jobs-batch
depends_on:
- migrate
analytics-migration:
condition: service_completed_successfully
environment:
SOURCE_JDBC_URL: jdbc:mysql://${OPENMRS_DB_HOST}:${OPENMRS_DB_PORT}/${OPENMRS_DB_NAME}
SOURCE_JDBC_USERNAME: root
Expand All @@ -29,10 +17,19 @@ services:
PROPERTIES_FILE: '/opt/flink/usrlib/job.properties'
FLINK_CONF_DIR: '/etc/flink'
JAVA_OPTS: '-XX:MaxRAMPercentage=80.0'
ANALYTICS_QUERIES_PATH: /analytics/queries
ANALYTICS_SOURCE_TABLES_PATH: /analytics/source-tables
ANALYTICS_DB_NAME: ${ANALYTICS_DB_NAME}
ANALYTICS_DB_USER: ${ANALYTICS_DB_USER}
ANALYTICS_DB_PASSWORD: ${ANALYTICS_DB_PASSWORD}
ANALYTICS_DB_HOST: ${ANALYTICS_DB_HOST}
ANALYTICS_DB_PORT: ${ANALYTICS_DB_PORT}
extra_hosts:
- "host.docker.internal:host-gateway"
volumes:
- ./flink/pipelines/job.properties:/opt/flink/usrlib/job.properties
- ./flink/config/flink-conf.yaml:/etc/flink/flink-conf.yaml
- ${ANALYTICS_QUERIES_PATH}:/analytics/queries
- ${ANALYTICS_SOURCE_TABLES_PATH}:/analytics/source-tables
networks:
ozone-analytics:
ozone-analytics:
124 changes: 68 additions & 56 deletions docker/docker-compose-data-pipelines-external.yaml
@@ -15,21 +15,30 @@ services:
labels:
kompose.service.type: clusterip
kafka:
networks:
ozone-analytics:
restart: on-failure
image: debezium/kafka:${DEBEZIUM_VERSION}
networks:
ozone-analytics:
ports:
- 9092:9092

- 9092:9092
- 29092:29092
environment:
- CLUSTER_ID=5Yr1SIgYQz-b-dgRabWx4g
- BROKER_ID=1
- KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- CLUSTER_ID=5Yr1SIgYQz-b-dgRabWx4g
- BROKER_ID=1
- KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- KAFKA_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093,PLAINTEXT_HOST://0.0.0.0:29092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://kafka:29092
volumes:
- kafka-data:/kafka/data
healthcheck:
test: ["CMD-SHELL","/bin/bash", "-c", "./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"]
test:
[
"CMD-SHELL",
"/bin/bash",
"-c",
"./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"
]
kafka-setup:
networks:
ozone-analytics:
@@ -43,72 +52,63 @@ services:
for topic in $${topics[@]}
do
echo "Creating topic $$topic..."
./bin/kafka-topics.sh --create --topic openmrs.openmrs.$$topic --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:9092
./bin/kafka-configs.sh --bootstrap-server=kafka:9092 --entity-type topics --entity-name openmrs.openmrs.$$topic --alter --add-config retention.ms=31556736000
./bin/kafka-topics.sh --create --topic $$topic --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:9092
./bin/kafka-configs.sh --bootstrap-server=kafka:9092 --entity-type topics --entity-name $$topic --alter --add-config retention.ms=31556736000
done
environment:
- TOPICS=${TOPICS}
- TOPICS=${CREATE_TOPICS}
depends_on:
- kafka
connect:
networks:
ozone-analytics:
restart: on-failure
image: debezium/connect:${DEBEZIUM_VERSION}
networks:
ozone-analytics:
ports:
- 8083:8083
depends_on:
- kafka-setup
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_CONFIG_PROVIDERS=file
- CONNECT_CONFIG_PROVIDERS_FILE_CLASS=org.apache.kafka.common.config.provider.FileConfigProvider
- CONNECT_MYSQL_HOSTNAME=${CONNECT_MYSQL_HOSTNAME:-mysql}
- CONNECT_MYSQL_USERNAME=${CONNECT_MYSQL_USERNAME:-root}
- CONNECT_MYSQL_PASSWORD=${CONNECT_MYSQL_PASSWORD:-${MYSQL_ROOT_PASSWORD}}
- CONNECT_MYSQL_PORT=${CONNECT_MYSQL_PORT:-${CONNECT_MYSQL_PORT}}
- CONNECT_MYSQL_SERVER_ID=${CONNECT_MYSQL_SERVER_ID:-184054}
- CONNECT_MYSQL_SERVER_NAME=openmrs
- CONNECT_MYSQL_INCLUDE_LIST=openmrs
- CONNECT_TABLE_EXCLUDE_LIST=openmrs.audit_log
- CONNECT_MYSQL_HISTROY_TOPIC=dbhistory.openmrs
- CONNECT_MYSQL_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
- 8383:8083
volumes:
- ./debezium-connect/jars/TimestampConverter-1.2.4-SNAPSHOT.jar:/kafka/connect/debezium-connector-mysql/TimestampConverter-1.2.4-SNAPSHOT.jar
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
- CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
- CONNECT_CONFIG_PROVIDERS=file
- CONNECT_CONFIG_PROVIDERS_FILE_CLASS=org.apache.kafka.common.config.provider.FileConfigProvider
- CONNECT_MYSQL_HOSTNAME=${CONNECT_MYSQL_HOSTNAME:-mysql}
- CONNECT_MYSQL_USERNAME=${CONNECT_MYSQL_USERNAME:-root}
- CONNECT_MYSQL_PASSWORD=${CONNECT_MYSQL_PASSWORD:-${CONNECT_MYSQL_PASSWORD}}
- CONNECT_MYSQL_PORT=${CONNECT_MYSQL_PORT:-${CONNECT_MYSQL_PORT}}
- CONNECT_MYSQL_SERVER_ID=37991
- CONNECT_MYSQL_SERVER_NAME=openmrs
- CONNECT_MYSQL_INCLUDE_LIST=openmrs
- CONNECT_TABLE_EXCLUDE_LIST=openmrs.audit_log
- CONNECT_MYSQL_HISTROY_TOPIC=dbhistory.openmrs
- CONNECT_MYSQL_KAFKA_BOOTSTRAP_SERVERS=kafka:9092
- CONNECT_ODOO_DB_HOSTNAME=${CONNECT_ODOO_DB_HOSTNAME:-postgresql}
- CONNECT_ODOO_DB_USERNAME=${CONNECT_ODOO_DB_USERNAME:-${POSTGRES_USER}}
- CONNECT_ODOO_DB_PASSWORD=${CONNECT_ODOO_DB_PASSWORD:-${POSTGRES_PASSWORD}}
- CONNECT_ODOO_DB_PORT=${CONNECT_ODOO_DB_PORT:-5432}
- CONNECT_ODOO_DB_SERVER_NAME=odoo
- CONNECT_ODOO_DB_INCLUDE_LIST=odoo

connect-setup:
restart: on-failure
image: mekomsolutions/ozone-analytics-setup-connect
networks:
ozone-analytics:
restart: on-failure
build: ./setup-connect
depends_on:
- jobmanager
- taskmanager
- connect
environment:
- CONNECT_HOST=connect
- ANALYTICS_DB_HOST=${ANALYTICS_DB_HOST}
- SOURCE_DB_HOST=${CONNECT_MYSQL_HOSTNAME}
- SOURCE_DB_PORT=${CONNECT_MYSQL_PORT}
- ODOO_DB_HOST=${ODOO_DB_HOST:-${CONNECT_ODOO_DB_HOSTNAME}}
- ODOO_DB_PORT=${ODOO_DB_PORT:-${CONNECT_ODOO_DB_PORT}}
- FLINK_JOBMANAGER_HOST=jobmanager
migrate:
networks:
ozone-analytics:
restart: on-failure
image: mekomsolutions/ozone-flink-jobs

command: /run.sh
environment:
- FLINK_JOB_POSTGRES_USER=${ANALYTICS_DB_USER}
- FLINK_JOB_POSTGRES_PASSWORD=${ANALYTICS_DB_PASSWORD}
- FLINK_JOB_POSTGRES_URL=jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME}
- FLINK_JOB_PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
labels:
- kompose.service.group=flink-manager
jobmanager:
networks:
ozone-analytics:
@@ -118,8 +118,10 @@
- "8084:8081"
command: standalone-job --job-classname com.ozonehis.data.pipelines.streaming.StreamingETLJob --job-id 00000000000000000000000000000000 --properties-file /opt/flink/usrlib/job.properties --sink-url jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME} --sink-username ${ANALYTICS_DB_USER} --sink-password ${ANALYTICS_DB_PASSWORD}
depends_on:
migrate:
odoo-replica-identity-migration:
condition: service_completed_successfully
analytics-migration:
condition: service_completed_successfully
zookeeper:
condition: service_started
kafka:
@@ -151,10 +153,20 @@
- FLINK_JOB_POSTGRES_PASSWORD=password
- FLINK_JOB_POSTGRES_URL=jdbc:postgresql://${ANALYTICS_DB_HOST}:${ANALYTICS_DB_PORT}/${ANALYTICS_DB_NAME}
- FLINK_JOB_PROPERTIES_BOOTSTRAP_SERVERS=kafka:9092
- ANALYTICS_SOURCE_TABLES_PATH=/analytics/source-tables
- ANALYTICS_QUERIES_PATH=/analytics/queries
- ANALYTICS_DB_NAME=${ANALYTICS_DB_NAME}
- ANALYTICS_DB_USER=${ANALYTICS_DB_USER}
- ANALYTICS_DB_PASSWORD=${ANALYTICS_DB_PASSWORD}
- ANALYTICS_DB_HOST=${ANALYTICS_DB_HOST}
- ANALYTICS_DB_PORT=${ANALYTICS_DB_PORT}
- ANALYTICS_KAFKA_URL=${ANALYTICS_KAFKA_URL}
volumes:
- flink-shared-data:/tmp
- ./flink/pipelines/job.properties:/opt/flink/usrlib/job.properties
- ./data:/data
- ${ANALYTICS_SOURCE_TABLES_PATH}:/analytics/source-tables
- ${ANALYTICS_QUERIES_PATH}:/analytics/queries
taskmanager:
networks:
ozone-analytics: