Skip to content

Commit 73d2aeb

Browse files
committed
Safely acquire lock on multiple tables
1 parent 3eab753 commit 73d2aeb

File tree

2 files changed

+179
-48
lines changed

2 files changed

+179
-48
lines changed

lib/pg_ha_migrations/safe_statements.rb

+51-25
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,14 @@ def safe_added_columns_without_default_value
33
@safe_added_columns_without_default_value ||= []
44
end
55

6+
# This variable is used to track nested lock acquisition.
7+
# Each element is an array of table objects with their lock modes.
8+
# The order of the array represents the current call stack,
9+
# where the most recent method call is the last element.
10+
def safely_acquire_lock_for_table_history
11+
@safely_acquire_lock_for_table_history ||= []
12+
end
13+
614
def safe_create_table(table, options={}, &block)
715
if options[:force]
816
raise PgHaMigrations::UnsafeMigrationError.new(":force is NOT SAFE! Explicitly call unsafe_drop_table first if you want to recreate an existing table")
@@ -528,40 +536,58 @@ def exec_migration(conn, direction)
528536
super(conn, direction)
529537
end
530538

531-
def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
532-
nested_target_table = Thread.current[__method__]
539+
def safely_acquire_lock_for_table(*tables, mode: :access_exclusive, &block)
540+
# Initialize this up front so the variable is always defined in the ensure block
541+
successfully_acquired_lock = false
533542

534543
_check_postgres_adapter!
535544

536-
target_table = PgHaMigrations::Table.from_table_name(table, mode)
545+
target_tables = tables.map do |target_table|
546+
PgHaMigrations::Table.from_table_name(target_table, mode)
547+
end.sort_by(&:fully_qualified_name)
548+
549+
# The lock mode is the same for all supplied tables, so just take it from the first one.
550+
# This variable is useful for error handling and messaging.
551+
target_mode = target_tables.first.mode
537552

538-
if nested_target_table
539-
if nested_target_table != target_table
540-
raise PgHaMigrations::InvalidMigrationError, "Nested lock detected! Cannot acquire lock on #{target_table.fully_qualified_name} while #{nested_target_table.fully_qualified_name} is locked."
541-
elsif nested_target_table.mode < target_table.mode
542-
raise PgHaMigrations::InvalidMigrationError, "Lock escalation detected! Cannot change lock level from :#{nested_target_table.mode} to :#{target_table.mode} for #{target_table.fully_qualified_name}."
553+
# Grab the latest locked tables from the call stack.
554+
# This will be nil if we are not in a nested context.
555+
nested_target_tables = safely_acquire_lock_for_table_history.last
556+
557+
if nested_target_tables
558+
if target_tables != nested_target_tables
559+
raise PgHaMigrations::InvalidMigrationError,
560+
"Nested lock detected! Cannot acquire lock on #{target_tables.map(&:fully_qualified_name).join(", ")} " \
561+
"while #{nested_target_tables.map(&:fully_qualified_name).join(", ")} is locked."
543562
end
544-
else
545-
Thread.current[__method__] = target_table
546-
end
547563

548-
# Locking a partitioned table will also lock child tables (including sub-partitions),
549-
# so we need to check for blocking queries on those tables as well
550-
target_tables = target_table.partitions(include_sub_partitions: true, include_self: true)
564+
nested_target_mode = nested_target_tables.first.mode
551565

552-
successfully_acquired_lock = false
566+
if nested_target_mode < target_mode
567+
raise PgHaMigrations::InvalidMigrationError,
568+
"Lock escalation detected! Cannot change lock level from :#{nested_target_mode} " \
569+
"to :#{target_mode} for #{target_tables.map(&:fully_qualified_name).join(", ")}."
570+
end
571+
end
553572

554573
until successfully_acquired_lock
555-
while (
574+
loop do
556575
blocking_transactions = PgHaMigrations::BlockingDatabaseTransactions.find_blocking_transactions("#{PgHaMigrations::LOCK_TIMEOUT_SECONDS} seconds")
557-
blocking_transactions.any? do |query|
576+
577+
# Locking a partitioned table will also lock child tables (including sub-partitions),
578+
# so we need to check for blocking queries on those tables as well
579+
target_tables_for_blocking_transactions = target_tables.flat_map do |target_table|
580+
target_table.partitions(include_sub_partitions: true, include_self: true)
581+
end
582+
583+
break unless blocking_transactions.any? do |query|
558584
query.tables_with_locks.any? do |locked_table|
559-
target_tables.any? do |target_table|
585+
target_tables_for_blocking_transactions.any? do |target_table|
560586
target_table.conflicts_with?(locked_table)
561587
end
562588
end
563589
end
564-
)
590+
565591
say "Waiting on blocking transactions:"
566592
blocking_transactions.each do |blocking_transaction|
567593
say blocking_transaction.description
@@ -570,16 +596,15 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
570596
end
571597

