diff --git a/design/KruizeDatabaseDesign.md b/design/KruizeDatabaseDesign.md
index 37a9c6fed..8c6265f86 100644
--- a/design/KruizeDatabaseDesign.md
+++ b/design/KruizeDatabaseDesign.md
@@ -19,6 +19,7 @@ The Kruize Autotune project has the following entities:
 2. kruize_results
 3. kruize_recommendations
 4. kruize_performance_profiles
+5. kruize_bulkjobs
 
 ## **kruize_experiments**
 
@@ -819,3 +820,76 @@ curl --location --request POST 'http://127.0.0.1:8080/createPerformanceProfile'
 ```
 insert into kruize_performance_profiles;
 ```
+
+## **kruize_bulkjobs**
+
+---
+
+This table stores job-level data, including the job status, start and end times, notifications, per-experiment details, and the total and processed experiment counts.
+
+```sql
+CREATE TABLE kruize_bulkjobs (
+    job_id UUID NOT NULL,
+    end_time TIMESTAMP(6),
+    start_time TIMESTAMP(6),
+    notifications JSONB,
+    experiments JSONB,
+    processed_count INTEGER,
+    status VARCHAR(255),
+    total_count INTEGER,
+    webhook VARCHAR(255),
+    PRIMARY KEY (job_id)
+);
+```
+
+Sample data:
+
+```json
+{
+    "status": "COMPLETED",
+    "total_experiments": 686,
+    "processed_experiments": 686,
+    "notifications": null,
+    "experiments": {
+        "prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28902795(job)|collect-profiles": {
+            "notification": null,
+            "recommendations": {
+                "status": "PROCESSED",
+                "notifications": null
+            }
+        },
+        "prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28908435(job)|collect-profiles": {
+            "notification": null,
+            "recommendations": {
+                "status": "PROCESSED",
+                "notifications": null
+            }
+        },
+        "prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28904820(job)|collect-profiles": {
+            "notification": null,
+            "recommendations": {
+                "status": "PROCESSED",
+                "notifications": null
+            }
+        }
+    },
+    "webhook": null,
+    "job_id": "3d14daf3-0f27-4848-8f5e-d9e890c5730e",
+    "job_start_time": "2024-12-19T06:28:11.536Z",
+    "job_end_time": "2024-12-19T06:30:27.764Z"
+}
+```
+
+When an `experiments` column holding a large JSON document is updated by multiple threads, the primary considerations are ensuring concurrency, minimizing contention, and optimizing performance. Updates can be optimized in two ways:
+
+- **Partial updates**: update only the specific fields within the JSON document rather than replacing it entirely. The `jsonb_set()` function can be used for partial updates.
+- **Batch updates**: group multiple updates into a single transaction to reduce overhead and minimize contention (see the sketch after the example below).
+
+Note: This approach is specific to PostgreSQL, since it relies on the JSONB type and `jsonb_set()`.
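+
+Individual fields can also be read back without fetching the whole document. A minimal sketch using PostgreSQL's `#>>` path operator against the sample row above:
+
+```sql
+SELECT experiments #>> '{prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28902795(job)|collect-profiles,recommendations,status}' AS recommendation_status
+FROM kruize_bulkjobs
+WHERE job_id = '3d14daf3-0f27-4848-8f5e-d9e890c5730e';
+```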
+
+**Example:**
+
+Let's say we want to update part of the `experiments` field, for example changing the value of the `recommendations.status` field of a specific experiment:
+
+```sql
+UPDATE kruize_bulkjobs
+SET experiments = jsonb_set(experiments, '{prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28902795(job)|collect-profiles,recommendations,status}', '"NEW_STATUS"')
+WHERE job_id = '3d14daf3-0f27-4848-8f5e-d9e890c5730e';
+```
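+
+A batch update then groups several such statements into one transaction. A minimal sketch (reusing the sample job above; the status value and counter update are illustrative):
+
+```sql
+BEGIN;
+UPDATE kruize_bulkjobs SET experiments = jsonb_set(experiments, '{prometheus-1|default|openshift-operator-lifecycle-manager|collect-profiles-28908435(job)|collect-profiles,recommendations,status}', '"NEW_STATUS"') WHERE job_id = '3d14daf3-0f27-4848-8f5e-d9e890c5730e';
+UPDATE kruize_bulkjobs SET processed_count = processed_count + 1 WHERE job_id = '3d14daf3-0f27-4848-8f5e-d9e890c5730e';
+COMMIT;
+```
\ No newline at end of file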
diff --git a/migrations/kruize_local_ddl.sql b/migrations/kruize_local_ddl.sql
index 645dfbab1..abab46645 100644
--- a/migrations/kruize_local_ddl.sql
+++ b/migrations/kruize_local_ddl.sql
@@ -6,3 +6,8 @@ alter table kruize_lm_experiments add column metadata_id bigint references krui
 alter table if exists kruize_lm_experiments add constraint UK_lm_experiment_name unique (experiment_name);
 create table IF NOT EXISTS kruize_metric_profiles (api_version varchar(255), kind varchar(255), metadata jsonb, name varchar(255) not null, k8s_type varchar(255), profile_version float(53) not null, slo jsonb, primary key (name));
 create table IF NOT EXISTS kruize_lm_recommendations (interval_end_time timestamp(6) not null, experiment_name varchar(255) not null, cluster_name varchar(255), extended_data jsonb, version varchar(255),experiment_type varchar(255), primary key (experiment_name, interval_end_time)) PARTITION BY RANGE (interval_end_time);
+create table IF NOT EXISTS kruize_bulkjobs (job_id uuid not null, end_time timestamp(6), start_time timestamp(6), notifications jsonb, experiments jsonb, processed_count integer, status varchar(255), total_count integer, webhook varchar(255), primary key (job_id));
diff --git a/src/main/java/com/autotune/analyzer/services/BulkService.java b/src/main/java/com/autotune/analyzer/services/BulkService.java
index d22e7ac7d..43d5ce9e6 100644
--- a/src/main/java/com/autotune/analyzer/services/BulkService.java
+++ b/src/main/java/com/autotune/analyzer/services/BulkService.java
@@ -42,6 +42,7 @@ import java.util.concurrent.Executors;
 
 import static com.autotune.analyzer.utils.AnalyzerConstants.ServiceConstants.*;
+import static com.autotune.operator.KruizeDeploymentInfo.cacheJobInMemory;
 import static com.autotune.utils.KruizeConstants.KRUIZE_BULK_API.*;
 
 /**
@@ -153,7 +154,8 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
             // Generate a unique jobID
             String jobID = UUID.randomUUID().toString();
             BulkJobStatus jobStatus = new BulkJobStatus(jobID, IN_PROGRESS, Instant.now());
-            jobStatusMap.put(jobID, jobStatus);
+            if (cacheJobInMemory)
+                jobStatusMap.put(jobID, jobStatus);
 
             // Submit the job to be processed asynchronously
             executorService.submit(new BulkJobManager(jobID, jobStatus, payload));
diff --git a/src/main/java/com/autotune/database/dao/ExperimentDAO.java b/src/main/java/com/autotune/database/dao/ExperimentDAO.java
index b8cc9c897..c37c6330b 100644
--- a/src/main/java/com/autotune/database/dao/ExperimentDAO.java
+++ b/src/main/java/com/autotune/database/dao/ExperimentDAO.java
@@ -5,6 +5,7 @@
 import com.autotune.analyzer.utils.AnalyzerConstants;
 import com.autotune.common.data.ValidationOutputData;
 import com.autotune.database.table.*;
+import com.autotune.database.table.lm.BulkJob;
 import com.autotune.database.table.lm.KruizeLMExperimentEntry;
 import com.autotune.database.table.lm.KruizeLMRecommendationEntry;
 
@@ -129,4 +130,14 @@ public interface ExperimentDAO {
     public ValidationOutputData deleteKruizeDSMetadataEntryByName(String dataSourceName);
 
     ValidationOutputData addAuthenticationDetailsToDB(KruizeAuthenticationEntry kruizeAuthenticationEntry);
+
+    // Save, get, partially update, and delete BulkJob data
+    ValidationOutputData bulkJobSave(BulkJob bulkJob);
+
+    BulkJob findBulkJobById(String jobId) throws Exception;
+
+    ValidationOutputData updateBulkJobByExperiment(String jobId, String experimentName, String notification, String recommendationJson) throws Exception;
+
+    void deleteBulkJobByID(String jobId);
+
 }
diff --git a/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java b/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java
index f4900e637..14acd3f67 100644
--- a/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java
+++ b/src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java
@@ -24,6 +24,7 @@
 import com.autotune.database.helper.DBConstants;
 import com.autotune.database.init.KruizeHibernateUtil;
 import com.autotune.database.table.*;
+import com.autotune.database.table.lm.BulkJob;
 import com.autotune.database.table.lm.KruizeLMExperimentEntry;
 import com.autotune.database.table.lm.KruizeLMRecommendationEntry;
 import com.autotune.utils.KruizeConstants;
@@ -615,6 +616,106 @@ public ValidationOutputData addAuthenticationDetailsToDB(KruizeAuthenticationEnt
         return validationOutputData;
     }
 
+    @Override
+    public ValidationOutputData bulkJobSave(BulkJob bulkJob) {
+        ValidationOutputData validationOutputData = new ValidationOutputData(false, null, null);
+        Transaction tx = null;
+        String statusValue = "failure";
+        Timer.Sample timerSaveBulkJobDB = Timer.start(MetricsConfig.meterRegistry());
+        try {
+            try (Session session = KruizeHibernateUtil.getSessionFactory().openSession()) {
+                try {
+                    tx = session.beginTransaction();
+                    session.saveOrUpdate(bulkJob);
+                    tx.commit();
+                    // TODO: remove native sql query and transient
+                    validationOutputData.setSuccess(true);
+                    statusValue = "success";
+                } catch (HibernateException e) {
+                    LOGGER.error("Not able to save bulk job due to {}", e.getMessage());
+                    if (tx != null) tx.rollback();
+                    e.printStackTrace();
+                    validationOutputData.setSuccess(false);
+                    validationOutputData.setMessage(e.getMessage());
+                    // TODO: save error to API_ERROR_LOG
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.debug("Bulk JOB Save ={}", bulkJob);
+            LOGGER.error("Not able to save BulkJob due to {}", e.getMessage());
+            validationOutputData.setMessage(e.getMessage());
+        } finally {
+            if (null != timerSaveBulkJobDB) {
+                MetricsConfig.timerSaveBulkJobDB = MetricsConfig.timerBSaveBulkJobDB.tag("status", statusValue).register(MetricsConfig.meterRegistry());
+                timerSaveBulkJobDB.stop(MetricsConfig.timerSaveBulkJobDB);
+            }
+        }
+        return validationOutputData;
+    }
+
+    @Override
+    public BulkJob findBulkJobById(String jobId) throws Exception {
+        BulkJob bulkJob = null;
+        String statusValue = "failure";
+        Timer.Sample timerGetBulkJobDB = Timer.start(MetricsConfig.meterRegistry());
+        try (Session session = KruizeHibernateUtil.getSessionFactory().openSession()) {
+            bulkJob = session.createQuery(DBConstants.SQLQUERY.SELECT_FROM_BULKJOBS_BY_JOB_ID, BulkJob.class)
+                    .setParameter("jobId", jobId).getSingleResult();
+            statusValue = "success";
+        } catch (Exception e) {
+            LOGGER.error("Not able to load bulk JOB {} due to {}", jobId, e.getMessage());
+            throw new Exception("Error while loading BulkJob from database due to : " + e.getMessage());
+        } finally {
+            if (null != timerGetBulkJobDB) {
+                MetricsConfig.timerLoadBulkJobId = MetricsConfig.timerBLoadBulkJobId.tag("status", statusValue).register(MetricsConfig.meterRegistry());
+                timerGetBulkJobDB.stop(MetricsConfig.timerLoadBulkJobId);
+            }
+        }
+        return bulkJob;
+    }
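+
+    /**
+     * Partially updates a single experiment's entry inside the experiments JSONB
+     * column using PostgreSQL's jsonb_set(), so that concurrent writers touching
+     * other experiments of the same job do not rewrite the whole document.
+     */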
+    @Override
+    public ValidationOutputData updateBulkJobByExperiment(String jobId, String experimentName, String notification, String recommendationJson) throws Exception {
+        ValidationOutputData validationOutputData = new ValidationOutputData(false, null, null);
+        String statusValue = "failure";
+        Timer.Sample timerUpdateBulkJobDB = Timer.start(MetricsConfig.meterRegistry());
+        try (Session session = KruizeHibernateUtil.getSessionFactory().openSession()) {
+            // Construct JSON paths for the notification and recommendations fields
+            String notificationPath = "{experiments,\"" + experimentName + "\",notification}";
+            String recommendationPath = "{experiments,\"" + experimentName + "\",recommendations}";
+
+            // Native SQL query using jsonb_set for partial updates
+            String sql = UPDATE_BULKJOB_BY_ID;
+
+            // Execute the query
+            session.createNativeQuery(sql)
+                    .setParameter("notificationPath", notificationPath)
+                    .setParameter("newNotification", notification == null ? "null" : "\"" + notification + "\"") // Handle null value
+                    .setParameter("recommendationPath", recommendationPath)
+                    .setParameter("newRecommendation", recommendationJson)
+                    .setParameter("jobId", jobId)
+                    .executeUpdate();
+            validationOutputData.setSuccess(true);
+            statusValue = "success";
+        } catch (Exception e) {
+            LOGGER.error("Not able to update bulk JOB {} due to {}", jobId, e.getMessage());
+            validationOutputData.setMessage(e.getMessage());
+            throw new Exception("Error while updating BulkJob in database due to : " + e.getMessage());
+        } finally {
+            if (null != timerUpdateBulkJobDB) {
+                MetricsConfig.timerUpdateBulkJobId = MetricsConfig.timerBUpdateBulkJobId.tag("status", statusValue).register(MetricsConfig.meterRegistry());
+                timerUpdateBulkJobDB.stop(MetricsConfig.timerUpdateBulkJobId);
+            }
+        }
+        return validationOutputData;
+    }
+
+    @Override
+    public void deleteBulkJobByID(String jobId) {
+        // todo
+    }
+
 
     @Override
     public boolean updateExperimentStatus(KruizeObject kruizeObject, AnalyzerConstants.ExperimentStatus status) {
         kruizeObject.setStatus(status);
@@ -793,7 +894,7 @@ public List<KruizeLMExperimentEntry> loadAllLMExperiments() throws Exception {
         return entries;
     }
 
-
+    @Override
     public List<KruizeResultsEntry> loadAllResults() throws Exception {
         // TODO: load only experimentStatus=inProgress , playback may not require completed experiments
diff --git a/src/main/java/com/autotune/database/helper/DBConstants.java b/src/main/java/com/autotune/database/helper/DBConstants.java
index fdcacc163..18d2522e8 100644
--- a/src/main/java/com/autotune/database/helper/DBConstants.java
+++ b/src/main/java/com/autotune/database/helper/DBConstants.java
@@ -9,6 +9,13 @@ public static final class SQLQUERY {
         public static final String SELECT_FROM_LM_EXPERIMENTS = "from KruizeLMExperimentEntry";
         public static final String SELECT_FROM_EXPERIMENTS_BY_EXP_NAME = "from KruizeExperimentEntry k WHERE k.experiment_name = :experimentName";
         public static final String SELECT_FROM_LM_EXPERIMENTS_BY_EXP_NAME = "from KruizeLMExperimentEntry k WHERE k.experiment_name = :experimentName";
+        public static final String SELECT_FROM_BULKJOBS_BY_JOB_ID = "from BulkJob k WHERE k.jobId = :jobId";
+        public static final String UPDATE_BULKJOB_BY_ID = "UPDATE kruize_bulkjobs " +
+                "SET experiments = jsonb_set(" +
+                "    jsonb_set(experiments, :notificationPath, :newNotification::jsonb, true), " +
+                "    :recommendationPath, :newRecommendation::jsonb, true" +
+                ") " +
+                "WHERE job_id = :jobId";
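+        // With parameters bound, this expands to roughly the following (illustrative only):
+        //   UPDATE kruize_bulkjobs
+        //   SET experiments = jsonb_set(
+        //       jsonb_set(experiments, '{experiments,"<experiment name>",notification}', '<new notification>'::jsonb, true),
+        //       '{experiments,"<experiment name>",recommendations}', '<new recommendation json>'::jsonb, true)
+        //   WHERE job_id = '<job id>';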
k.experiment_name = :experimentName"; public static final String SELECT_FROM_DATASOURCE = "from KruizeDataSourceEntry"; diff --git a/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java b/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java index 841ac99a9..27940194a 100644 --- a/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java +++ b/src/main/java/com/autotune/database/init/KruizeHibernateUtil.java @@ -17,6 +17,7 @@ import com.autotune.database.table.*; +import com.autotune.database.table.lm.BulkJob; import com.autotune.database.table.lm.KruizeLMExperimentEntry; import com.autotune.database.table.lm.KruizeLMRecommendationEntry; import com.autotune.operator.KruizeDeploymentInfo; @@ -65,6 +66,7 @@ public static void buildSessionFactory() { configuration.addAnnotatedClass(KruizeDSMetadataEntry.class); configuration.addAnnotatedClass(KruizeMetricProfileEntry.class); configuration.addAnnotatedClass(KruizeAuthenticationEntry.class); + configuration.addAnnotatedClass(BulkJob.class); } LOGGER.info("DB is trying to connect to {}", connectionURL); sfTemp = configuration.buildSessionFactory(); diff --git a/src/main/java/com/autotune/database/table/lm/BulkJob.java b/src/main/java/com/autotune/database/table/lm/BulkJob.java new file mode 100644 index 000000000..3faa02900 --- /dev/null +++ b/src/main/java/com/autotune/database/table/lm/BulkJob.java @@ -0,0 +1,33 @@ +package com.autotune.database.table.lm; + +import jakarta.persistence.*; + +import java.sql.Timestamp; +import java.util.UUID; + + +@Entity +@Table(name = "kruize_bulkjobs") +public class BulkJob { + @Id + @Column(name = "job_id") + private UUID jobId; + private String status; + @Column(name = "total_count") + private int totalExperiments; + @Column(name = "processed_count") + private int processedExperiments; + @Column(name = "start_time") + private Timestamp jobStartTime; + @Column(name = "end_time") + private Timestamp jobEndTime; + private String webhook; + + @Column(columnDefinition = "jsonb") + private String notifications; // Stored as JSON string + + @Column(columnDefinition = "jsonb") + private String experiments; // JSONB field for experiments data + + // Getters and Setters +} diff --git a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java index d9c01fa7f..14f350b6a 100644 --- a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java +++ b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java @@ -66,7 +66,7 @@ public class KruizeDeploymentInfo { public static String database_admin_username; public static String database_admin_password; public static String database_ssl_mode; - + public static boolean cacheJobInMemory = true; public static String cloudwatch_logs_access_key_id; public static String cloudwatch_logs_secret_access_key; public static String cloudwatch_logs_log_group; diff --git a/src/main/java/com/autotune/utils/MetricsConfig.java b/src/main/java/com/autotune/utils/MetricsConfig.java index b7afa12f7..5947f82cc 100644 --- a/src/main/java/com/autotune/utils/MetricsConfig.java +++ b/src/main/java/com/autotune/utils/MetricsConfig.java @@ -2,7 +2,6 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; -import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics; import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics; @@ -15,19 +14,20 @@ import 
+}
diff --git a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java
index d9c01fa7f..14f350b6a 100644
--- a/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java
+++ b/src/main/java/com/autotune/operator/KruizeDeploymentInfo.java
@@ -66,7 +66,7 @@ public class KruizeDeploymentInfo {
     public static String database_admin_username;
     public static String database_admin_password;
     public static String database_ssl_mode;
-
+    public static boolean cacheJobInMemory = true;
     public static String cloudwatch_logs_access_key_id;
     public static String cloudwatch_logs_secret_access_key;
     public static String cloudwatch_logs_log_group;
diff --git a/src/main/java/com/autotune/utils/MetricsConfig.java b/src/main/java/com/autotune/utils/MetricsConfig.java
index b7afa12f7..5947f82cc 100644
--- a/src/main/java/com/autotune/utils/MetricsConfig.java
+++ b/src/main/java/com/autotune/utils/MetricsConfig.java
@@ -2,7 +2,6 @@
 
 import io.micrometer.core.instrument.Counter;
 import io.micrometer.core.instrument.Gauge;
-import io.micrometer.core.instrument.Metrics;
 import io.micrometer.core.instrument.Timer;
 import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
 import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
@@ -15,19 +14,20 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class MetricsConfig {
-
+
+    public static final AtomicInteger activeJobs = new AtomicInteger(0);
     public static Timer timerListRec, timerListExp, timerCreateExp, timerUpdateResults, timerUpdateRecomendations;
-    public static Timer timerLoadRecExpName, timerLoadResultsExpName, timerLoadExpName, timerLoadRecExpNameDate, timerBoxPlots;
+    public static Timer timerLoadRecExpName, timerLoadResultsExpName, timerLoadExpName, timerLoadBulkJobId, timerUpdateBulkJobId, timerLoadRecExpNameDate, timerBoxPlots;
     public static Timer timerLoadAllRec, timerLoadAllExp, timerLoadAllResults;
-    public static Timer timerAddRecDB, timerAddResultsDB, timerAddExpDB, timerAddBulkResultsDB;
+    public static Timer timerAddRecDB, timerAddResultsDB, timerAddExpDB, timerSaveBulkJobDB, timerAddBulkResultsDB, timerAddBulkJob;
     public static Timer timerAddPerfProfileDB, timerLoadPerfProfileName, timerLoadAllPerfProfiles;
     public static Timer timerImportMetadata, timerGetMetadata;
     public static Timer timerJobStatus, timerCreateBulkJob, timerGetExpMap, timerCreateBulkExp, timerGenerateBulkRec, timerRunJob;
-    public static Counter timerKruizeNotifications , timerBulkJobs;
+    public static Counter timerKruizeNotifications, timerBulkJobs;
     public static Timer.Builder timerBListRec, timerBListExp, timerBCreateExp, timerBUpdateResults, timerBUpdateRecommendations;
     public static Timer.Builder timerBLoadRecExpName, timerBLoadResultsExpName, timerBLoadExpName, timerBLoadRecExpNameDate, timerBBoxPlots;
     public static Timer.Builder timerBLoadAllRec, timerBLoadAllExp, timerBLoadAllResults;
-    public static Timer.Builder timerBAddRecDB, timerBAddResultsDB, timerBAddExpDB, timerBAddBulkResultsDB;
+    public static Timer.Builder timerBAddRecDB, timerBAddResultsDB, timerBAddExpDB, timerBLoadBulkJobId, timerBUpdateBulkJobId, timerBSaveBulkJobDB, timerBaddBulkJob, timerBAddBulkResultsDB;
     public static Timer.Builder timerBAddPerfProfileDB, timerBLoadPerfProfileName, timerBLoadAllPerfProfiles;
     public static Counter.Builder timerBKruizeNotifications, timerBBulkJobs;
     public static PrometheusMeterRegistry meterRegistry;
@@ -35,12 +35,11 @@ public class MetricsConfig {
     public static Timer.Builder timerBListDS, timerBImportDSMetadata, timerBListDSMetadata;
     public static Timer.Builder timerBImportMetadata, timerBGetMetadata;
     public static Timer.Builder timerBJobStatus, timerBCreateBulkJob, timerBGetExpMap, timerBCreateBulkExp, timerBGenerateBulkRec, timerBRunJob;
+    public static Gauge.Builder timerBBulkRunJobs;
     private static MetricsConfig INSTANCE;
     public String API_METRIC_DESC = "Time taken for Kruize APIs";
     public String DB_METRIC_DESC = "Time taken for KruizeDB methods";
     public String METHOD_METRIC_DESC = "Time taken for Kruize methods";
-    public static final AtomicInteger activeJobs = new AtomicInteger(0);
-    public static Gauge.Builder timerBBulkRunJobs;
 
     private MetricsConfig() {
         meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
@@ -63,6 +62,10 @@ private MetricsConfig() {
         timerBAddResultsDB = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "addResultToDB");
         timerBAddBulkResultsDB = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "addBulkResultsToDBAndFetchFailedResults");
         timerBAddExpDB = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "addExperimentToDB");
+        timerBSaveBulkJobDB = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "saveBulkJobDB");
Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "getBulkJobDB"); + timerBUpdateBulkJobId = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "updateBulkJobDB"); + timerBAddPerfProfileDB = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "addPerformanceProfileToDB"); timerBLoadPerfProfileName = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "loadPerformanceProfileByName"); timerBLoadAllPerfProfiles = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "loadAllPerformanceProfiles"); @@ -73,13 +76,15 @@ private MetricsConfig() { timerBListDSMetadata = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "dsmetadata").tag("method", "GET"); timerBKruizeNotifications = Counter.builder("KruizeNotifications").description("Kruize notifications").tag("api", "updateRecommendations"); + timerBaddBulkJob = Timer.builder("kruizeDB").description(DB_METRIC_DESC).tag("method", "addBulkJobToDB"); + timerBImportMetadata = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "datasources").tag("method", "importMetadata"); timerBGetMetadata = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "datasources").tag("method", "getMetadata"); timerBJobStatus = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "bulk").tag("method", "jobStatus"); timerBCreateBulkJob = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "bulk").tag("method", "createBulkJob"); timerBGetExpMap = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "bulk").tag("method", "getExperimentMap"); timerBRunJob = Timer.builder("kruizeAPI").description(API_METRIC_DESC).tag("api", "bulk").tag("method", "runBulkJob"); - timerBBulkRunJobs = Gauge.builder("kruizeAPI_active_jobs_count", activeJobs, AtomicInteger::get).description("No.of bulk jobs running").tags("api", "bulk", "method", "runBulkJob" , "status", "running"); + timerBBulkRunJobs = Gauge.builder("kruizeAPI_active_jobs_count", activeJobs, AtomicInteger::get).description("No.of bulk jobs running").tags("api", "bulk", "method", "runBulkJob", "status", "running"); timerBBulkRunJobs.register(meterRegistry); new ClassLoaderMetrics().bindTo(meterRegistry);