Skip to content

Commit

Permalink
replication origin support
Browse files Browse the repository at this point in the history
  • Loading branch information
Pyry Liukas committed Dec 7, 2023
1 parent 0217879 commit 7f3d636
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/pgsync/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def slop_options
o.boolean "--in-batches", "sync in batches", default: false, help: false
o.integer "--batch-size", "batch size", default: 10000, help: false
o.float "--sleep", "time to sleep between batches", default: 0, help: false
o.string "--replication-origin", "replication origin", help: false

o.separator ""
o.separator "Other commands:"
Expand Down
19 changes: 19 additions & 0 deletions lib/pgsync/data_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,25 @@ def triggers(table)
execute(query, [quote_ident_full(table)])
end

def set_replication_origin(origin)
query = <<~SQL
SELECT
CASE
WHEN EXISTS (SELECT 1 FROM pg_replication_origin WHERE roname = $1) THEN null
ELSE pg_replication_origin_create($1)
END;
SQL
execute(query, [origin])
query = <<~SQL
SELECT
CASE
WHEN pg_replication_origin_session_is_setup() THEN null
ELSE pg_replication_origin_session_setup($1)
END;
SQL
execute(query, [origin])
end

def conn
@conn ||= begin
begin
Expand Down
2 changes: 1 addition & 1 deletion lib/pgsync/sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def perform
end

# merge other config
[:to_safe, :exclude, :schemas].each do |opt|
[:to_safe, :exclude, :schemas, :replication_origin].each do |opt|
opts[opt] ||= config[opt.to_s]
end

Expand Down
4 changes: 3 additions & 1 deletion lib/pgsync/table_sync.rb
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ def run_tasks(tasks, &block)
Parallel.each(tasks, **options) do |task|
source.reconnect_if_needed
destination.reconnect_if_needed

if opts[:replication_origin]
destination.set_replication_origin(opts[:replication_origin])
end
task.perform
end
end
Expand Down
7 changes: 7 additions & 0 deletions test/sync_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,11 @@ def test_disable_integrity_v2
assert_equal [], conn2.exec("SELECT * FROM posts ORDER BY id").to_a
assert_equal [{"post_id" => 1}], conn2.exec("SELECT post_id FROM comments ORDER BY post_id").to_a
end

def test_replication_origin
insert(conn1, "posts", [{"id" => 1}])
assert_error "Sync failed for 1 table: posts", "posts", config: true
assert_works "posts --replication-origin=test --debug", config: true
assert_equal [{"exists" => 1}], conn2.exec("SELECT 1 AS exists FROM pg_replication_origin WHERE roname = 'test'").to_a
end
end

0 comments on commit 7f3d636

Please sign in to comment.