diff --git a/config/development.yaml b/config/development.yaml index 91283e6..2149923 100644 --- a/config/development.yaml +++ b/config/development.yaml @@ -14,3 +14,5 @@ validate: # HARE concept id: - '2000007027' catch_all_cohort_id: 4 +worker_pool_size: 5 +batch_size: 50 diff --git a/controllers/cohortdata.go b/controllers/cohortdata.go index 76e8015..3ed4c38 100644 --- a/controllers/cohortdata.go +++ b/controllers/cohortdata.go @@ -370,3 +370,9 @@ func (u CohortDataController) RetrieveDataDictionary(c *gin.Context) { } } + +func (u CohortDataController) GenerateDataDictionary(c *gin.Context) { + log.Printf("Generating Data Dictionary...") + go u.dataDictionaryModel.GenerateDataDictionary() + c.JSON(http.StatusOK, "Data Dictionary Kicked Off") +} diff --git a/main.go b/main.go index fe4e964..b0df331 100644 --- a/main.go +++ b/main.go @@ -21,23 +21,11 @@ func runDataValidation() { } } -func runDataDictionaryGeneration() { - var cohortDataModel = new(models.CohortData) - var dataDictionaryModel = new(models.DataDictionary) - dataDictionaryModel.CohortDataModel = cohortDataModel - log.Printf("Generating Data Dictionary...") - _, error := dataDictionaryModel.GenerateDataDictionary() - if error != nil { - log.Printf("Error: Data Dictionary Generation Failed! Gorm error %v", error) - } -} - func main() { environment := flag.String("e", "development", "Environment/prefix of config file name") flag.Parse() config.Init(*environment) db.Init() runDataValidation() - go runDataDictionaryGeneration() server.Init() } diff --git a/models/cohortdata.go b/models/cohortdata.go index 5c4e88a..15aa331 100644 --- a/models/cohortdata.go +++ b/models/cohortdata.go @@ -98,9 +98,8 @@ func (h CohortData) RetrieveDataBySourceIdAndCohortIdAndConceptIdsOrderedByPerso func (h CohortData) RetrieveHistogramDataBySourceIdAndCohortIdAndConceptIdsAndCohortPairs(sourceId int, cohortDefinitionId int, histogramConceptId int64, filterConceptIds []int64, filterCohortPairs []utils.CustomDichotomousVariableDef) ([]*PersonConceptAndValue, error) { var dataSourceModel = new(Source) omopDataSource := dataSourceModel.GetDataSource(sourceId, Omop) - resultsDataSource := dataSourceModel.GetDataSource(sourceId, Results) - log.Printf("Got All data sources") + // get the observations for the subjects and the concepts, to build up the data rows to return: var cohortData []*PersonConceptAndValue query := QueryFilterByCohortPairsHelper(filterCohortPairs, resultsDataSource, cohortDefinitionId, "unionAndIntersect"). diff --git a/models/datadictionary.go b/models/datadictionary.go index 3099307..492f50a 100644 --- a/models/datadictionary.go +++ b/models/datadictionary.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "log" + "sync" "time" "github.com/uc-cdis/cohort-middleware/config" @@ -11,12 +12,11 @@ import ( ) type DataDictionaryI interface { - GenerateDataDictionary() (*DataDictionaryModel, error) + GenerateDataDictionary() GetDataDictionary() (*DataDictionaryModel, error) } type DataDictionary struct { - CohortDataModel CohortDataI } type DataDictionaryModel struct { @@ -41,23 +41,94 @@ type DataDictionaryEntry struct { ValueSummary json.RawMessage `json:"valueSummary"` } -var dataDictionaryResult *DataDictionaryModel = nil +type DataDictionaryResult struct { + VocabularyID string `json:"vocabularyID"` + ConceptID int64 `json:"conceptID"` + ConceptCode string `json:"conceptCode"` + ConceptName string `json:"conceptName"` + ConceptClassId string `json:"conceptClassID"` + NumberOfPeopleWithVariable int64 `json:"numberOfPeopleWithVariable"` + NumberOfPeopleWhereValueIsFilled int64 `json:"numberOfPeopleWhereValueIsFilled"` + NumberOfPeopleWhereValueIsNull int64 `json:"numberOfPeopleWhereValueIsNull"` + ValueStoredAs string `json:"valueStoredAs"` + MinValue float64 `json:"minValue"` + MaxValue float64 `json:"maxValue"` + MeanValue float64 `json:"meanValue"` + StandardDeviation float64 `json:"standardDeviation"` + ValueSummary json.RawMessage `json:"valueSummary"` +} + +var ResultCache *DataDictionaryModel = nil func (u DataDictionary) GetDataDictionary() (*DataDictionaryModel, error) { //Read from cache - if dataDictionaryResult != nil { - return dataDictionaryResult, nil + if ResultCache != nil { + return ResultCache, nil } else { - return nil, errors.New("data dictionary is not available yet") + //Read from DB + var source = new(Source) + sources, _ := source.GetAllSources() + if len(sources) < 1 { + panic("Error: No data source found") + } else if len(sources) > 1 { + panic("More than one data source! Exiting") + } + var dataSourceModel = new(Source) + omopDataSource := dataSourceModel.GetDataSource(sources[0].SourceId, Omop) + + if u.CheckIfDataDictionaryIsFilled(omopDataSource) { + var newDataDictionary DataDictionaryModel + var dataDictionaryEntries []*DataDictionaryResult + //Get total number of person ids + query := omopDataSource.Db.Table(omopDataSource.Schema + ".observation_continuous as observation" + omopDataSource.GetViewDirective()). + Select("count(distinct observation.person_id) as total, null as data") + + query, cancel := utils.AddSpecificTimeoutToQuery(query, 600*time.Second) + defer cancel() + meta_result := query.Scan(&newDataDictionary) + + if meta_result.Error != nil { + log.Printf("ERROR: Failed to get number of person_ids") + return nil, errors.New("data dictionary is not available yet") + } else { + log.Printf("INFO: Total number of person_ids from observation view is %v.", newDataDictionary.Total) + } + + //get data dictionary entires saved in table + query = omopDataSource.Db.Table(omopDataSource.Schema + ".data_dictionary_result") + query, cancel = utils.AddSpecificTimeoutToQuery(query, 600*time.Second) + defer cancel() + meta_result = query.Scan(&dataDictionaryEntries) + + if meta_result.Error != nil { + log.Printf("ERROR: Failed to get data entries") + return nil, errors.New("data dictionary is not available yet") + } else { + log.Printf("INFO: Got data entries") + } + + newDataDictionary.Data, _ = json.Marshal(dataDictionaryEntries) + //set in cache + ResultCache = &newDataDictionary + return ResultCache, nil + } else { + return nil, errors.New("data dictionary is not available yet") + } } } // Generate Data Dictionary Json -func (u DataDictionary) GenerateDataDictionary() (*DataDictionaryModel, error) { - log.Printf("Generating Data Dictionary...") +func (u DataDictionary) GenerateDataDictionary() { conf := config.GetConfig() var catchAllCohortId = conf.GetInt("catch_all_cohort_id") log.Printf("catch all cohort id is %v", catchAllCohortId) + var maxWorkerSize int = conf.GetInt("worker_pool_size") + log.Printf("maxWorkerSize is %v", maxWorkerSize) + var batchSize int = conf.GetInt("batch_size") + log.Printf("Batch Size is %v", batchSize) + + entryCh := make(chan *DataDictionaryResult, maxWorkerSize) + var source = new(Source) sources, _ := source.GetAllSources() if len(sources) < 1 { @@ -68,63 +139,118 @@ func (u DataDictionary) GenerateDataDictionary() (*DataDictionaryModel, error) { var dataSourceModel = new(Source) omopDataSource := dataSourceModel.GetDataSource(sources[0].SourceId, Omop) - var dataDictionaryModel DataDictionaryModel - var dataDictionaryEntries []*DataDictionaryEntry - //see ddl_results_and_cdm.sql Data_Dictionary view - query := omopDataSource.Db.Table(omopDataSource.Schema + ".data_dictionary") - - query, cancel := utils.AddSpecificTimeoutToQuery(query, 600*time.Second) - defer cancel() - meta_result := query.Scan(&dataDictionaryEntries) - if meta_result.Error != nil { - return nil, meta_result.Error - } else if len(dataDictionaryEntries) == 0 { - log.Printf("INFO: no data dictionary entry found") + if u.CheckIfDataDictionaryIsFilled(omopDataSource) { + log.Print("Data Dictionary Result already filled. Skipping generation.") + return } else { - log.Printf("INFO: Data dictionary entries found.") + var dataDictionaryEntries []*DataDictionaryEntry + //see ddl_results_and_cdm.sql Data_Dictionary view + query := omopDataSource.Db.Table(omopDataSource.Schema + ".data_dictionary") + + query, cancel := utils.AddSpecificTimeoutToQuery(query, 600*time.Second) + defer cancel() + meta_result := query.Scan(&dataDictionaryEntries) + if meta_result.Error != nil { + log.Printf("Error: db read error: %v", meta_result.Error) + return + } else if len(dataDictionaryEntries) == 0 { + log.Printf("INFO: no data dictionary view entry found") + } else { + log.Printf("INFO: Data dictionary view entries found.") + } + + log.Printf("Get all histogram/bar graph data") + var partialDataList []*DataDictionaryEntry + var resultDataList []*DataDictionaryResult = []*DataDictionaryResult{} + for len(dataDictionaryEntries) > 0 { + wg := sync.WaitGroup{} + partialResultList := []*DataDictionaryResult{} + if len(dataDictionaryEntries) < maxWorkerSize { + partialDataList = dataDictionaryEntries + dataDictionaryEntries = []*DataDictionaryEntry{} + } else { + partialDataList = dataDictionaryEntries[:maxWorkerSize-1] + dataDictionaryEntries = dataDictionaryEntries[maxWorkerSize-1:] + } + + for _, d := range partialDataList { + wg.Add(1) + go GenerateData(d, sources[0].SourceId, catchAllCohortId, &wg, entryCh) + resultEntry := <-entryCh + partialResultList = append(partialResultList, resultEntry) + } + wg.Wait() + resultDataList = append(resultDataList, partialResultList...) + if len(resultDataList) >= batchSize { + log.Printf("%v row of results reached, flush to db.", batchSize) + u.WriteResultToDB(omopDataSource, resultDataList) + resultDataList = []*DataDictionaryResult{} + } + } + + if len(resultDataList) > 0 { + u.WriteResultToDB(omopDataSource, resultDataList) + } + + log.Printf("INFO: Data dictionary generation complete") + return } +} - //Get total number of concept ids - query = omopDataSource.Db.Table(omopDataSource.Schema + ".observation_continuous as observation" + omopDataSource.GetViewDirective()). - Select("count(distinct observation.person_id) as total, null as data") +func GenerateData(data *DataDictionaryEntry, sourceId int, catchAllCohortId int, wg *sync.WaitGroup, ch chan *DataDictionaryResult) { + var c = new(CohortData) - query, cancel = utils.AddSpecificTimeoutToQuery(query, 600*time.Second) - defer cancel() - meta_result = query.Scan(&dataDictionaryModel) + if data.ConceptClassId == "MVP Continuous" { + // MVP Continuous #similar to bin items below call cohort-middleware + var filterConceptIds = []int64{} + var filterCohortPairs = []utils.CustomDichotomousVariableDef{} + cohortData, _ := c.RetrieveHistogramDataBySourceIdAndCohortIdAndConceptIdsAndCohortPairs(sourceId, catchAllCohortId, data.ConceptID, filterConceptIds, filterCohortPairs) + conceptValues := []float64{} + for _, personData := range cohortData { + conceptValues = append(conceptValues, float64(*personData.ConceptValueAsNumber)) + } + log.Printf("INFO: concept id is %v", data.ConceptID) + histogramData := utils.GenerateHistogramData(conceptValues) - if meta_result.Error != nil { - log.Printf("ERROR: Failed to get number of concepts") + data.ValueSummary, _ = json.Marshal(histogramData) } else { - log.Printf("INFO: Got total number of concepts from observation view.") + log.Print("Get Ordinal Data") + //Get Value Summary from bar graph method + ordinalValueData, _ := c.RetrieveBarGraphDataBySourceIdAndCohortIdAndConceptIds(sourceId, catchAllCohortId, data.ConceptID) + + data.ValueSummary, _ = json.Marshal(ordinalValueData) } + result := DataDictionaryResult(*data) + //send result to channel + ch <- &result + wg.Done() +} - log.Printf("Get all histogram/bar graph data") +func (u DataDictionary) WriteResultToDB(dbSource *utils.DbAndSchema, resultDataList []*DataDictionaryResult) bool { + result := dbSource.Db.Create(resultDataList) + if result.Error != nil { + log.Printf("ERROR: Failed to insert data into table") + panic("") + } + log.Printf("Write to DB succeeded.") + return true +} - for _, data := range dataDictionaryEntries { - if data.ConceptClassId == "MVP Continuous" { - // MVP Continuous #similar to bin items below call cohort-middleware - var filterConceptIds = []int64{} - var filterCohortPairs = []utils.CustomDichotomousVariableDef{} - if u.CohortDataModel == nil { - u.CohortDataModel = new(CohortData) - } - cohortData, _ := u.CohortDataModel.RetrieveHistogramDataBySourceIdAndCohortIdAndConceptIdsAndCohortPairs(sources[0].SourceId, catchAllCohortId, data.ConceptID, filterConceptIds, filterCohortPairs) - conceptValues := []float64{} - for _, personData := range cohortData { - conceptValues = append(conceptValues, float64(*personData.ConceptValueAsNumber)) - } +func (u DataDictionary) CheckIfDataDictionaryIsFilled(dbSource *utils.DbAndSchema) bool { + var dataDictionaryResult []*DataDictionaryResult + query := dbSource.Db.Table(dbSource.Schema + ".data_dictionary_result") - histogramData := utils.GenerateHistogramData(conceptValues) - data.ValueSummary, _ = json.Marshal(histogramData) - } else { - //Get Value Summary from bar graph method - ordinalValueData, _ := u.CohortDataModel.RetrieveBarGraphDataBySourceIdAndCohortIdAndConceptIds(sources[0].SourceId, catchAllCohortId, data.ConceptID) - data.ValueSummary, _ = json.Marshal(ordinalValueData) - } + query, cancel := utils.AddSpecificTimeoutToQuery(query, 600*time.Second) + defer cancel() + meta_result := query.Scan(&dataDictionaryResult) + if meta_result.Error != nil { + log.Printf("ERROR: Failed to get data dictionary result") + panic("") + } else if len(dataDictionaryResult) > 0 { + log.Printf("INFO: Data Dictionary Result Table is filled.") + return true + } else { + log.Printf("INFO: Data Dictionary Result Table is empty.") + return false } - - log.Printf("INFO: Data dictionary generation complete") - dataDictionaryModel.Data, _ = json.Marshal(dataDictionaryEntries) - dataDictionaryResult = &dataDictionaryModel - return &dataDictionaryModel, nil } diff --git a/server/router.go b/server/router.go index d1a6788..838c06d 100644 --- a/server/router.go +++ b/server/router.go @@ -59,6 +59,9 @@ func NewRouter() *gin.Engine { // Data Dictionary endpoint authorized.GET("/data-dictionary/Retrieve", cohortData.RetrieveDataDictionary) + // Data Dictionary endpoint + authorized.GET("/data-dictionary/Generate", cohortData.GenerateDataDictionary) + } return r diff --git a/tests/controllers_tests/controllers_test.go b/tests/controllers_tests/controllers_test.go index ad36553..87f3726 100644 --- a/tests/controllers_tests/controllers_test.go +++ b/tests/controllers_tests/controllers_test.go @@ -293,9 +293,7 @@ func (h dummyDataDictionaryModel) GetDataDictionary() (*models.DataDictionaryMod return data, nil } -func (h dummyDataDictionaryModel) GenerateDataDictionary() (*models.DataDictionaryModel, error) { - return nil, nil -} +func (h dummyDataDictionaryModel) GenerateDataDictionary() {} type dummyFailingDataDictionaryModel struct{} @@ -303,9 +301,8 @@ func (h dummyFailingDataDictionaryModel) GetDataDictionary() (*models.DataDictio return nil, errors.New("data dictionary is not available yet") } -func (h dummyFailingDataDictionaryModel) GenerateDataDictionary() (*models.DataDictionaryModel, error) { - return nil, nil -} +func (h dummyFailingDataDictionaryModel) GenerateDataDictionary() {} + func TestRetrieveHistogramForCohortIdAndConceptIdWithWrongParams(t *testing.T) { setUp(t) requestContext := new(gin.Context) @@ -1209,3 +1206,18 @@ func TestFailingRetrieveDataDictionary(t *testing.T) { } } + +func TestGenerateDataDictionary(t *testing.T) { + setUp(t) + requestContext := new(gin.Context) + requestContext.Writer = new(tests.CustomResponseWriter) + requestContext.Request = new(http.Request) + cohortDataController.GenerateDataDictionary(requestContext) + + result := requestContext.Writer.(*tests.CustomResponseWriter) + + if result.StatusCode != 200 { + t.Errorf("Expected request to succeed") + } + +} diff --git a/tests/models_tests/models_test.go b/tests/models_tests/models_test.go index dc9b8cf..5c74bc4 100644 --- a/tests/models_tests/models_test.go +++ b/tests/models_tests/models_test.go @@ -1045,23 +1045,55 @@ func TestPersonConceptAndCountString(t *testing.T) { } -func TestGenerateDataDictionary(t *testing.T) { +func TestGetDataDictionary(t *testing.T) { setUp(t) - dataDictionaryModel.CohortDataModel = cohortDataModel data, _ := dataDictionaryModel.GetDataDictionary() //Pre generation cache should be empty if data != nil { t.Errorf("Get Data Dictionary should have failed.") } +} + +func TestCheckIfDataDictionaryIsFilled(t *testing.T) { + setUp(t) + var source = new(models.Source) + sources, _ := source.GetAllSources() + var dataSourceModel = new(models.Source) + omopDataSource := dataSourceModel.GetDataSource(sources[0].SourceId, models.Omop) - generatedData, _ := dataDictionaryModel.GenerateDataDictionary() - if generatedData.Total != 18 { - t.Errorf("Data Dictionary Generation Failed.") + filled := dataDictionaryModel.CheckIfDataDictionaryIsFilled(omopDataSource) + if filled != false { + t.Errorf("Flag should be false") } + dataDictionaryModel.GenerateDataDictionary() + filled = dataDictionaryModel.CheckIfDataDictionaryIsFilled(omopDataSource) + if filled != true { + t.Errorf("Flag should be true") + } +} - data, _ = dataDictionaryModel.GetDataDictionary() - if data.Total != 18 { - t.Errorf("Data Dictionary Generation Failed.") +func TestGenerateDataDictionary(t *testing.T) { + setUp(t) + dataDictionaryModel.GenerateDataDictionary() + //Update this with read + data, _ := dataDictionaryModel.GetDataDictionary() + if data == nil || data.Total != 18 || data.Data == nil { + t.Errorf("Get Data Dictionary should have succeeded.") + } +} + +func TestWriteToDB(t *testing.T) { + setUp(t) + var source = new(models.Source) + sources, _ := source.GetAllSources() + var dataSourceModel = new(models.Source) + omopDataSource := dataSourceModel.GetDataSource(sources[0].SourceId, models.Omop) + + resultList := append([]*models.DataDictionaryResult{}, &models.DataDictionaryResult{ConceptID: 123}) + success := dataDictionaryModel.WriteResultToDB(omopDataSource, resultList) + //Write succeeded without panicking + if success != true { + t.Errorf("Write failed") } } diff --git a/tests/setup_local_db/ddl_results_and_cdm.sql b/tests/setup_local_db/ddl_results_and_cdm.sql index 91bbb15..2f992ab 100644 --- a/tests/setup_local_db/ddl_results_and_cdm.sql +++ b/tests/setup_local_db/ddl_results_and_cdm.sql @@ -95,6 +95,25 @@ CREATE TABLE omop.concept invalid_reason character varying(1) COLLATE pg_catalog."default" ); +CREATE TABLE omop.DATA_DICTIONARY_RESULT +( + vocabulary_id character varying(20), + concept_id integer not null, + concept_code character varying(50), + concept_name character varying(255), + concept_class_id character varying(20), + number_of_people_with_variable integer, + number_of_people_where_value_is_filled integer, + number_of_people_where_value_is_null integer, + value_stored_as character varying(20), + min_value numeric, + max_value numeric, + mean_value numeric, + standard_deviation numeric, + value_summary JSON --For sql server use varchar(max) +); +ALTER TABLE omop.DATA_DICTIONARY_RESULT ADD CONSTRAINT xpk_DATA_DICTIONARY_RESULT PRIMARY KEY ( concept_id ) ; + CREATE VIEW omop.OBSERVATION_CONTINUOUS AS SELECT ob.person_id, ob.observation_concept_id, ob.value_as_string, ob.value_as_number, ob.value_as_concept_id FROM omop.observation ob diff --git a/utils/histogram.go b/utils/histogram.go index f083fce..b40715f 100644 --- a/utils/histogram.go +++ b/utils/histogram.go @@ -14,12 +14,14 @@ type HistogramColumn struct { NumberOfPeople int `json:"personCount"` } +const MAX_NUM_BINS = 50 + func GenerateHistogramData(conceptValues []float64) []HistogramColumn { if len(conceptValues) == 0 { return nil } - numBins, width := GetBinsAndWidthUsingFreedmanDiaconisAndSortValues(conceptValues) //conceptValues will get sorted as a side-effect, which is useful in this case + numBins, width := GetBinsAndWidthAndSortValues(conceptValues) //conceptValues will get sorted as a side-effect, which is useful in this case startValue := conceptValues[0] binIndexToHistogramColumn := make(map[int]HistogramColumn) @@ -50,7 +52,7 @@ func GenerateHistogramData(conceptValues []float64) []HistogramColumn { } // Sorts the given values, and returns the number of bins, the width of the bins using FreedmanDiaconis -func GetBinsAndWidthUsingFreedmanDiaconisAndSortValues(values []float64) (int, float64) { +func GetBinsAndWidthAndSortValues(values []float64) (int, float64) { width := FreedmanDiaconis(values) // values will get sorted as a side-effect, which is useful in this case startValue := values[0] @@ -63,6 +65,12 @@ func GetBinsAndWidthUsingFreedmanDiaconisAndSortValues(values []float64) (int, f numBins = 1 width = endValue + 1 - startValue } + // check if numBins is acceptable: + if numBins > MAX_NUM_BINS { + log.Printf("%v number exceeded MAX_NUM_BINS. Use %v bins instead", numBins, MAX_NUM_BINS) + numBins = MAX_NUM_BINS + width = (endValue - startValue) / MAX_NUM_BINS + } log.Printf("num bins %v", numBins) return numBins, width