Skip to content

Commit

Permalink
VADC-1084 (#100)
Browse files Browse the repository at this point in the history
* use go routine to generate histogram for data dictionary, change get to read from table

* disable generation on startup

* add naive binning fallback (#101)

Co-authored-by: pieterlukasse <[email protected]>
  • Loading branch information
tianj7 and pieterlukasse authored May 3, 2024
1 parent 6977553 commit e6cfa7f
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 86 deletions.
2 changes: 2 additions & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ validate:
# HARE concept id:
- '2000007027'
catch_all_cohort_id: 4
worker_pool_size: 5
batch_size: 50
6 changes: 6 additions & 0 deletions controllers/cohortdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,9 @@ func (u CohortDataController) RetrieveDataDictionary(c *gin.Context) {
}

}

func (u CohortDataController) GenerateDataDictionary(c *gin.Context) {
log.Printf("Generating Data Dictionary...")
go u.dataDictionaryModel.GenerateDataDictionary()
c.JSON(http.StatusOK, "Data Dictionary Kicked Off")
}
12 changes: 0 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,11 @@ func runDataValidation() {
}
}

func runDataDictionaryGeneration() {
var cohortDataModel = new(models.CohortData)
var dataDictionaryModel = new(models.DataDictionary)
dataDictionaryModel.CohortDataModel = cohortDataModel
log.Printf("Generating Data Dictionary...")
_, error := dataDictionaryModel.GenerateDataDictionary()
if error != nil {
log.Printf("Error: Data Dictionary Generation Failed! Gorm error %v", error)
}
}

