Commit 9e10bfb: allow subtables of composite key tables

awreece committed Nov 15, 2017
1 parent 2d7950d commit 9e10bfb

Showing 2 changed files with 63 additions and 50 deletions.
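This commit generalizes mosql's nested-table support: previously a subtable could only hang off a parent with a single-column primary key (it got one synthesized <parent>_id foreign key column), and now it can also hang off a parent declared with a composite_key. As a running illustration for the hunks below, here is a hypothetical spec with invented table and column names, shown in its parsed Ruby-hash form rather than the YAML collection map:

# Hypothetical: a parent table keyed on (org_id, name) with a nested
# :events subtable. Before this commit, create_table raised
# "Too many keys for sub table" for this shape.
spec = {
  :meta => {
    :table         => :projects,
    :composite_key => [:org_id, :name],
  },
  :columns => [
    { :name => :org_id, :source => 'org_id', :type => 'integer' },
    { :name => :name,   :source => 'name',   :type => 'text'    },
  ],
  :events => {
    :meta    => { :table => :project_events },
    :columns => [
      { :name => :id, :source => '_id', :type => 'integer'   },
      { :name => :ts, :source => 'ts',  :type => 'timestamp' },
    ],
  },
}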
lib/mosql/schema.rb: 62 additions & 49 deletions
@@ -54,15 +54,21 @@ def check_columns!(ns, spec)
       end
     end
 
-    def parse_spec(ns, spec, source=[])
+    def parent_scope_column(parent, colname)
+      (parent.to_s.singularize + "_" + colname.to_s).to_sym
+    end
+
+    def parse_spec(ns, spec, source=[], parent_pks=[])
       out = spec.dup
       out[:columns] = to_array(spec.delete(:columns))
       meta = spec.delete(:meta)
+      pks = parent_pks + primary_sql_keys_for_schema(out).map { |k| parent_scope_column(meta[:table], k) }
+
       out[:subtables] = spec.map do |name, subspec|
         newsource = source + [name]
-        subspec = parse_spec(ns , subspec, newsource)
+        subspec = parse_spec(ns , subspec, newsource, pks)
         subspec[:meta][:source] = newsource
-        subspec[:meta][:parent_fkey] = (meta[:table].to_s.singularize + "_id").to_sym
+        subspec[:meta][:parent_fkeys] = pks
         subspec
       end
       check_columns!(ns, out)
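The new parent_scope_column helper fixes the naming convention for the copies of a parent's key columns that a subtable will carry: singularize the parent table name and append the key column name. It relies on String#singularize from ActiveSupport's inflections, which the replaced parent_fkey line already depended on. A quick sketch with the hypothetical names above:

parent_scope_column(:projects, :org_id)  # => :project_org_id
parent_scope_column(:projects, :name)    # => :project_name
# The old single-key behavior is the special case of an :id key:
parent_scope_column(:projects, :id)      # => :project_id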
@@ -103,12 +109,11 @@ def qualified_table_name(meta)
       end
     end
 
-    def create_table(db, spec, clobber, parent_table=nil, parent_pk_type = nil)
+    def create_table(db, spec, clobber, parent_table=nil, parent_pks={})
       meta = spec[:meta]
       table_name = qualified_table_name(meta)
       composite_key = meta[:composite_key]
-      keys = []
-      keytypes = []
+      primary_keys = {}
       log.info("Creating table #{db.literal(table_name)}...")
       db.drop_table?(table_name, :cascade => true) if clobber
       db.create_table(table_name) do
@@ -129,11 +134,9 @@ def create_table(db, spec, clobber, parent_table=nil, parent_pk_type = nil)
           column col[:name], col[:type], opts
 
           if composite_key and composite_key.include?(col[:name])
-            keys << col[:name].to_sym
-            keytypes << col[:type]
+            primary_keys[col[:name].to_sym] = col[:type]
           elsif not composite_key and col[:source].to_sym == :_id
-            keys << col[:name].to_sym
-            keytypes << col[:type]
+            primary_keys[col[:name].to_sym] = col[:type]
           end
         end
 
@@ -151,20 +154,20 @@ def create_table(db, spec, clobber, parent_table=nil, parent_pk_type = nil)
         end
 
         if !parent_table.nil?
-          foreign_key meta[:parent_fkey], parent_table, {
-            :type => parent_pk_type,
+          parent_pks.each do |k, type|
+            column k, type
+          end
+          foreign_key parent_pks.keys, parent_table, {
             :on_delete => :cascade,
             :on_update => :cascade
           }
-          keys << meta[:parent_fkey]
-          keytypes << parent_pk_type
         end
-        primary_key keys
+        primary_key primary_keys.keys + parent_pks.keys
       end
 
+      parent_pks = Hash[primary_keys.map { |k, t| [parent_scope_column(meta[:table], k), t] }].merge(parent_pks)
       spec[:subtables].each do |subspec|
-        raise "Too many keys for sub table in #{table_name}: #{keys}" unless keys.length == 1
-        create_table(db, subspec, clobber, table_name, keytypes.first)
+        create_table(db, subspec, clobber, table_name, parent_pks)
       end
     end
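With the parent keys threaded through as a {column => type} hash, create_table now adds one scoped column per parent key to each subtable, points a composite foreign key at the parent, and widens the subtable's primary key to its own key columns plus the parent's. Roughly what the Sequel calls amount to for the hypothetical spec above (a sketch, not output captured from mosql):

db.create_table(:project_events) do
  column :id, 'integer'
  column :ts, 'timestamp'
  # Scoped copies of the parent's composite key:
  column :project_org_id, 'integer'
  column :project_name,   'text'
  foreign_key [:project_org_id, :project_name], :projects,
              :on_delete => :cascade, :on_update => :cascade
  primary_key [:id, :project_org_id, :project_name]
end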

@@ -276,17 +279,41 @@ def transform_primitive(v, type)
       end
     end
 
-    def transform_one(schema, obj)
+    def transform_value(col, v)
+      case v
+      when Hash
+        JSON.dump(v)
+      when Array
+        if col[:array_type]
+          v = v.map { |it| transform_primitive(it, col[:array_type]) }
+          Sequel.pg_array(v, col[:array_type])
+        else
+          JSON.dump(v)
+        end
+      else
+        transform_primitive(v, col[:type])
+      end
+    end
+
+    def get_pks_for_debug(schema, obj, parent_pks={})
+      pks = parent_pks.clone
+      sql_pks = primary_sql_keys_for_schema(schema)
+      schema[:columns].each do |col|
+        break unless sql_pks.include?(col[:name])
+
+        pks[col[:name]] = bson_dig_dotted(obj, col[:source])
+      end
+      pks
+    end
+
+    def transform_one(schema, obj, parent_pks={})
       original = obj
 
       # Do a deep clone, because we're potentially going to be
       # mutating embedded objects.
       obj = BSON.deserialize(BSON.serialize(obj))
 
-      row = {}
-      sql_pks = primary_sql_keys_for_schema(schema)
-      pk_cols = schema[:columns].select{ |c| sql_pks.include?(c[:name]) }
-      pks = Hash[pk_cols.map { |c| [c[:name], bson_dig_dotted(obj, c[:source])] }]
+      row = parent_pks.clone
       schema[:columns].each do |col|
         source = col[:source]
         type = col[:type]
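transform_value is a straight extraction of the per-column case statement that used to sit inline in transform_one (removed in the next hunk); get_pks_for_debug recomputes key values only when an error message needs them, and its early break assumes key columns are listed before non-key columns in the spec. Illustrative expectations for the hypothetical :project_events schema (events_schema below is its parsed spec), not captured output:

# Hashes serialize to JSON; typed arrays become Postgres arrays.
transform_value({ :type => 'json' }, { 'a' => 1 })
# => '{"a":1}'
transform_value({ :array_type => 'integer' }, [1, 2])
# => Sequel.pg_array([1, 2], 'integer')

# Error-message keys merge the parent's scoped keys with this row's own:
get_pks_for_debug(events_schema, { '_id' => 7, 'ts' => 'bad' },
                  { :project_org_id => 42, :project_name => 'widgets' })
# => { :project_org_id => 42, :project_name => 'widgets', :id => 7 }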
@@ -295,35 +322,22 @@ def transform_one(schema, obj)
         if source.start_with?("$")
           v = fetch_special_source(obj, source, original)
         else
-          v = fetch_and_delete_dotted(obj, source)
-          case v
-          when Hash
-            v = JSON.dump(v)
-          when Array
-            if col[:array_type]
-              v = v.map { |it| transform_primitive(it, col[:array_type]) }
-              v = Sequel.pg_array(v, col[:array_type])
-            else
-              v = JSON.dump(v)
-            end
-          else
-            v = transform_primitive(v, type)
-          end
+          v = transform_value(col, fetch_and_delete_dotted(obj, source))
         end
 
         null_allowed = !col[:notnull] or col.has_key?(:default)
         if v.nil? and not null_allowed
-          raise "Invalid null #{source.inspect} for #{pks.inspect}"
+          raise "Invalid null #{source.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}"
         elsif v.is_a? Sequel::SQL::Blob and type != "bytea"
-          raise "Failed to convert binary #{source.inspect} to #{type.inspect} for #{pks.inspect}"
-        elsif col[:array_type]
+          raise "Failed to convert binary #{source.inspect} to #{type.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}"
+        elsif col[:array_type] and not v.nil?
           v.each_with_index do |e, i|
             if not sanity_check_type(e, col[:array_type])
-              raise "Failed to convert array element #{i} of #{source.inspect} to #{type.inspect}: got #{e.inspect} for #{pks.inspect}"
+              raise "Failed to convert array element #{i} of #{source.inspect} to #{type.inspect}: got #{e.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}"
             end
           end
-        elsif not sanity_check_type(v, type)
-          raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{pks.inspect}"
+        elsif not v.nil? and not sanity_check_type(v, type)
+          raise "Failed to convert #{source.inspect} to #{type.inspect}: got #{v.inspect} for #{get_pks_for_debug(schema, obj, parent_pks)}"
         end
         row[name] = v
       end
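Two fixes ride along with the error-message change in this hunk: failures now identify the row by its full key set, parent keys included, and the added "not v.nil?" guards mean a permitted NULL no longer reaches the sanity checks. The array case was a real crash before (hypothetical nullable column):

col = { :name => :tags, :source => 'tags', :type => 'text[]',
        :array_type => 'text', :notnull => false }
# Old: "elsif col[:array_type]" matched even when v was nil, and
# v.each_with_index then raised NoMethodError on NilClass.
# New: "elsif col[:array_type] and not v.nil?" skips the check, and
# the row simply stores SQL NULL.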
@@ -380,11 +394,11 @@ def all_table_names_for_ns(ns)
 
     def transform_one_ns(ns, obj)
       transform_one(find_ns!(ns), obj)
-
     end
 
     def save_all_pks_for_ns(ns, new, old)
       schema = find_ns!(ns)
+      # We only save top level keys.
       primary_sql_keys = primary_sql_keys_for_schema(schema)
 
       primary_sql_keys.each do |key|
@@ -413,17 +427,19 @@ def all_transforms_for_obj(schema, obj, parent_pks={}, &block)
 
       # Make sure to add in the primary keys from any parent tables, since we
       # might not automatically have them.
-      transformed = transform_one(schema, obj).update(parent_pks)
+      transformed = transform_one(schema, obj, parent_pks)
 
       yield table_ident, primary_keys, transformed
 
+      pks = Hash[primary_keys.map { |k| [
+        parent_scope_column(schema[:meta][:table], k),
+        transformed[k]
+      ] } ].update(parent_pks)
       schema[:subtables].each do |subspec|
         source = subspec[:meta][:source]
         subobjs = bson_dig(obj, *source)
         break if subobjs.nil?
 
-        raise "Too many primary keys" if primary_keys.length > 1
-        pks = {subspec[:meta][:parent_fkey] => transformed[primary_keys[0]]}
         subobjs.each do |subobj|
           all_transforms_for_obj(subspec, subobj, pks, &block)
         end
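Where the old recursion permitted exactly one parent key and built a single {parent_fkey => value} hash, it now scopes every parent key column and accumulates them down the tree, so arbitrarily deep nesting under a composite key works. For a projects row with :org_id => 42 and :name => 'widgets' (hypothetical values), each nested project_events row receives:

pks = Hash[[:org_id, :name].map { |k| [
  parent_scope_column(:projects, k),
  { :org_id => 42, :name => 'widgets' }[k]
] }]
# => { :project_org_id => 42, :project_name => 'widgets' }
# A sub-subtable of :project_events would see these merged in turn
# via .update(parent_pks).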
@@ -452,9 +468,6 @@ def primary_sql_keys_for_schema(schema)
       else
         keys << schema[:columns].find {|c| c[:source] == '_id'}[:name]
       end
-      if schema[:meta][:parent_fkey]
-        keys << schema[:meta][:parent_fkey]
-      end
 
       return keys
     end
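Dropping the parent_fkey branch means primary_sql_keys_for_schema now reports only a table's own keys; parent key columns travel separately through parent_pks. Expected results for the hypothetical spec above, after parse_spec has run:

primary_sql_keys_for_schema(parsed_spec)
# => [:org_id, :name]   (the composite_key list)
primary_sql_keys_for_schema(parsed_spec[:subtables].first)
# => [:id]              (the column sourced from _id; no parent keys)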
lib/mosql/streamer.rb: 1 addition & 1 deletion
@@ -158,7 +158,7 @@ def import_collection(ns, collection, filter)
           if count % BATCH == 0
             sql_time += upsert_all_batches(batches, ns)
             elapsed = Time.now - start
-            log.info("Imported #{count} rows into #{collection} (#{elapsed}s, #{sql_time}s SQL)...")
+            log.info("Imported #{count} rows into #{ns} (#{elapsed}s, #{sql_time}s SQL)...")
             exit(0) if @done
           end
         end
