Skip to content

Commit

Permalink
Added ability to sync relationships only
Browse files Browse the repository at this point in the history
  • Loading branch information
coreybutler committed Oct 31, 2023
1 parent 14aa1f2 commit f54178b
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 117 deletions.
19 changes: 19 additions & 0 deletions archive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,19 @@ func (a *Archive) HasDoc() bool {
return true
}

// AddSchema ensures the archive's document has a schema registered under the
// given physical name. When the document's GetSchema call reports an error
// for that name, an empty schema (no relationships, no sets) is applied to
// the document; otherwise nothing is changed.
func (a *Archive) AddSchema(name string) {
	if _, err := a.doc.GetSchema(name); err == nil {
		// The lookup succeeded, so there is nothing to add.
		return
	}

	// Allocate both maps up front so the new schema starts with empty,
	// non-nil collections.
	placeholder := &doc.Schema{
		Name:          doc.Name{Physical: name},
		Relationships: make(map[string]*doc.Relationship),
		Sets:          make(map[string]*doc.Set),
	}
	a.doc.ApplySchema(placeholder)
}

func (a *Archive) Query(statement string) (*RecordSet, error) {
conn, err := sql.Open("sqlite3", a.path)
if err != nil {
Expand Down Expand Up @@ -1050,6 +1063,12 @@ func (a *Archive) getSet(record map[string]interface{}) (*doc.Set, error) {
util.Dump(record)
}

if record["schema"] == nil {
schemas := a.doc.GetSchemas()
if len(schemas) > 0 {
record["schema"] = schemas[0].Name.Physical
}
}
schema, err := a.doc.GetSchema(record["schema"].(string))
if err != nil {
return &doc.Set{}, err
Expand Down
269 changes: 170 additions & 99 deletions command/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Extractor struct {
System string `name:"system" short:"j" help:"The system/job ID where status messages are logged." json:"datahub_job_id"`
APIKey string `name:"api_key" short:"k" help:"Optional API key to access the Datahub" json:"api_key"`
Debug bool `name:"debug" short:"d" help:"Turn on debugging"`
RelsOnly bool `name:"onlyrelationships" short:"r" help:"Only sync relationships"`
ConnectionString string `arg:"conn" optional:"" help:"The source connection string used to extract metadata from the data store" json:"db_connection_string"`
}

Expand Down Expand Up @@ -79,7 +80,13 @@ func (e *Extractor) Run(ctx *Context) error {
if e.Debug {
fmt.Println(" begin extraction...")
}
doc, err := remote.Extract()

elements := []string{}
if e.RelsOnly {
elements = append(elements, "relationships")
}

doc, err := remote.Extract(elements...)
if err != nil {
fmt.Println(err)
return err
Expand All @@ -93,7 +100,7 @@ func (e *Extractor) Run(ctx *Context) error {
end_extract := time.Since(start_extract)
fmt.Printf("Source Extraction: %s\n", end_extract)

if len(e.Expand) > 0 {
if len(e.Expand) > 0 && (util.InSlice[string]("views", elements) || util.InSlice[string]("entities", elements)) {
if e.Debug {
fmt.Println(" enabling JSON field expansion functions...")
}
Expand All @@ -103,47 +110,57 @@ func (e *Extractor) Run(ctx *Context) error {
fmt.Printf("JSON Expansion: %s\n", end_expand)
}

switch strings.ToLower(filepath.Ext(e.Outfile)) {
// JSON
case ".json":
start_json := time.Now()
if e.Debug {
fmt.Println(" writing metadoc to JSON file...")
if len(elements) > 0 {
switch strings.ToLower(filepath.Ext(e.Outfile)) {
// JSON
case ".json":
start_json := time.Now()
if e.Debug {
fmt.Println(" writing metadoc to JSON file...")
}
util.DumpFile(e.Outfile, doc.ToJSON())
end_json = time.Since(start_json)
fmt.Printf("Created %s in %s\n", e.Outfile, end_json)
// case ".xml":
}
util.DumpFile(e.Outfile, doc.ToJSON())
end_json = time.Since(start_json)
fmt.Printf("Created %s in %s\n", e.Outfile, end_json)
// case ".xml":
}

start_sqlite := time.Now()
if e.Debug {
fmt.Println(" extracting data set metadata from source...")
}
sets := extractor.GetAllSets(doc)
fmt.Printf(" stashing %v set(s)...\n", len(sets))
err = cache.UpsertSets("source", sets)
if err != nil {
fmt.Println(err)
}
if e.Debug {
fmt.Println(" extracting data item metadata from source...")
}
items := extractor.GetAllItems(doc)
fmt.Printf(" stashing %v item(s)...\n", len(items))
err = cache.UpsertItems("source", items)
if err != nil {
fmt.Println(err)
}
if e.Debug {
fmt.Println(" extracting data relationship metadata from source...")

if util.InSlice[string]("enities", elements) {
if e.Debug {
fmt.Println(" extracting data set metadata from source...")
}
sets := extractor.GetAllSets(doc)
fmt.Printf(" stashing %v set(s)...\n", len(sets))
err = cache.UpsertSets("source", sets)
if err != nil {
fmt.Println(err)
}

if e.Debug {
fmt.Println(" extracting data item metadata from source...")
}
items := extractor.GetAllItems(doc)
fmt.Printf(" stashing %v item(s)...\n", len(items))
err = cache.UpsertItems("source", items)
if err != nil {
fmt.Println(err)
}
}
rels := extractor.GetAllRelationships(doc)
fmt.Printf(" stashing %v relationship(s)...\n", len(rels))
err = cache.UpsertRelationships("source", rels)
if err != nil {
fmt.Println(err)

if util.InSlice[string]("relationships", elements) {
if e.Debug {
fmt.Println(" extracting data relationship metadata from source...")
}
rels := extractor.GetAllRelationships(doc)
fmt.Printf(" stashing %v relationship(s)...\n", len(rels))
err = cache.UpsertRelationships("source", rels)
if err != nil {
fmt.Println(err)
}
}

end_sqlite = time.Since(start_sqlite)
fmt.Printf("Created cache in %s\n", end_sqlite)
// }
Expand All @@ -155,85 +172,141 @@ func (e *Extractor) Run(ctx *Context) error {
dh, err := datahub.New(e.DatahubURL, e.Source, cache, e.APIKey)

if err == nil {
if e.Debug {
fmt.Println(" populating datahub sources...")
}
err = dh.PopulateSources()
if err != nil {
fmt.Println(err)
if len(elements) == 1 && elements[0] == "relationships" {
for _, schema := range e.Schemas {
dh.GetDoc().ApplySchemaByName(schema)
diff := archive.CreateDiff()

err = dh.PopulateRelationships(diff)
if err == nil {
rels := extractor.GetAllRelationships(dh.GetDoc())
fmt.Printf(" stashing %v relationship(s)...\n", len(rels))

err = cache.UpsertRelationships("datahub", rels)
if err == nil {
if e.Debug {
fmt.Println(" diffing data relationships...")
}
reldiff, err := cache.DiffRelationships(diff)
if err == nil {
if e.Debug {
fmt.Println(" diffing individual relationship joins...")
}
joindiff, err := cache.DiffJoins(diff, reldiff)
if err == nil {
fmt.Printf("\nNow syncing with the Datahub...\n")
if e.DryRun {
if e.Debug {
fmt.Println(" running dry run...")
}
dh.DryRun(reldiff, e.Max, "relationship")
fmt.Println("")
dh.DryRun(joindiff, e.Max, "join")
} else {
if e.Debug {
fmt.Println(" syncing...")
}
dh.DryRun(reldiff, e.Max, "relationship")
dh.Commit(reldiff)
cache.ResetDatahub()
cache.ResetDatasource()
}
} else {
fmt.Println(err)
}
} else {
fmt.Println(err)
}
} else {
fmt.Println(err)
}
} else {
fmt.Println(err)
}
}
} else {
sets := extractor.GetAllSets(dh.GetDoc())
fmt.Printf(" stashing %v set(s)...\n", len(sets))
err := cache.UpsertSets("datahub", sets)
if e.Debug {
fmt.Println(" populating datahub sources...")
}
err = dh.PopulateSources()
if err != nil {
fmt.Println(err)
} else {
if e.Debug {
fmt.Println(" diffing sets...")
}
diff, err := cache.DiffSets()

if err == nil {
sets := extractor.GetAllSets(dh.GetDoc())
fmt.Printf(" stashing %v set(s)...\n", len(sets))
err := cache.UpsertSets("datahub", sets)
if err != nil {
fmt.Println(err)
} else {
if e.Debug {
fmt.Println(" populating data items...")
fmt.Println(" diffing sets...")
}
err = dh.PopulateItems(diff)
diff, err := cache.DiffSets()

if err == nil {
items := extractor.GetAllItems(dh.GetDoc())
fmt.Printf(" stashing %v item(s)...\n", len(items))
err := cache.UpsertItems("datahub", items)
if e.Debug {
fmt.Println(" populating data items...")
}
err = dh.PopulateItems(diff)
if err == nil {
if e.Debug {
fmt.Println(" diffing data items...")
}
itemdiff, err := cache.DiffItems(diff)
items := extractor.GetAllItems(dh.GetDoc())
fmt.Printf(" stashing %v item(s)...\n", len(items))
err := cache.UpsertItems("datahub", items)
if err == nil {
if e.Debug {
fmt.Println(" populating datahub relationships...")
fmt.Println(" diffing data items...")
}
err = dh.PopulateRelationships(diff)
itemdiff, err := cache.DiffItems(diff)
if err == nil {
rels := extractor.GetAllRelationships(dh.GetDoc())
fmt.Printf(" stashing %v relationship(s)...\n", len(rels))

err = cache.UpsertRelationships("datahub", rels)
if e.Debug {
fmt.Println(" populating datahub relationships...")
}
err = dh.PopulateRelationships(diff)
if err == nil {
if e.Debug {
fmt.Println(" diffing data relationships...")
}
reldiff, err := cache.DiffRelationships(diff)
rels := extractor.GetAllRelationships(dh.GetDoc())
fmt.Printf(" stashing %v relationship(s)...\n", len(rels))

err = cache.UpsertRelationships("datahub", rels)
if err == nil {
if e.Debug {
fmt.Println(" diffing individual relationship joins...")
fmt.Println(" diffing data relationships...")
}
joindiff, err := cache.DiffJoins(diff, reldiff)
reldiff, err := cache.DiffRelationships(diff)
if err == nil {
fmt.Printf("\nNow syncing with the Datahub...\n")
if e.DryRun {
if e.Debug {
fmt.Println(" running dry run...")
if e.Debug {
fmt.Println(" diffing individual relationship joins...")
}
joindiff, err := cache.DiffJoins(diff, reldiff)
if err == nil {
fmt.Printf("\nNow syncing with the Datahub...\n")
if e.DryRun {
if e.Debug {
fmt.Println(" running dry run...")
}
dh.DryRun(diff, e.Max)
fmt.Println("")
dh.DryRun(itemdiff, e.Max, "item")
fmt.Println("")
dh.DryRun(reldiff, e.Max, "relationship")
fmt.Println("")
dh.DryRun(joindiff, e.Max, "join")
} else {
if e.Debug {
fmt.Println(" syncing...")
}
dh.DryRun(diff, e.Max)
dh.Commit(diff)
fmt.Println("")
dh.DryRun(itemdiff, e.Max, "item")
dh.Commit(itemdiff)
fmt.Println("")
dh.DryRun(reldiff, e.Max, "relationship")
dh.Commit(reldiff)
cache.ResetDatahub()
cache.ResetDatasource()
}
dh.DryRun(diff, e.Max)
fmt.Println("")
dh.DryRun(itemdiff, e.Max, "item")
fmt.Println("")
dh.DryRun(reldiff, e.Max, "relationship")
fmt.Println("")
dh.DryRun(joindiff, e.Max, "join")
} else {
if e.Debug {
fmt.Println(" syncing...")
}
dh.DryRun(diff, e.Max)
dh.Commit(diff)
fmt.Println("")
dh.DryRun(itemdiff, e.Max, "item")
dh.Commit(itemdiff)
fmt.Println("")
dh.DryRun(reldiff, e.Max, "relationship")
dh.Commit(reldiff)
cache.ResetDatahub()
cache.ResetDatasource()
fmt.Println(err)
}
} else {
fmt.Println(err)
Expand All @@ -256,8 +329,6 @@ func (e *Extractor) Run(ctx *Context) error {
} else {
fmt.Println(err)
}
} else {
fmt.Println(err)
}
}
}
Expand Down
Binary file modified datahub-sync.db
Binary file not shown.
8 changes: 8 additions & 0 deletions extractor/doc/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ func (d *Doc) ApplySchema(schema *Schema) *Schema {
return currentSchema.Merge(schema)
}

// ApplySchemaByName is a convenience wrapper around ApplySchema: it builds a
// Schema whose only populated field is the physical name and returns whatever
// ApplySchema returns for it.
//
// NOTE(review): every field other than Name is left at its zero value here;
// confirm that ApplySchema/Merge tolerate that for schemas not yet present in
// the document.
func (d *Doc) ApplySchemaByName(schema string) *Schema {
	return d.ApplySchema(&Schema{Name: Name{Physical: schema}})
}

func (d *Doc) GetSchema(name string) (*Schema, error) {
id := strings.ToLower(name)
if schema, exists := d.schemas[id]; exists {
Expand Down
2 changes: 1 addition & 1 deletion extractor/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "dhs/extractor/doc"

type Extractor interface {
SetConnectionString(str string) error
Extract() (*doc.Doc, error)
Extract(...string) (*doc.Doc, error)
Type() string
// Query(statement string) ([]map[string]interface{}, error)
ExpandJSONFields(*doc.Doc, bool, ...string)
Expand Down
Loading

0 comments on commit f54178b

Please sign in to comment.