From c4e2f6daf9e0dad2c43b79dc4ea9d0e5a2dbe4a3 Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Wed, 8 Jan 2025 16:04:56 +0530 Subject: [PATCH 1/5] 1 - BulkJobsave - table sql Signed-off-by: msvinaykumar --- design/KruizeDatabaseDesign.md | 42 +++++++++++++++++++ migrations/kruize_local_ddl.sql | 7 ++++ .../analyzer/services/BulkService.java | 4 +- .../database/init/KruizeHibernateUtil.java | 4 ++ .../database/table/lm/JobMetaData.java | 29 +++++++++++++ .../com/autotune/database/table/lm/Jobs.java | 35 ++++++++++++++++ .../operator/KruizeDeploymentInfo.java | 2 +- 7 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/autotune/database/table/lm/JobMetaData.java create mode 100644 src/main/java/com/autotune/database/table/lm/Jobs.java diff --git a/design/KruizeDatabaseDesign.md b/design/KruizeDatabaseDesign.md index 37a9c6fed..9254c072b 100644 --- a/design/KruizeDatabaseDesign.md +++ b/design/KruizeDatabaseDesign.md @@ -19,6 +19,8 @@ The Kruize Autotune project has the following entities: 2. kruize_results 3. kruize_recommendations 4. kruize_performance_profiles +5. kruize_jobs +6. kruize_jobmetadata ## **kruize_experiments** @@ -819,3 +821,43 @@ curl --location --request POST 'http://127.0.0.1:8080/createPerformanceProfile' ``` insert into kruize_performance_profiles; ``` + +## **kruize_jobs** + +--- + +This table stores job-level data, including information such as job status, start and end times, notification details, total and processed counts. +```sql +CREATE TABLE kruize_jobs ( + job_id UUID NOT NULL, + end_time TIMESTAMP(6), + start_time TIMESTAMP(6), + notifications JSONB, + processed_count INTEGER, + status VARCHAR(255), + total_count INTEGER, + webhook VARCHAR(255), + PRIMARY KEY (job_id) +); +``` + +## **kruize_jobmetadata** + +--- +This table stores metadata for individual experiments associated with a job. It uses hash-based partitioning on job_id for scalability and performance. +```sql +CREATE TABLE kruize_jobmetadata ( + id BIGSERIAL NOT NULL, + experiment_name VARCHAR(255), + notification JSONB, + recommendation_notifications JSONB, + recommendation_status VARCHAR(255), + job_id UUID NOT NULL, + PRIMARY KEY (job_id, experiment_name) +) PARTITION BY HASH (job_id); + +ALTER TABLE IF EXISTS kruize_jobmetadata + ADD CONSTRAINT bulkJobMetaDataConstraint + FOREIGN KEY (job_id) REFERENCES kruize_jobs; + +``` \ No newline at end of file diff --git a/migrations/kruize_local_ddl.sql b/migrations/kruize_local_ddl.sql index 645dfbab1..b3d7ea17a 100644 --- a/migrations/kruize_local_ddl.sql +++ b/migrations/kruize_local_ddl.sql @@ -6,3 +6,10 @@ alter table kruize_lm_experiments add column metadata_id bigint references krui alter table if exists kruize_lm_experiments add constraint UK_lm_experiment_name unique (experiment_name); create table IF NOT EXISTS kruize_metric_profiles (api_version varchar(255), kind varchar(255), metadata jsonb, name varchar(255) not null, k8s_type varchar(255), profile_version float(53) not null, slo jsonb, primary key (name)); create table IF NOT EXISTS kruize_lm_recommendations (interval_end_time timestamp(6) not null, experiment_name varchar(255) not null, cluster_name varchar(255), extended_data jsonb, version varchar(255),experiment_type varchar(255), primary key (experiment_name, interval_end_time)) PARTITION BY RANGE (interval_end_time); +create table IF NOT EXISTS kruize_jobs (job_id uuid not null, end_time timestamp(6), start_time timestamp(6), notifications jsonb, processed_count integer, status varchar(255), total_count integer, webhook varchar(255), primary key (job_id)); +create table IF NOT EXISTS kruize_jobmetadata (id bigserial not null, experiment_name varchar(255), notification jsonb, recommendation_notifications jsonb, recommendation_status varchar(255), job_id uuid not null, primary key (job_id, experiment_name)) PARTITION BY HASH (job_id); +alter table if exists kruize_jobmetadata add constraint bulkJobMetaDataConstraint foreign key (job_id) references kruize_jobs; + + + + diff --git a/src/main/java/com/autotune/analyzer/services/BulkService.java b/src/main/java/com/autotune/analyzer/services/BulkService.java index d22e7ac7d..43d5ce9e6 100644 --- a/src/main/java/com/autotune/analyzer/services/BulkService.java +++ b/src/main/java/com/autotune/analyzer/services/BulkService.java @@ -42,6 +42,7 @@ import java.util.concurrent.Executors; import static com.autotune.analyzer.utils.AnalyzerConstants.ServiceConstants.*; +import static com.autotune.operator.KruizeDeploymentInfo.cacheJobInMemory; import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*; /** @@ -153,7 +154,8 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) // Generate a unique jobID String jobID = UUID.randomUUID().toString(); BulkJobStatus jobStatus = new BulkJobStatus(jobID, IN_PROGRESS, Instant.now()); - jobStatusMap.put(jobID, jobStatus); + if(cacheJobInMemory) + jobStatusMap.put(jobID, jobStatus); // Submit the job to be processed asynchronously executorService.submit(new BulkJobManager(jobID, jobStatus, payload)); diff --git a/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java b/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java index 841ac99a9..e3c2f6b09 100644 --- a/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java +++ b/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java @@ -17,6 +17,8 @@ import com.autotune.database.table.*; +import com.autotune.database.table.lm.Jobs; +import com.autotune.database.table.lm.JobMetaData; import com.autotune.database.table.lm.KruizeLMExperimentEntry; import com.autotune.database.table.lm.KruizeLMRecommendationEntry; import com.autotune.operator.KruizeDeploymentInfo; @@ -65,6 +67,8 @@ public static void buildSessionFactory() { configuration.addAnnotatedClass(KruizeDSMetadataEntry.class); configuration.addAnnotatedClass(KruizeMetricProfileEntry.class); configuration.addAnnotatedClass(KruizeAuthenticationEntry.class); + configuration.addAnnotatedClass(Jobs.class); + configuration.addAnnotatedClass(JobMetaData.class); } LOGGER.info("DB is trying to connect to {}", connectionURL); sfTemp = configuration.buildSessionFactory(); diff --git a/src/main/java/com/autotune/database/table/lm/JobMetaData.java b/src/main/java/com/autotune/database/table/lm/JobMetaData.java new file mode 100644 index 000000000..9745c1fe5 --- /dev/null +++ b/src/main/java/com/autotune/database/table/lm/JobMetaData.java @@ -0,0 +1,29 @@ +package com.autotune.database.table.lm; + +import jakarta.persistence.*; + +@Entity +@Table(name = "kruize_jobmetadata") +public class JobMetaData { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @ManyToOne + @JoinColumn(name = "job_id", nullable = false) + private Jobs job; + + @Column(name = "experiment_name") + private String experimentName; + + @Column(columnDefinition = "jsonb") + private String notification; + + @Column(name = "recommendation_status") + private String recommendationsStatus; + + @Column(columnDefinition = "jsonb" , name="recommendation_notifications") + private String recommendationsNotifications; + + // Getters and Setters +} \ No newline at end of file diff --git a/src/main/java/com/autotune/database/table/lm/Jobs.java b/src/main/java/com/autotune/database/table/lm/Jobs.java new file mode 100644 index 000000000..13f887be3 --- /dev/null +++ b/src/main/java/com/autotune/database/table/lm/Jobs.java @@ -0,0 +1,35 @@ +package com.autotune.database.table.lm; + +import jakarta.persistence.*; + +import java.sql.Timestamp; +import java.util.List; +import java.util.UUID; + + +@Entity +@Table(name = "kruize_jobs") +public class Jobs { + @Id + @Column(name = "job_id") + private UUID jobId; + private String status; + @Column(name = "total_count") + private int totalExperiments; + @Column(name = "processed_count") + private int processedExperiments; + @Column(name = "start_time") + private Timestamp jobStartTime; + @Column(name = "end_time") + private Timestamp jobEndTime; + private String webhook; + + @Column(columnDefinition = "jsonb") + private String notifications; // Stored as JSON string + + @OneToMany(mappedBy = "job", cascade = CascadeType.ALL, fetch = FetchType.LAZY) + @Column(name = "meta_data") + private List jobMetaData; + + // Getters and Setters +} \ No newline at end of file diff --git a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java index d9c01fa7f..14f350b6a 100644 --- a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java +++ b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java @@ -66,7 +66,7 @@ public class KruizeDeploymentInfo { public static String database_admin_username; public static String database_admin_password; public static String database_ssl_mode; - + public static boolean cacheJobInMemory = true; public static String cloudwatch_logs_access_key_id; public static String cloudwatch_logs_secret_access_key; public static String cloudwatch_logs_log_group; From a627ed70afcefd024872fec00ddde70cb7c7ddf5 Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Wed, 8 Jan 2025 16:12:32 +0530 Subject: [PATCH 2/5] 1 - BulkJobsave - table sql added EOL Signed-off-by: msvinaykumar --- src/main/java/com/autotune/database/table/lm/JobMetaData.java | 2 +- src/main/java/com/autotune/database/table/lm/Jobs.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/autotune/database/table/lm/JobMetaData.java b/src/main/java/com/autotune/database/table/lm/JobMetaData.java index 9745c1fe5..3beb863f3 100644 --- a/src/main/java/com/autotune/database/table/lm/JobMetaData.java +++ b/src/main/java/com/autotune/database/table/lm/JobMetaData.java @@ -26,4 +26,4 @@ public class JobMetaData { private String recommendationsNotifications; // Getters and Setters -} \ No newline at end of file +} diff --git a/src/main/java/com/autotune/database/table/lm/Jobs.java b/src/main/java/com/autotune/database/table/lm/Jobs.java index 13f887be3..b58e48048 100644 --- a/src/main/java/com/autotune/database/table/lm/Jobs.java +++ b/src/main/java/com/autotune/database/table/lm/Jobs.java @@ -32,4 +32,4 @@ public class Jobs { private List jobMetaData; // Getters and Setters -} \ No newline at end of file +} From e2ff46fd9e43204dc3f9b3c390503d921449221d Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Thu, 9 Jan 2025 18:44:15 +0530 Subject: [PATCH 3/5] 1 - BulkJobsave - table sql updated Signed-off-by: msvinaykumar --- design/KruizeDatabaseDesign.md | 73 ++++++++++++++----- migrations/kruize_local_ddl.sql | 4 +- .../com/autotune/database/dao/BulkJobDAO.java | 10 +++ .../autotune/database/dao/BulkJobDAOImpl.java | 4 + .../database/init/KruizeHibernateUtil.java | 6 +- .../table/lm/{Jobs.java => BulkJob.java} | 10 +-- .../database/table/lm/JobMetaData.java | 29 -------- 7 files changed, 74 insertions(+), 62 deletions(-) create mode 100644 src/main/java/com/autotune/database/dao/BulkJobDAO.java create mode 100644 src/main/java/com/autotune/database/dao/BulkJobDAOImpl.java rename src/main/java/com/autotune/database/table/lm/{Jobs.java => BulkJob.java} (74%) delete mode 100644 src/main/java/com/autotune/database/table/lm/JobMetaData.java diff --git a/design/KruizeDatabaseDesign.md b/design/KruizeDatabaseDesign.md index 9254c072b..9addf510a 100644 --- a/design/KruizeDatabaseDesign.md +++ b/design/KruizeDatabaseDesign.md @@ -822,17 +822,19 @@ curl --location --request POST 'http://127.0.0.1:8080/createPerformanceProfile' insert into kruize_performance_profiles; ``` -## **kruize_jobs** +## **kruize_bulkjobs** --- -This table stores job-level data, including information such as job status, start and end times, notification details, total and processed counts. +This table stores job-level data, including information such as job status, start and end times, notification details, experiments details total and processed counts. + ```sql -CREATE TABLE kruize_jobs ( +CREATE TABLE kruize_bulkjobs ( job_id UUID NOT NULL, end_time TIMESTAMP(6), start_time TIMESTAMP(6), notifications JSONB, + experiments JSONB, processed_count INTEGER, status VARCHAR(255), total_count INTEGER, @@ -840,24 +842,55 @@ CREATE TABLE kruize_jobs ( PRIMARY KEY (job_id) ); ``` +Sample data +```json +{ + "status": "COMPLETED", + "total_experiments": 686, + "processed_experiments": 686, + "notifications": null, + "experiments": { + "prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28902795(job)|collect-profiles": { + "notification": null, + "recommendations": { + "status": "PROCESSED", + "notifications": null + } + }, + "prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28908435(job)|collect-profiles": { + "notification": null, + "recommendations": { + "status": "PROCESSED", + "notifications": null + } + }, + "prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28904820(job)|collect-profiles": { + "notification": null, + "recommendations": { + "status": "PROCESSED", + "notifications": null + } + }}, + "webhook": null, + "job_id": "3d14daf3-0f27-4848-8f5e-d9e890c5730e", + "job_start_time": "2024-12-19T06:28:11.536Z", + "job_end_time": "2024-12-19T06:30:27.764Z" +} +``` +When handling an "experiments" column with a large JSON field being updated by multiple threads, the primary considerations are ensuring concurrency, minimizing contention, and optimizing performance. This can be achieved by: -## **kruize_jobmetadata** - ---- -This table stores metadata for individual experiments associated with a job. It uses hash-based partitioning on job_id for scalability and performance. -```sql -CREATE TABLE kruize_jobmetadata ( - id BIGSERIAL NOT NULL, - experiment_name VARCHAR(255), - notification JSONB, - recommendation_notifications JSONB, - recommendation_status VARCHAR(255), - job_id UUID NOT NULL, - PRIMARY KEY (job_id, experiment_name) -) PARTITION BY HASH (job_id); +Optimizing Updates: +Partial Updates: + Update only the specific fields within the JSON, rather than replacing the entire document. The jsonb_set() function can be used for partial updates. +Batch Updates: + Group multiple updates into a single transaction to reduce overhead and minimize contention. -ALTER TABLE IF EXISTS kruize_jobmetadata - ADD CONSTRAINT bulkJobMetaDataConstraint - FOREIGN KEY (job_id) REFERENCES kruize_jobs; +Note: This approach is particularly relevant to PostgreSQL databases. +**Example:** +Let's say we want to update a part of the experiments field, for example, changing the value of the recommendations.status field of a specific experiment. +```sql +UPDATE kruize_bulkjobs +SET experiments = jsonb_set(experiments, '{prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28902795(job)|collect-profiles,recommendations,status}', '"NEW_STATUS"') +WHERE job_id = '3d14daf3-0f27-4848-8f5e-d9e890c5730e'; ``` \ No newline at end of file diff --git a/migrations/kruize_local_ddl.sql b/migrations/kruize_local_ddl.sql index b3d7ea17a..abab46645 100644 --- a/migrations/kruize_local_ddl.sql +++ b/migrations/kruize_local_ddl.sql @@ -6,9 +6,7 @@ alter table kruize_lm_experiments add column metadata_id bigint references krui alter table if exists kruize_lm_experiments add constraint UK_lm_experiment_name unique (experiment_name); create table IF NOT EXISTS kruize_metric_profiles (api_version varchar(255), kind varchar(255), metadata jsonb, name varchar(255) not null, k8s_type varchar(255), profile_version float(53) not null, slo jsonb, primary key (name)); create table IF NOT EXISTS kruize_lm_recommendations (interval_end_time timestamp(6) not null, experiment_name varchar(255) not null, cluster_name varchar(255), extended_data jsonb, version varchar(255),experiment_type varchar(255), primary key (experiment_name, interval_end_time)) PARTITION BY RANGE (interval_end_time); -create table IF NOT EXISTS kruize_jobs (job_id uuid not null, end_time timestamp(6), start_time timestamp(6), notifications jsonb, processed_count integer, status varchar(255), total_count integer, webhook varchar(255), primary key (job_id)); -create table IF NOT EXISTS kruize_jobmetadata (id bigserial not null, experiment_name varchar(255), notification jsonb, recommendation_notifications jsonb, recommendation_status varchar(255), job_id uuid not null, primary key (job_id, experiment_name)) PARTITION BY HASH (job_id); -alter table if exists kruize_jobmetadata add constraint bulkJobMetaDataConstraint foreign key (job_id) references kruize_jobs; +create table IF NOT EXISTS kruize_bulkjobs (job_id uuid not null, end_time timestamp(6), start_time timestamp(6), notifications jsonb,experiments jsonb, processed_count integer, status varchar(255), total_count integer, webhook varchar(255), primary key (job_id)); diff --git a/src/main/java/com/autotune/database/dao/BulkJobDAO.java b/src/main/java/com/autotune/database/dao/BulkJobDAO.java new file mode 100644 index 000000000..6c9941d4e --- /dev/null +++ b/src/main/java/com/autotune/database/dao/BulkJobDAO.java @@ -0,0 +1,10 @@ +package com.autotune.database.dao; + +import com.autotune.database.table.lm.Jobs; + +import java.util.UUID; + +public interface BulkJobDAO { + void saveJob(Jobs job); + Jobs getJobById(UUID jobId); +} \ No newline at end of file diff --git a/src/main/java/com/autotune/database/dao/BulkJobDAOImpl.java b/src/main/java/com/autotune/database/dao/BulkJobDAOImpl.java new file mode 100644 index 000000000..76f8a8c87 --- /dev/null +++ b/src/main/java/com/autotune/database/dao/BulkJobDAOImpl.java @@ -0,0 +1,4 @@ +package com.autotune.database.dao; + +public class BulkJobDAOImpl { +} diff --git a/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java b/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java index e3c2f6b09..27940194a 100644 --- a/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java +++ b/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java @@ -17,8 +17,7 @@ import com.autotune.database.table.*; -import com.autotune.database.table.lm.Jobs; -import com.autotune.database.table.lm.JobMetaData; +import com.autotune.database.table.lm.BulkJob; import com.autotune.database.table.lm.KruizeLMExperimentEntry; import com.autotune.database.table.lm.KruizeLMRecommendationEntry; import com.autotune.operator.KruizeDeploymentInfo; @@ -67,8 +66,7 @@ public static void buildSessionFactory() { configuration.addAnnotatedClass(KruizeDSMetadataEntry.class); configuration.addAnnotatedClass(KruizeMetricProfileEntry.class); configuration.addAnnotatedClass(KruizeAuthenticationEntry.class); - configuration.addAnnotatedClass(Jobs.class); - configuration.addAnnotatedClass(JobMetaData.class); + configuration.addAnnotatedClass(BulkJob.class); } LOGGER.info("DB is trying to connect to {}", connectionURL); sfTemp = configuration.buildSessionFactory(); diff --git a/src/main/java/com/autotune/database/table/lm/Jobs.java b/src/main/java/com/autotune/database/table/lm/BulkJob.java similarity index 74% rename from src/main/java/com/autotune/database/table/lm/Jobs.java rename to src/main/java/com/autotune/database/table/lm/BulkJob.java index b58e48048..3faa02900 100644 --- a/src/main/java/com/autotune/database/table/lm/Jobs.java +++ b/src/main/java/com/autotune/database/table/lm/BulkJob.java @@ -3,13 +3,12 @@ import jakarta.persistence.*; import java.sql.Timestamp; -import java.util.List; import java.util.UUID; @Entity -@Table(name = "kruize_jobs") -public class Jobs { +@Table(name = "kruize_bulkjobs") +public class BulkJob { @Id @Column(name = "job_id") private UUID jobId; @@ -27,9 +26,8 @@ public class Jobs { @Column(columnDefinition = "jsonb") private String notifications; // Stored as JSON string - @OneToMany(mappedBy = "job", cascade = CascadeType.ALL, fetch = FetchType.LAZY) - @Column(name = "meta_data") - private List jobMetaData; + @Column(columnDefinition = "jsonb") + private String experiments; // JSONB field for experiments data // Getters and Setters } diff --git a/src/main/java/com/autotune/database/table/lm/JobMetaData.java b/src/main/java/com/autotune/database/table/lm/JobMetaData.java deleted file mode 100644 index 3beb863f3..000000000 --- a/src/main/java/com/autotune/database/table/lm/JobMetaData.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.autotune.database.table.lm; - -import jakarta.persistence.*; - -@Entity -@Table(name = "kruize_jobmetadata") -public class JobMetaData { - @Id - @GeneratedValue(strategy = GenerationType.IDENTITY) - private Long id; - - @ManyToOne - @JoinColumn(name = "job_id", nullable = false) - private Jobs job; - - @Column(name = "experiment_name") - private String experimentName; - - @Column(columnDefinition = "jsonb") - private String notification; - - @Column(name = "recommendation_status") - private String recommendationsStatus; - - @Column(columnDefinition = "jsonb" , name="recommendation_notifications") - private String recommendationsNotifications; - - // Getters and Setters -} From 491d9abae2484028e3d625d97a88e6e185062fd4 Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Thu, 9 Jan 2025 18:49:56 +0530 Subject: [PATCH 4/5] 1 - BulkJobsave - table sql md file updated Signed-off-by: msvinaykumar --- design/KruizeDatabaseDesign.md | 1 - .../java/com/autotune/database/dao/BulkJobDAO.java | 10 ---------- .../java/com/autotune/database/dao/BulkJobDAOImpl.java | 4 ---- 3 files changed, 15 deletions(-) delete mode 100644 src/main/java/com/autotune/database/dao/BulkJobDAO.java delete mode 100644 src/main/java/com/autotune/database/dao/BulkJobDAOImpl.java diff --git a/design/KruizeDatabaseDesign.md b/design/KruizeDatabaseDesign.md index 9addf510a..8c6265f86 100644 --- a/design/KruizeDatabaseDesign.md +++ b/design/KruizeDatabaseDesign.md @@ -20,7 +20,6 @@ The Kruize Autotune project has the following entities: 3. kruize_recommendations 4. kruize_performance_profiles 5. kruize_jobs -6. kruize_jobmetadata ## **kruize_experiments** diff --git a/src/main/java/com/autotune/database/dao/BulkJobDAO.java b/src/main/java/com/autotune/database/dao/BulkJobDAO.java deleted file mode 100644 index 6c9941d4e..000000000 --- a/src/main/java/com/autotune/database/dao/BulkJobDAO.java +++ /dev/null @@ -1,10 +0,0 @@ -package com.autotune.database.dao; - -import com.autotune.database.table.lm.Jobs; - -import java.util.UUID; - -public interface BulkJobDAO { - void saveJob(Jobs job); - Jobs getJobById(UUID jobId); -} \ No newline at end of file diff --git a/src/main/java/com/autotune/database/dao/BulkJobDAOImpl.java b/src/main/java/com/autotune/database/dao/BulkJobDAOImpl.java deleted file mode 100644 index 76f8a8c87..000000000 --- a/src/main/java/com/autotune/database/dao/BulkJobDAOImpl.java +++ /dev/null @@ -1,4 +0,0 @@ -package com.autotune.database.dao; - -public class BulkJobDAOImpl { -} From 1209dcd0e21697947ec4deb9a56cd54ea2fc07eb Mon Sep 17 00:00:00 2001 From: msvinaykumar Date: Fri, 10 Jan 2025 10:46:46 +0530 Subject: [PATCH 5/5] 2 - BulkJobsave - Bulk Job save , update and get dao and implementation Signed-off-by: msvinaykumar --- .../autotune/database/dao/ExperimentDAO.java | 11 ++ .../database/dao/ExperimentDAOImpl.java | 103 +++++++++++++++++- .../autotune/database/helper/DBConstants.java | 7 ++ .../com/autotune/utils/MetricsConfig.java | 23 ++-- 4 files changed, 134 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/autotune/database/dao/ExperimentDAO.java b/src/main/java/com/autotune/database/dao/ExperimentDAO.java index b8cc9c897..c37c6330b 100644 --- a/src/main/java/com/autotune/database/dao/ExperimentDAO.java +++ b/src/main/java/com/autotune/database/dao/ExperimentDAO.java @@ -5,6 +5,7 @@ import com.autotune.analyzer.utils.AnalyzerConstants; import com.autotune.common.data.ValidationOutputData; import com.autotune.database.table.*; +import com.autotune.database.table.lm.BulkJob; import com.autotune.database.table.lm.KruizeLMExperimentEntry; import com.autotune.database.table.lm.KruizeLMRecommendationEntry; @@ -129,4 +130,14 @@ public interface ExperimentDAO { public ValidationOutputData deleteKruizeDSMetadataEntryByName(String dataSourceName); ValidationOutputData addAuthenticationDetailsToDB(KruizeAuthenticationEntry kruizeAuthenticationEntry); + + // save ,get, partial update and delete BulkJob data + ValidationOutputData bulkJobSave(BulkJob bulkJob); + + BulkJob findBulkJobById(String jobId) throws Exception; + + ValidationOutputData updateBulkJobByExperiment(String jobId, String experimentName, String notification, String recommendationJson) throws Exception; + + void deleteBulkJobByID(String jobId); + } diff --git a/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java b/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java index f4900e637..14acd3f67 100644 --- a/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java +++ b/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java @@ -24,6 +24,7 @@ import com.autotune.database.helper.DBConstants; import com.autotune.database.init.KruizeHibernateUtil; import com.autotune.database.table.*; +import com.autotune.database.table.lm.BulkJob; import com.autotune.database.table.lm.KruizeLMExperimentEntry; import com.autotune.database.table.lm.KruizeLMRecommendationEntry; import com.autotune.utils.KruizeConstants; @@ -615,6 +616,106 @@ public ValidationOutputData addAuthenticationDetailsToDB(KruizeAuthenticationEnt return validationOutputData; } + @Override + public ValidationOutputData bulkJobSave(BulkJob bulkJob) { + ValidationOutputData validationOutputData = new ValidationOutputData(false, null, null); + Transaction tx = null; + String statusValue = "failure"; + Timer.Sample timerSaveBulkJobDB = Timer.start(MetricsConfig.meterRegistry()); + try { + try (Session session = KruizeHibernateUtil.getSessionFactory().openSession()) { + try { + tx = session.beginTransaction(); + session.saveOrUpdate(bulkJob); + tx.commit(); + // TODO: remove native sql query and transient + validationOutputData.setSuccess(true); + statusValue = "success"; + } catch (HibernateException e) { + LOGGER.error("Not able to save experiment due to {}", e.getMessage()); + if (tx != null) tx.rollback(); + e.printStackTrace(); + validationOutputData.setSuccess(false); + validationOutputData.setMessage(e.getMessage()); + //TODO: save error to API_ERROR_LOG + } + } + } catch (Exception e) { + LOGGER.debug("Bulk JOB Save ={}", bulkJob); + LOGGER.error("Not able to save BulkJob due to {}", e.getMessage()); + validationOutputData.setMessage(e.getMessage()); + } finally { + if (null != timerSaveBulkJobDB) { + MetricsConfig.timerSaveBulkJobDB = MetricsConfig.timerBSaveBulkJobDB.tag("status", statusValue).register(MetricsConfig.meterRegistry()); + timerSaveBulkJobDB.stop(MetricsConfig.timerSaveBulkJobDB); + } + } + return validationOutputData; + } + + @Override + public BulkJob findBulkJobById(String jobId) throws Exception { + BulkJob bulkJob = null; + String statusValue = "failure"; + Timer.Sample timerGetBulkJobDB = Timer.start(MetricsConfig.meterRegistry()); + try (Session session = KruizeHibernateUtil.getSessionFactory().openSession()) { + bulkJob = session.createQuery(DBConstants.SQLQUERY.SELECT_FROM_BULKJOBS_BY_JOB_ID, BulkJob.class) + .setParameter("jobId", jobId).getSingleResult(); + statusValue = "success"; + } catch (Exception e) { + LOGGER.error("Not able to load bulk JOB {} due to {}", jobId, e.getMessage()); + throw new Exception("Error while loading BulkJob from database due to : " + e.getMessage()); + } finally { + if (null != timerGetBulkJobDB) { + MetricsConfig.timerLoadBulkJobId = MetricsConfig.timerBLoadBulkJobId.tag("status", statusValue).register(MetricsConfig.meterRegistry()); + timerGetBulkJobDB.stop(MetricsConfig.timerLoadExpName); + } + } + return bulkJob; + } + + @Override + public ValidationOutputData updateBulkJobByExperiment(String jobId, String experimentName, String notification, String recommendationJson) throws Exception { + ValidationOutputData validationOutputData = new ValidationOutputData(false, null, null); + String statusValue = "failure"; + Timer.Sample timerGetBulkJobDB = Timer.start(MetricsConfig.meterRegistry()); + try (Session session = KruizeHibernateUtil.getSessionFactory().openSession()) { + // Construct JSON paths for notification and recommendations fields + String notificationPath = "{experiments,\"" + experimentName + "\",notification}"; + String recommendationPath = "{experiments,\"" + experimentName + "\",recommendations}"; + + // Native SQL query using jsonb_set for partial updates + String sql = UPDATE_BULKJOB_BY_ID; + + // Execute the query + session.createNativeQuery(sql) + .setParameter("notificationPath", notificationPath) + .setParameter("newNotification", notification == null ? "null" : "\"" + notification + "\"") // Handle null value + .setParameter("recommendationPath", recommendationPath) + .setParameter("newRecommendation", recommendationJson) + .setParameter("jobId", jobId) + .executeUpdate(); + validationOutputData.setSuccess(true); + statusValue = "success"; + } catch (Exception e) { + LOGGER.error("Not able to load bulk JOB {} due to {}", jobId, e.getMessage()); + validationOutputData.setMessage(e.getMessage()); + throw new Exception("Error while loading BulkJob from database due to : " + e.getMessage()); + } finally { + if (null != timerGetBulkJobDB) { + MetricsConfig.timerUpdateBulkJobId = MetricsConfig.timerBUpdateBulkJobId.tag("status", statusValue).register(MetricsConfig.meterRegistry()); + timerGetBulkJobDB.stop(MetricsConfig.timerLoadExpName); + } + } + return validationOutputData; + } + + @Override + public void deleteBulkJobByID(String jobId) { + //todo + } + + @Override public boolean updateExperimentStatus(KruizeObject kruizeObject, AnalyzerConstants.ExperimentStatus status) { kruizeObject.setStatus(status); @@ -793,7 +894,7 @@ public List loadAllLMExperiments() throws Exception { return entries; } - + @Override public List loadAllResults() throws Exception { // TODO: load only experimentStatus=inProgress , playback may not require completed experiments diff --git a/src/main/java/com/autotune/database/helper/DBConstants.java b/src/main/java/com/autotune/database/helper/DBConstants.java index fdcacc163..18d2522e8 100644 --- a/src/main/java/com/autotune/database/helper/DBConstants.java +++ b/src/main/java/com/autotune/database/helper/DBConstants.java @@ -9,6 +9,13 @@ public static final class SQLQUERY { public static final String SELECT_FROM_LM_EXPERIMENTS = "from KruizeLMExperimentEntry"; public static final String SELECT_FROM_EXPERIMENTS_BY_EXP_NAME = "from KruizeExperimentEntry k WHERE k.experiment_name = :experimentName"; public static final String SELECT_FROM_LM_EXPERIMENTS_BY_EXP_NAME = "from KruizeLMExperimentEntry k WHERE k.experiment_name = :experimentName"; + public static final String SELECT_FROM_BULKJOBS_BY_JOB_ID = "from BulkJob k WHERE k.jobId = :jobId"; + public static final String UPDATE_BULKJOB_BY_ID = "UPDATE kruize_bulkjobs " + + "SET experiments = jsonb_set(" + + " jsonb_set(experiments, :notificationPath, :newNotification::jsonb, true), " + + " :recommendationPath, :newRecommendation::jsonb, true" + + ") " + + "WHERE job_id = :jobId"; public static final String SELECT_FROM_RESULTS = "from KruizeResultsEntry"; public static final String SELECT_FROM_RESULTS_BY_EXP_NAME = "from KruizeResultsEntry k WHERE k.experiment_name = :experimentName"; public static final String SELECT_FROM_DATASOURCE = "from KruizeDataSourceEntry"; diff --git a/src/main/java/com/autotune/utils/MetricsConfig.java b/src/main/java/com/autotune/utils/MetricsConfig.java index b7afa12f7..5947f82cc 100644 --- a/src/main/java/com/autotune/utils/MetricsConfig.java +++ b/src/main/java/com/autotune/utils/MetricsConfig.java @@ -2,7 +2,6 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; -import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics; import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics; @@ -15,19 +14,20 @@ import java.util.concurrent.atomic.AtomicInteger; public class MetricsConfig { - + + public static final AtomicInteger activeJobs = new AtomicInteger(0); public static Timer timerListRec, timerListExp, timerCreateExp, timerUpdateResults, timerUpdateRecomendations; - public static Timer timerLoadRecExpName, timerLoadResultsExpName, timerLoadExpName, timerLoadRecExpNameDate, timerBoxPlots; + public static Timer timerLoadRecExpName, timerLoadResultsExpName, timerLoadExpName, timerLoadBulkJobId, timerUpdateBulkJobId, timerLoadRecExpNameDate, timerBoxPlots; public static Timer timerLoadAllRec, timerLoadAllExp, timerLoadAllResults; - public static Timer timerAddRecDB, timerAddResultsDB, timerAddExpDB, timerAddBulkResultsDB; + public static Timer timerAddRecDB, timerAddResultsDB, timerAddExpDB, timerSaveBulkJobDB, timerAddBulkResultsDB, timerAddBulkJob; public static Timer timerAddPerfProfileDB, timerLoadPerfProfileName, timerLoadAllPerfProfiles; public static Timer timerImportMetadata, timerGetMetadata; public static Timer timerJobStatus, timerCreateBulkJob, timerGetExpMap, timerCreateBulkExp, timerGenerateBulkRec, timerRunJob; - public static Counter timerKruizeNotifications , timerBulkJobs; + public static Counter timerKruizeNotifications, timerBulkJobs; public static Timer.Builder timerBListRec, timerBListExp, timerBCreateExp, timerBUpdateResults, timerBUpdateRecommendations; public static Timer.Builder timerBLoadRecExpName, timerBLoadResultsExpName, timerBLoadExpName, timerBLoadRecExpNameDate, timerBBoxPlots; public static Timer.Builder timerBLoadAllRec, timerBLoadAllExp, timerBLoadAllResults; - public static Timer.Builder timerBAddRecDB, timerBAddResultsDB, timerBAddExpDB, timerBAddBulkResultsDB; + public static Timer.Builder timerBAddRecDB, timerBAddResultsDB, timerBAddExpDB, timerBLoadBulkJobId, timerBUpdateBulkJobId, timerBSaveBulkJobDB, timerBaddBulkJob, timerBAddBulkResultsDB; public static Timer.Builder timerBAddPerfProfileDB, timerBLoadPerfProfileName, timerBLoadAllPerfProfiles; public static Counter.Builder timerBKruizeNotifications, timerBBulkJobs; public static PrometheusMeterRegistry meterRegistry; @@ -35,12 +35,11 @@ public class MetricsConfig { public static Timer.Builder timerBListDS, timerBImportDSMetadata, timerBListDSMetadata; public static Timer.Builder timerBImportMetadata, timerBGetMetadata; public static Timer.Builder timerBJobStatus, timerBCreateBulkJob, timerBGetExpMap, timerBCreateBulkExp, timerBGenerateBulkRec, timerBRunJob; + public static Gauge.Builder timerBBulkRunJobs; private static MetricsConfig INSTANCE; public String API_METRIC_DESC = "Time taken for Kruize APIs"; public String DB_METRIC_DESC = "Time taken for KruizeDB methods"; public String METHOD_METRIC_DESC = "Time taken for Kruize methods"; - public static final AtomicInteger activeJobs = new AtomicInteger(0); - public static Gauge.Builder timerBBulkRunJobs; private MetricsConfig() { meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); @@ -63,6 +62,10 @@ private MetricsConfig() { timerBAddResultsDB = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "addResultToDB"); timerBAddBulkResultsDB = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "addBulkResultsToDBAndFetchFailedResults"); timerBAddExpDB = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "addExperimentToDB"); + timerBSaveBulkJobDB = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "saveBulkJobDB"); + timerBLoadBulkJobId = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "getBulkJobDB"); + timerBUpdateBulkJobId = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "updateBulkJobDB"); + timerBAddPerfProfileDB = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "addPerformanceProfileToDB"); timerBLoadPerfProfileName = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "loadPerformanceProfileByName"); timerBLoadAllPerfProfiles = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "loadAllPerformanceProfiles"); @@ -73,13 +76,15 @@ private MetricsConfig() { timerBListDSMetadata = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "dsmetadata").tag("method", "GET"); timerBKruizeNotifications = Counter.builder("KruizeNotifications").description("Kruize notifications").tag("api", "updateRecommendations"); + timerBaddBulkJob = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "addBulkJobToDB"); + timerBImportMetadata = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "datasources").tag("method", "importMetadata"); timerBGetMetadata = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "datasources").tag("method", "getMetadata"); timerBJobStatus = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "bulk").tag("method", "jobStatus"); timerBCreateBulkJob = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "bulk").tag("method", "createBulkJob"); timerBGetExpMap = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "bulk").tag("method", "getExperimentMap"); timerBRunJob = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "bulk").tag("method", "runBulkJob"); - timerBBulkRunJobs = Gauge.builder("kruizeAPI_active_jobs_count", activeJobs, AtomicInteger::get).description("No.of bulk jobs running").tags("api", "bulk", "method", "runBulkJob" , "status", "running"); + timerBBulkRunJobs = Gauge.builder("kruizeAPI_active_jobs_count", activeJobs, AtomicInteger::get).description("No.of bulk jobs running").tags("api", "bulk", "method", "runBulkJob", "status", "running"); timerBBulkRunJobs.register(meterRegistry); new ClassLoaderMetrics().bindTo(meterRegistry);