Feat dcas process (#333)
* fix local db shm_size

* add dcas pipeline

* fix merge dataframe

* pipeline to process dcas

* calculate cumulative sum of gdd

* refactor code

* fix missing parameter
danangmassandy authored Jan 14, 2025
1 parent 34605a4 commit 59c4f0d
Showing 13 changed files with 1,347 additions and 1 deletion.
1 change: 1 addition & 0 deletions deployment/docker-compose.override.devcontainer.yml
@@ -1,6 +1,7 @@
version: '3.9'
services:
db:
shm_size: 1g
volumes:
- ./volumes/database:/opt/postgres/data
- ./volumes/backups:/backups
11 changes: 11 additions & 0 deletions deployment/docker/requirements.txt
@@ -88,3 +88,14 @@ django-admin-inline-paginator==0.4.0

# durable rules engine
durable-rules==2.0.28

# sqlalchemy
sqlalchemy==2.0.36
GeoAlchemy2==0.16.0

# dask geopandas
dask-geopandas==0.4.2

# duckdb
duckdb==1.1.3
pyarrow==18.1.0
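
The additions cover the pipeline's data plumbing: SQLAlchemy/GeoAlchemy2 for reading database tables, dask-geopandas for partitioned geospatial dataframes, and DuckDB with pyarrow for inspecting the Parquet output. A minimal sketch of how these pieces can fit together, using a hypothetical connection string and table name that are not part of this commit:

import geopandas as gpd
import dask_geopandas as dg
from sqlalchemy import create_engine

# Hypothetical connection string and table, for illustration only.
engine = create_engine('postgresql://user:password@localhost:5432/gap')
gdf = gpd.read_postgis(
    'SELECT id, geometry FROM gap_grid', engine, geom_col='geometry'
)
# Partition into a Dask-backed GeoDataFrame and write Parquet.
ddf = dg.from_geopandas(gdf, npartitions=4)
ddf.to_parquet('/tmp/grid.parquet')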
83 changes: 83 additions & 0 deletions django_project/dcas/functions.py
@@ -0,0 +1,83 @@
# coding=utf-8
"""
Tomorrow Now GAP.
.. note:: DCAS Functions to process row data.
"""

import random
import pandas as pd

from dcas.rules.rule_engine import DCASRuleEngine
from dcas.rules.variables import DCASData


def calculate_growth_stage(
row: pd.Series, growth_stage_list: list, current_date
) -> pd.Series:
"""Identify the growth stage and its start date.
The calculation will be using GDD cumulative sum for each day.
:param row: single row
:type row: pd.Series
:param growth_stage_list: list of growth stage
:type growth_stage_list: list
:param current_date: request date
:type current_date: date
:return: row with growth_stage_id and growth_stage_start_date
:rtype: pd.Series
"""
# TODO: lookup the growth_stage based on total_gdd value
row['growth_stage_id'] = random.choice(growth_stage_list)

# possible scenario:
# - no prev_growth_stage_start_date or prev_growth_stage_id
# - growth_stage_id is the same with prev_growth_stage_id
# - growth_stage_id is different with prev_growth_stage_id
#
# for 2nd scenario, use the prev_growth_stage_start_date
# for 1st and 3rd scenario, we need to find the date that
# growth stage is changed

    if (
        row['prev_growth_stage_start_date'] is None or
        pd.isna(row['prev_growth_stage_id']) or
        row['growth_stage_id'] != row['prev_growth_stage_id']
    ):
row['growth_stage_start_date'] = current_date
return row


def calculate_message_output(
row: pd.Series, rule_engine: DCASRuleEngine, attrib_dict: dict
) -> pd.Series:
"""Execute rule engine and get the message output.
:param row: single row
:type row: pd.Series
:param rule_engine: Rule Engine
:type rule_engine: DCASRuleEngine
:param attrib_dict: attribute and its id
:type attrib_dict: dict
:return: row with message output columns
:rtype: pd.Series
"""
params = [
{
'id': attribute_id,
'value': row[key]
} for key, attribute_id in attrib_dict.items()
]
data = DCASData(
row['crop_id'],
row['crop_stage_type_id'],
row['growth_stage_id'],
params
)
rule_engine.execute_rule(data)

for idx, code in enumerate(data.message_codes):
var_name = f'message_{idx + 1}' if idx > 0 else 'message'
row[var_name] = code
return row
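
The growth-stage lookup above is still a placeholder (the random.choice call is marked TODO). A minimal sketch of how the lookup by cumulative GDD could work, assuming a hypothetical growth_stage_list of dicts with 'id' and 'gdd_threshold' keys sorted by ascending threshold, which is not the structure used in this commit:

def find_growth_stage_by_gdd(total_gdd: float, growth_stage_list: list) -> int:
    """Return the id of the last stage whose GDD threshold has been reached.

    Hypothetical helper: assumes each entry looks like
    {'id': 1, 'gdd_threshold': 150} and the list is sorted by threshold.
    """
    stage_id = growth_stage_list[0]['id']
    for stage in growth_stage_list:
        if total_gdd >= stage['gdd_threshold']:
            stage_id = stage['id']
        else:
            break
    return stage_id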
Empty file.
Empty file.
50 changes: 50 additions & 0 deletions django_project/dcas/management/commands/check_output.py
@@ -0,0 +1,50 @@
# coding=utf-8
"""
Tomorrow Now GAP.
.. note:: Check DCAS Data Output
"""

import logging
import os
from django.core.management.base import BaseCommand

import duckdb


logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""Command to process DCAS Pipeline."""

def export_to_csv(self, sql):
"""Export as csv."""
conn = duckdb.connect()
final_query = (
f"""
COPY({sql})
TO 'output.csv'
(HEADER, DELIMITER ',');
"""
)
conn.sql(final_query)
conn.close()

def handle(self, *args, **options):
"""Check Data Output."""
grid_path = os.path.join(
'/tmp', 'dcas', 'grid_crop'
)

sql = (
f"""
SELECT *
FROM read_parquet('{grid_path}/*.parquet')
"""
)
self.export_to_csv(sql)

# conn = duckdb.connect()
# conn.sql(sql).show()
# conn.close()
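
The same inspection can be done interactively; a short sketch of querying the pipeline's grid-crop Parquet output from a Python shell, assuming the files exist under the /tmp/dcas/grid_crop path used above and that growth_stage_id is among the written columns:

import duckdb

conn = duckdb.connect()
# Count rows per growth stage in the grid-crop output written by the pipeline.
conn.sql(
    "SELECT growth_stage_id, COUNT(*) AS total "
    "FROM read_parquet('/tmp/dcas/grid_crop/*.parquet') "
    "GROUP BY growth_stage_id"
).show()
conn.close()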
33 changes: 33 additions & 0 deletions django_project/dcas/management/commands/run_dcas_pipeline.py
@@ -0,0 +1,33 @@
# coding=utf-8
"""
Tomorrow Now GAP.
.. note:: Run DCAS Data Pipeline
"""

import logging
import datetime
from django.core.management.base import BaseCommand

from gap.models import (
FarmRegistryGroup
)
from dcas.models import DCASConfig
from dcas.pipeline import DCASDataPipeline


logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""Command to process DCAS Pipeline."""

def handle(self, *args, **options):
"""Run DCAS Pipeline."""
dt = datetime.date(2024, 12, 1)
config = DCASConfig.objects.get(id=1)
farm_registry_group = FarmRegistryGroup.objects.get(id=1)

pipeline = DCASDataPipeline(farm_registry_group, config, dt)

pipeline.run()
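
The date, config id, and farm registry group id are hard-coded above. A sketch of how the command could accept them as CLI options instead; the option names and defaults are illustrative and not part of this commit:

import datetime
from django.core.management.base import BaseCommand

from gap.models import FarmRegistryGroup
from dcas.models import DCASConfig
from dcas.pipeline import DCASDataPipeline


class Command(BaseCommand):
    """Variant of the command above with CLI options (illustrative only)."""

    def add_arguments(self, parser):
        """Register hypothetical CLI options."""
        parser.add_argument('--date', type=str, default='2024-12-01')
        parser.add_argument('--config-id', type=int, default=1)
        parser.add_argument('--group-id', type=int, default=1)

    def handle(self, *args, **options):
        """Run the DCAS pipeline with CLI-provided parameters."""
        dt = datetime.date.fromisoformat(options['date'])
        config = DCASConfig.objects.get(id=options['config_id'])
        farm_registry_group = FarmRegistryGroup.objects.get(
            id=options['group_id']
        )
        DCASDataPipeline(farm_registry_group, config, dt).run()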
181 changes: 181 additions & 0 deletions django_project/dcas/outputs.py
@@ -0,0 +1,181 @@
# coding=utf-8
"""
Tomorrow Now GAP.
.. note:: DCAS Outputs
"""

import os
import shutil
import fsspec
import pandas as pd
from dask.dataframe.core import DataFrame as dask_df
import dask_geopandas as dg
from dask_geopandas.io.parquet import to_parquet
from typing import Union

from gap.utils.dask import execute_dask_compute


class OutputType:
"""Enum class for output type."""

GRID_DATA = 1
GRID_CROP_DATA = 2
FARM_CROP_DATA = 3


class DCASPipelineOutput:
"""Class to manage pipeline output."""

TMP_BASE_DIR = '/tmp/dcas'

def __init__(self, request_date):
"""Initialize DCASPipelineOutput."""
self.fs = None
self.request_date = request_date

def setup(self):
"""Set DCASPipelineOutput."""
self._setup_s3fs()

# clear temp resource
if os.path.exists(self.TMP_BASE_DIR):
shutil.rmtree(self.TMP_BASE_DIR)
os.makedirs(self.TMP_BASE_DIR)

def cleanup(self):
"""Remove temporary directory."""
if os.path.exists(self.TMP_BASE_DIR):
shutil.rmtree(self.TMP_BASE_DIR)

@property
def grid_data_file_path(self):
"""Return full path to grid data output parquet file."""
return os.path.join(
self.TMP_BASE_DIR,
'grid_data.parquet'
)

@property
def grid_crop_data_dir_path(self):
"""Return full path to grid with crop data."""
return os.path.join(
self.TMP_BASE_DIR,
'grid_crop'
)

def _setup_s3fs(self):
"""Initialize s3fs."""
self.s3 = self._get_s3_variables()
self.s3_options = {
'key': self.s3.get('AWS_ACCESS_KEY_ID'),
'secret': self.s3.get('AWS_SECRET_ACCESS_KEY'),
'client_kwargs': self._get_s3_client_kwargs()
}
self.fs = fsspec.filesystem(
's3',
key=self.s3.get('AWS_ACCESS_KEY_ID'),
secret=self.s3.get('AWS_SECRET_ACCESS_KEY'),
client_kwargs=self._get_s3_client_kwargs()
)

def _get_s3_variables(self) -> dict:
"""Get s3 env variables for product bucket.
:return: Dictionary of S3 env vars
:rtype: dict
"""
prefix = 'MINIO'
keys = [
'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY',
'AWS_ENDPOINT_URL', 'AWS_REGION_NAME'
]
results = {}
for key in keys:
results[key] = os.environ.get(f'{prefix}_{key}', '')
results['AWS_BUCKET_NAME'] = os.environ.get(
'MINIO_GAP_AWS_BUCKET_NAME', '')
results['AWS_DIR_PREFIX'] = os.environ.get(
'MINIO_GAP_AWS_DIR_PREFIX', '')

return results

def _get_s3_client_kwargs(self) -> dict:
"""Get s3 client kwargs for parquet file.
:return: dictionary with key endpoint_url or region_name
:rtype: dict
"""
prefix = 'MINIO'
client_kwargs = {}
if os.environ.get(f'{prefix}_AWS_ENDPOINT_URL', ''):
client_kwargs['endpoint_url'] = os.environ.get(
f'{prefix}_AWS_ENDPOINT_URL', '')
if os.environ.get(f'{prefix}_AWS_REGION_NAME', ''):
client_kwargs['region_name'] = os.environ.get(
f'{prefix}_AWS_REGION_NAME', '')
return client_kwargs

def _get_directory_path(self, directory_name):
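        """Return the S3 path for the given directory name."""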
return (
f"s3://{self.s3['AWS_BUCKET_NAME']}/"
f"{self.s3['AWS_DIR_PREFIX']}/{directory_name}"
)

def save(self, type: int, df: Union[pd.DataFrame, dask_df]):
"""Save output to parquet files.
:param type: Type of the dataframe output
:type type: int
:param df: DataFrame output
:type df: Union[pd.DataFrame, dask_df]
:raises ValueError: Raise when there is invalid type
"""
if type == OutputType.GRID_DATA:
self._save_grid_data(df)
elif type == OutputType.GRID_CROP_DATA:
self._save_grid_crop_data(df)
elif type == OutputType.FARM_CROP_DATA:
self._save_farm_crop_data(df)
else:
raise ValueError(f'Invalid output type {type} to be saved!')

def _save_farm_crop_data(self, df: dask_df, directory_name='dcas_output'):
df_geo = dg.from_dask_dataframe(
df,
geometry=dg.from_wkb(df['geometry'])
)

print('Saving to parquet')

x = to_parquet(
df_geo,
self._get_directory_path(directory_name),
partition_on=['iso_a3', 'year', 'month', 'day'],
filesystem=self.fs,
compute=False
)
print(f'writing to {self._get_directory_path(directory_name)}')
execute_dask_compute(x)

def _save_grid_crop_data(self, df: dask_df):
dir_path = self.grid_crop_data_dir_path
if os.path.exists(dir_path):
shutil.rmtree(dir_path)
os.makedirs(dir_path)

print('Saving to parquet')

df = df.reset_index(drop=True)
x = df.to_parquet(
dir_path,
compute=False
)
print(f'writing to {dir_path}')
execute_dask_compute(x)

def _save_grid_data(self, df: pd.DataFrame):
file_path = self.grid_data_file_path
print(f'writing dataframe to {file_path}')
df.to_parquet(file_path)
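
A short usage sketch of DCASPipelineOutput, assuming the MinIO environment variables it reads are set; the example dataframe columns are illustrative, not the pipeline's actual schema:

import datetime
import pandas as pd

from dcas.outputs import DCASPipelineOutput, OutputType

output = DCASPipelineOutput(request_date=datetime.date(2024, 12, 1))
output.setup()  # initializes s3fs and recreates /tmp/dcas

# Illustrative columns only; the real grid dataframe is built by the pipeline.
grid_df = pd.DataFrame({'grid_id': [1, 2], 'total_gdd': [120.5, 98.0]})
output.save(OutputType.GRID_DATA, grid_df)  # writes /tmp/dcas/grid_data.parquet

output.cleanup()  # remove the temporary directory when done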