572598
connection.transaction do
573-
adjust_timeout_method = connection.postgresql_version >= 9_03_00 ? :adjust_lock_timeout : :adjust_statement_timeout
574599
begin
575-
method(adjust_timeout_method).call(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do
576-
connection.execute("LOCK #{target_table.fully_qualified_name} IN #{target_table.mode.to_sql} MODE;")
600+
adjust_statement_timeout(PgHaMigrations::LOCK_TIMEOUT_SECONDS) do
601+
connection.execute("LOCK #{target_tables.map(&:fully_qualified_name).join(", ")} IN #{target_mode.to_sql} MODE;")
577602
end
578603
successfully_acquired_lock = true
579604
rescue ActiveRecord::StatementInvalid => e
580-
if e.message =~ /PG::LockNotAvailable.+ lock timeout/ || e.message =~ /PG::QueryCanceled.+ statement timeout/
605+
if e.message =~ /PG::QueryCanceled.+ statement timeout/
581606
sleep_seconds = PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER * PgHaMigrations::LOCK_TIMEOUT_SECONDS
582-
say "Timed out trying to acquire #{target_table.mode.to_sql} lock on the #{target_table.fully_qualified_name} table."
607+
say "Timed out trying to acquire #{target_mode.to_sql} lock on #{target_tables.map(&:fully_qualified_name).join(", ")}."
583608
say "Sleeping for #{sleep_seconds}s to allow potentially queued up queries to finish before continuing."
584609
sleep(sleep_seconds)
585610

@@ -590,12 +615,13 @@ def safely_acquire_lock_for_table(table, mode: :access_exclusive, &block)
590615
end
591616

592617
if successfully_acquired_lock
618+
safely_acquire_lock_for_table_history.push(target_tables)
593619
block.call
594620
end
595621
end
596622
end
597623
ensure
598-
Thread.current[__method__] = nil unless nested_target_table
624+
safely_acquire_lock_for_table_history.pop if successfully_acquired_lock
599625
end
600626

601627
def adjust_lock_timeout(timeout_seconds = PgHaMigrations::LOCK_TIMEOUT_SECONDS, &block)

spec/safe_statements_spec.rb

+128-23
Original file line numberDiff line numberDiff line change
@@ -3793,14 +3793,15 @@ def up
37933793
let(:alternate_connection_pool) do
37943794
ActiveRecord::ConnectionAdapters::ConnectionPool.new(pool_config)
37953795
end
3796-
let(:alternate_connection) do
3797-
alternate_connection_pool.connection
3798-
end
3796+
3797+
let(:alternate_connection) { alternate_connection_pool.connection }
3798+
let(:alternate_connection_2) { alternate_connection_pool.connection }
37993799
let(:migration) { Class.new(migration_klass).new }
38003800

38013801
before(:each) do
38023802
ActiveRecord::Base.connection.execute(<<~SQL)
38033803
CREATE TABLE #{table_name}(pk SERIAL, i INTEGER);
3804+
CREATE TABLE #{table_name}_2(pk SERIAL, i INTEGER);
38043805
CREATE SCHEMA partman;
38053806
CREATE EXTENSION pg_partman SCHEMA partman;
38063807
SQL
@@ -3829,6 +3830,28 @@ def up
38293830
end
38303831
end
38313832

3833+
it "acquires exclusive locks by default when multiple tables provided" do
3834+
migration.safely_acquire_lock_for_table(table_name, "bogus_table_2") do
3835+
expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly(
3836+
having_attributes(
3837+
table: "bogus_table",
3838+
lock_type: "AccessExclusiveLock",
3839+
granted: true,
3840+
pid: kind_of(Integer),
3841+
)
3842+
)
3843+
3844+
expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly(
3845+
having_attributes(
3846+
table: "bogus_table_2",
3847+
lock_type: "AccessExclusiveLock",
3848+
granted: true,
3849+
pid: kind_of(Integer),
3850+
)
3851+
)
3852+
end
3853+
end
3854+
38323855
it "acquires a lock in a different mode when provided" do
38333856
migration.safely_acquire_lock_for_table(table_name, mode: :share) do
38343857
expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly(
@@ -3842,6 +3865,28 @@ def up
38423865
end
38433866
end
38443867

3868+
it "acquires locks in a different mode when multiple tables and mode provided" do
3869+
migration.safely_acquire_lock_for_table(table_name, "bogus_table_2", mode: :share_row_exclusive) do
3870+
expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly(
3871+
having_attributes(
3872+
table: "bogus_table",
3873+
lock_type: "ShareRowExclusiveLock",
3874+
granted: true,
3875+
pid: kind_of(Integer),
3876+
)
3877+
)
3878+
3879+
expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly(
3880+
having_attributes(
3881+
table: "bogus_table_2",
3882+
lock_type: "ShareRowExclusiveLock",
3883+
granted: true,
3884+
pid: kind_of(Integer),
3885+
)
3886+
)
3887+
end
3888+
end
3889+
38453890
it "raises error when invalid lock mode provided" do
38463891
expect do
38473892
migration.safely_acquire_lock_for_table(table_name, mode: :garbage) {}
@@ -3883,6 +3928,39 @@ def up
38833928
end
38843929
end
38853930

3931+
it "times out the lock query after LOCK_TIMEOUT_SECONDS when multiple tables provided" do
3932+
stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1)
3933+
stub_const("PgHaMigrations::LOCK_FAILURE_RETRY_DELAY_MULTLIPLIER", 0)
3934+
allow(PgHaMigrations::BlockingDatabaseTransactions).to receive(:find_blocking_transactions).and_return([])
3935+
allow(ActiveRecord::Base.connection).to receive(:execute).and_call_original
3936+
3937+
expect(ActiveRecord::Base.connection).to receive(:execute)
3938+
.with("LOCK \"public\".\"bogus_table\", \"public\".\"bogus_table_2\" IN ACCESS EXCLUSIVE MODE;")
3939+
.at_least(2)
3940+
.times
3941+
3942+
begin
3943+
query_thread = Thread.new do
3944+
alternate_connection.execute("BEGIN; LOCK bogus_table_2;")
3945+
sleep 3
3946+
alternate_connection.execute("ROLLBACK")
3947+
end
3948+
3949+
sleep 0.5
3950+
3951+
migration.suppress_messages do
3952+
migration.safely_acquire_lock_for_table(table_name, "bogus_table_2") do
3953+
aggregate_failures do
3954+
expect(locks_for_table(table_name, connection: alternate_connection_2)).not_to be_empty
3955+
expect(locks_for_table("bogus_table_2", connection: alternate_connection_2)).not_to be_empty
3956+
end
3957+
end
3958+
end
3959+
ensure
3960+
query_thread.join
3961+
end
3962+
end
3963+
38863964
it "does not wait to acquire a lock if the table has an existing but non-conflicting lock" do
38873965
stub_const("PgHaMigrations::LOCK_TIMEOUT_SECONDS", 1)
38883966

@@ -4303,6 +4381,53 @@ def up
43034381
expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty
43044382
end
43054383

4384+
it "allows re-entrancy when multiple tables provided" do
4385+
migration.safely_acquire_lock_for_table(table_name, "bogus_table_2") do
4386+
# The ordering of the args is intentional here to ensure
4387+
# the array sorting and equality logic works as intended
4388+
migration.safely_acquire_lock_for_table("bogus_table_2", table_name) do
4389+
expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly(
4390+
having_attributes(
4391+
table: "bogus_table",
4392+
lock_type: "AccessExclusiveLock",
4393+
granted: true,
4394+
pid: kind_of(Integer),
4395+
),
4396+
)
4397+
4398+
expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly(
4399+
having_attributes(
4400+
table: "bogus_table_2",
4401+
lock_type: "AccessExclusiveLock",
4402+
granted: true,
4403+
pid: kind_of(Integer),
4404+
),
4405+
)
4406+
end
4407+
4408+
expect(locks_for_table(table_name, connection: alternate_connection)).to contain_exactly(
4409+
having_attributes(
4410+
table: "bogus_table",
4411+
lock_type: "AccessExclusiveLock",
4412+
granted: true,
4413+
pid: kind_of(Integer),
4414+
),
4415+
)
4416+
4417+
expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to contain_exactly(
4418+
having_attributes(
4419+
table: "bogus_table_2",
4420+
lock_type: "AccessExclusiveLock",
4421+
granted: true,
4422+
pid: kind_of(Integer),
4423+
),
4424+
)
4425+
end
4426+
4427+
expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty
4428+
expect(locks_for_table("bogus_table_2", connection: alternate_connection)).to be_empty
4429+
end
4430+
43064431
it "allows re-entrancy when inner lock is a lower level" do
43074432
migration.safely_acquire_lock_for_table(table_name) do
43084433
migration.safely_acquire_lock_for_table(table_name, mode: :exclusive) do
@@ -4405,26 +4530,6 @@ def up
44054530

44064531
expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty
44074532
end
4408-
4409-
it "uses statement_timeout instead of lock_timeout when on Postgres 9.1" do
4410-
allow(ActiveRecord::Base.connection).to receive(:postgresql_version).and_wrap_original do |m, *args|
4411-
if caller.detect { |line| line =~ /lib\/pg_ha_migrations\/blocking_database_transactions\.rb/ }
4412-
# The long-running transactions check needs to know the actual
4413-
# Postgres version to use the proper columns, so we don't want
4414-
# to mock any calls from it.
4415-
m.call(*args)
4416-
else
4417-
9_01_12
4418-
end
4419-
end
4420-
4421-
expect do
4422-
migration.safely_acquire_lock_for_table(table_name) do
4423-
expect(locks_for_table(table_name, connection: alternate_connection)).not_to be_empty
4424-
end
4425-
expect(locks_for_table(table_name, connection: alternate_connection)).to be_empty
4426-
end.not_to make_database_queries(matching: /lock_timeout/i)
4427-
end
44284533
end
44294534
end
44304535

0 commit comments

Comments
 (0)