func main() {
environment := flag.String("e", "development", "Environment/prefix of config file name")
flag.Parse()
config.Init(*environment)
db.Init()
runDataValidation()
go runDataDictionaryGeneration()
server.Init()
}
3 changes: 1 addition & 2 deletions models/cohortdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,8 @@ func (h CohortData) RetrieveDataBySourceIdAndCohortIdAndConceptIdsOrderedByPerso
func (h CohortData) RetrieveHistogramDataBySourceIdAndCohortIdAndConceptIdsAndCohortPairs(sourceId int, cohortDefinitionId int, histogramConceptId int64, filterConceptIds []int64, filterCohortPairs []utils.CustomDichotomousVariableDef) ([]*PersonConceptAndValue, error) {
var dataSourceModel = new(Source)
omopDataSource := dataSourceModel.GetDataSource(sourceId, Omop)

resultsDataSource := dataSourceModel.GetDataSource(sourceId, Results)
log.Printf("Got All data sources")

// get the observations for the subjects and the concepts, to build up the data rows to return:
var cohortData []*PersonConceptAndValue
query := QueryFilterByCohortPairsHelper(filterCohortPairs, resultsDataSource, cohortDefinitionId, "unionAndIntersect").
Expand Down
238 changes: 182 additions & 56 deletions models/datadictionary.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import (
"encoding/json"
"errors"
"log"
"sync"
"time"

"github.com/uc-cdis/cohort-middleware/config"
"github.com/uc-cdis/cohort-middleware/utils"
)

type DataDictionaryI interface {
GenerateDataDictionary() (*DataDictionaryModel, error)
GenerateDataDictionary()
GetDataDictionary() (*DataDictionaryModel, error)
}

type DataDictionary struct {
CohortDataModel CohortDataI
}

type DataDictionaryModel struct {
Expand All @@ -41,23 +41,94 @@ type DataDictionaryEntry struct {
ValueSummary json.RawMessage `json:"valueSummary"`
}

var dataDictionaryResult *DataDictionaryModel = nil
type DataDictionaryResult struct {
VocabularyID string `json:"vocabularyID"`
ConceptID int64 `json:"conceptID"`
ConceptCode string `json:"conceptCode"`
ConceptName string `json:"conceptName"`
ConceptClassId string `json:"conceptClassID"`
NumberOfPeopleWithVariable int64 `json:"numberOfPeopleWithVariable"`
NumberOfPeopleWhereValueIsFilled int64 `json:"numberOfPeopleWhereValueIsFilled"`
NumberOfPeopleWhereValueIsNull int64 `json:"numberOfPeopleWhereValueIsNull"`
ValueStoredAs string `json:"valueStoredAs"`
MinValue float64 `json:"minValue"`
MaxValue float64 `json:"maxValue"`
MeanValue float64 `json:"meanValue"`
StandardDeviation float64 `json:"standardDeviation"`
ValueSummary json.RawMessage `json:"valueSummary"`
}

var ResultCache *DataDictionaryModel = nil

func (u DataDictionary) GetDataDictionary() (*DataDictionaryModel, error) {
//Read from cache
if dataDictionaryResult != nil {
return dataDictionaryResult, nil
if ResultCache != nil {
return ResultCache, nil
} else {
return nil, errors.New("data dictionary is not available yet")
//Read from DB
var source = new(Source)
sources, _ := source.GetAllSources()
if len(sources) < 1 {
panic("Error: No data source found")
} else if len(sources) > 1 {
panic("More than one data source! Exiting")
}
var dataSourceModel = new(Source)
omopDataSource := dataSourceModel.GetDataSource(sources[0].SourceId, Omop)

if u.CheckIfDataDictionaryIsFilled(omopDataSource) {
var newDataDictionary DataDictionaryModel
var dataDictionaryEntries []*DataDictionaryResult
//Get total number of person ids
query := omopDataSource.Db.Table(omopDataSource.Schema + ".observation_continuous as observation" + omopDataSource.GetViewDirective()).
Select("count(distinct observation.person_id) as total, null as data")

query, cancel := utils.AddSpecificTimeoutToQuery(query, 600*time.Second)
defer cancel()
meta_result := query.Scan(&newDataDictionary)

if meta_result.Error != nil {
log.Printf("ERROR: Failed to get number of person_ids")
return nil, errors.New("data dictionary is not available yet")
} else {
log.Printf("INFO: Total number of person_ids from observation view is %v.", newDataDictionary.Total)
}

//get data dictionary entires saved in table
query = omopDataSource.Db.Table(omopDataSource.Schema + ".data_dictionary_result")
query, cancel = utils.AddSpecificTimeoutToQuery(query, 600*time.Second)
defer cancel()
meta_result = query.Scan(&dataDictionaryEntries)

if meta_result.Error != nil {
log.Printf("ERROR: Failed to get data entries")
return nil, errors.New("data dictionary is not available yet")
} else {
log.Printf("INFO: Got data entries")
}

newDataDictionary.Data, _ = json.Marshal(dataDictionaryEntries)
//set in cache
ResultCache = &newDataDictionary
return ResultCache, nil
} else {
return nil, errors.New("data dictionary is not available yet")
}
}
}

// Generate Data Dictionary Json
func (u DataDictionary) GenerateDataDictionary() (*DataDictionaryModel, error) {
log.Printf("Generating Data Dictionary...")
func (u DataDictionary) GenerateDataDictionary() {
conf := config.GetConfig()
var catchAllCohortId = conf.GetInt("catch_all_cohort_id")
log.Printf("catch all cohort id is %v", catchAllCohortId)
var maxWorkerSize int = conf.GetInt("worker_pool_size")
log.Printf("maxWorkerSize is %v", maxWorkerSize)
var batchSize int = conf.GetInt("batch_size")
log.Printf("Batch Size is %v", batchSize)

entryCh := make(chan *DataDictionaryResult, maxWorkerSize)

var source = new(Source)
sources, _ := source.GetAllSources()
if len(sources) < 1 {
Expand All @@ -68,63 +139,118 @@ func (u DataDictionary) GenerateDataDictionary() (*DataDictionaryModel, error) {
var dataSourceModel = new(Source)
omopDataSource := dataSourceModel.GetDataSource(sources[0].SourceId, Omop)

var dataDictionaryModel DataDictionaryModel
var dataDictionaryEntries []*DataDictionaryEntry
//see ddl_results_and_cdm.sql Data_Dictionary view
query := omopDataSource.Db.Table(omopDataSource.Schema + ".data_dictionary")

query, cancel := utils.AddSpecificTimeoutToQuery(query, 600*time.Second)
defer cancel()
meta_result := query.Scan(&dataDictionaryEntries)
if meta_result.Error != nil {
return nil, meta_result.Error
} else if len(dataDictionaryEntries) == 0 {
log.Printf("INFO: no data dictionary entry found")
if u.CheckIfDataDictionaryIsFilled(omopDataSource) {
log.Print("Data Dictionary Result already filled. Skipping generation.")
return
} else {
log.Printf("INFO: Data dictionary entries found.")
var dataDictionaryEntries []*DataDictionaryEntry
//see ddl_results_and_cdm.sql Data_Dictionary view
query := omopDataSource.Db.Table(omopDataSource.Schema + ".data_dictionary")

query, cancel := utils.AddSpecificTimeoutToQuery(query, 600*time.Second)
defer cancel()
meta_result := query.Scan(&dataDictionaryEntries)
if meta_result.Error != nil {
log.Printf("Error: db read error: %v", meta_result.Error)
return
} else if len(dataDictionaryEntries) == 0 {
log.Printf("INFO: no data dictionary view entry found")
} else {
log.Printf("INFO: Data dictionary view entries found.")
}

log.Printf("Get all histogram/bar graph data")
var partialDataList []*DataDictionaryEntry
var resultDataList []*DataDictionaryResult = []*DataDictionaryResult{}
for len(dataDictionaryEntries) > 0 {
wg := sync.WaitGroup{}
partialResultList := []*DataDictionaryResult{}
if len(dataDictionaryEntries) < maxWorkerSize {
partialDataList = dataDictionaryEntries
dataDictionaryEntries = []*DataDictionaryEntry{}
} else {
partialDataList = dataDictionaryEntries[:maxWorkerSize-1]
dataDictionaryEntries = dataDictionaryEntries[maxWorkerSize-1:]
}

for _, d := range partialDataList {
wg.Add(1)
go GenerateData(d, sources[0].SourceId, catchAllCohortId, &wg, entryCh)
resultEntry := <-entryCh
partialResultList = append(partialResultList, resultEntry)
}
wg.Wait()
resultDataList = append(resultDataList, partialResultList...)
if len(resultDataList) >= batchSize {
log.Printf("%v row of results reached, flush to db.", batchSize)
u.WriteResultToDB(omopDataSource, resultDataList)
resultDataList = []*DataDictionaryResult{}
}
}

if len(resultDataList) > 0 {
u.WriteResultToDB(omopDataSource, resultDataList)
}

log.Printf("INFO: Data dictionary generation complete")
return
}
}

//Get total number of concept ids
query = omopDataSource.Db.Table(omopDataSource.Schema + ".observation_continuous as observation" + omopDataSource.GetViewDirective()).
Select("count(distinct observation.person_id) as total, null as data")
func GenerateData(data *DataDictionaryEntry, sourceId int, catchAllCohortId int, wg *sync.WaitGroup, ch chan *DataDictionaryResult) {
var c = new(CohortData)

query, cancel = utils.AddSpecificTimeoutToQuery(query, 600*time.Second)
defer cancel()
meta_result = query.Scan(&dataDictionaryModel)
if data.ConceptClassId == "MVP Continuous" {
// MVP Continuous #similar to bin items below call cohort-middleware
var filterConceptIds = []int64{}
var filterCohortPairs = []utils.CustomDichotomousVariableDef{}
cohortData, _ := c.RetrieveHistogramDataBySourceIdAndCohortIdAndConceptIdsAndCohortPairs(sourceId, catchAllCohortId, data.ConceptID, filterConceptIds, filterCohortPairs)
conceptValues := []float64{}
for _, personData := range cohortData {
conceptValues = append(conceptValues, float64(*personData.ConceptValueAsNumber))
}
log.Printf("INFO: concept id is %v", data.ConceptID)
histogramData := utils.GenerateHistogramData(conceptValues)

if meta_result.Error != nil {
log.Printf("ERROR: Failed to get number of concepts")
data.ValueSummary, _ = json.Marshal(histogramData)
} else {
log.Printf("INFO: Got total number of concepts from observation view.")
log.Print("Get Ordinal Data")
//Get Value Summary from bar graph method
ordinalValueData, _ := c.RetrieveBarGraphDataBySourceIdAndCohortIdAndConceptIds(sourceId, catchAllCohortId, data.ConceptID)

data.ValueSummary, _ = json.Marshal(ordinalValueData)
}
result := DataDictionaryResult(*data)
//send result to channel
ch <- &result
wg.Done()
}

log.Printf("Get all histogram/bar graph data")
func (u DataDictionary) WriteResultToDB(dbSource *utils.DbAndSchema, resultDataList []*DataDictionaryResult) bool {
result := dbSource.Db.Create(resultDataList)
if result.Error != nil {
log.Printf("ERROR: Failed to insert data into table")
panic("")
}
log.Printf("Write to DB succeeded.")
return true
}

for _, data := range dataDictionaryEntries {
if data.ConceptClassId == "MVP Continuous" {
// MVP Continuous #similar to bin items below call cohort-middleware
var filterConceptIds = []int64{}
var filterCohortPairs = []utils.CustomDichotomousVariableDef{}
if u.CohortDataModel == nil {
u.CohortDataModel = new(CohortData)
}
cohortData, _ := u.CohortDataModel.RetrieveHistogramDataBySourceIdAndCohortIdAndConceptIdsAndCohortPairs(sources[0].SourceId, catchAllCohortId, data.ConceptID, filterConceptIds, filterCohortPairs)
conceptValues := []float64{}
for _, personData := range cohortData {
conceptValues = append(conceptValues, float64(*personData.ConceptValueAsNumber))
}
func (u DataDictionary) CheckIfDataDictionaryIsFilled(dbSource *utils.DbAndSchema) bool {
var dataDictionaryResult []*DataDictionaryResult
query := dbSource.Db.Table(dbSource.Schema + ".data_dictionary_result")

histogramData := utils.GenerateHistogramData(conceptValues)
data.ValueSummary, _ = json.Marshal(histogramData)
} else {
//Get Value Summary from bar graph method
ordinalValueData, _ := u.CohortDataModel.RetrieveBarGraphDataBySourceIdAndCohortIdAndConceptIds(sources[0].SourceId, catchAllCohortId, data.ConceptID)
data.ValueSummary, _ = json.Marshal(ordinalValueData)
}
query, cancel := utils.AddSpecificTimeoutToQuery(query, 600*time.Second)
defer cancel()
meta_result := query.Scan(&dataDictionaryResult)
if meta_result.Error != nil {
log.Printf("ERROR: Failed to get data dictionary result")
panic("")
} else if len(dataDictionaryResult) > 0 {
log.Printf("INFO: Data Dictionary Result Table is filled.")
return true
} else {
log.Printf("INFO: Data Dictionary Result Table is empty.")
return false
}

log.Printf("INFO: Data dictionary generation complete")
dataDictionaryModel.Data, _ = json.Marshal(dataDictionaryEntries)
dataDictionaryResult = &dataDictionaryModel
return &dataDictionaryModel, nil
}
3 changes: 3 additions & 0 deletions server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func NewRouter() *gin.Engine {
// Data Dictionary endpoint
authorized.GET("/data-dictionary/Retrieve", cohortData.RetrieveDataDictionary)

// Data Dictionary endpoint
authorized.GET("/data-dictionary/Generate", cohortData.GenerateDataDictionary)

}

return r
Expand Down
24 changes: 18 additions & 6 deletions tests/controllers_tests/controllers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,19 +293,16 @@ func (h dummyDataDictionaryModel) GetDataDictionary() (*models.DataDictionaryMod
return data, nil
}

func (h dummyDataDictionaryModel) GenerateDataDictionary() (*models.DataDictionaryModel, error) {
return nil, nil
}
func (h dummyDataDictionaryModel) GenerateDataDictionary() {}

type dummyFailingDataDictionaryModel struct{}

func (h dummyFailingDataDictionaryModel) GetDataDictionary() (*models.DataDictionaryModel, error) {
return nil, errors.New("data dictionary is not available yet")
}

func (h dummyFailingDataDictionaryModel) GenerateDataDictionary() (*models.DataDictionaryModel, error) {
return nil, nil
}
func (h dummyFailingDataDictionaryModel) GenerateDataDictionary() {}

func TestRetrieveHistogramForCohortIdAndConceptIdWithWrongParams(t *testing.T) {
setUp(t)
requestContext := new(gin.Context)
Expand Down Expand Up @@ -1209,3 +1206,18 @@ func TestFailingRetrieveDataDictionary(t *testing.T) {
}

}

func TestGenerateDataDictionary(t *testing.T) {
setUp(t)
requestContext := new(gin.Context)
requestContext.Writer = new(tests.CustomResponseWriter)
requestContext.Request = new(http.Request)
cohortDataController.GenerateDataDictionary(requestContext)

result := requestContext.Writer.(*tests.CustomResponseWriter)

if result.StatusCode != 200 {
t.Errorf("Expected request to succeed")
}

}
Loading

0 comments on commit e6cfa7f

Please sign in to comment.