
Commit 6a7ea60

Merge pull request #460 from aiven/rdunklau-add_walreceiver_tests: "Add walreceiver tests"
2 parents a7f080f + ad0defb

File tree: 5 files changed, +146 -92 lines


test/conftest.py

Lines changed: 25 additions & 9 deletions
@@ -43,6 +43,7 @@ def __init__(self, pgdata):
         self.pgdata = pgdata
         self.pg = None
         self.user = None
+        self._connection = None
 
     @property
     def pgver(self):
@@ -84,6 +85,11 @@ def kill(self, force=True, immediate=True):
         if not force and self.pg.poll() is None:
             raise Exception("PG pid {} not dead".format(self.pg.pid))
 
+    def connect(self):
+        connection_string = pgutil.create_connection_string(self.user)
+        conn = psycopg2.connect(connection_string)
+        return conn
+
 
 @contextlib.contextmanager
 def setup_pg():
@@ -200,6 +206,13 @@ def pghoard(db, tmpdir, request):  # pylint: disable=redefined-outer-name
     yield from pghoard_base(db, tmpdir, request)
 
 
+@pytest.yield_fixture
+def pghoard_walreceiver(db, tmpdir, request):
+    # Initialize with only one transfer agent, as we want a reliable
+    # last transfered state.
+    yield from pghoard_base(db, tmpdir, request, active_backup_mode="walreceiver", transfer_count=1, compression_count=1)
+
+
 @pytest.yield_fixture  # pylint: disable=redefined-outer-name
 def pghoard_separate_volume(db, tmpdir, request):
     tmpfs_volume = os.path.join(str(tmpdir), "tmpfs")
@@ -241,20 +254,20 @@ def pghoard_base(
     metrics_cfg=None,
     *,
     backup_location=None,
-    pg_receivexlog_config=None
+    pg_receivexlog_config=None,
+    active_backup_mode="pg_receivexlog",
+    slot_name=None,
+    compression_count=None
 ):
     test_site = request.function.__name__
 
-    if pg_receivexlog_config:
-        active_backup_mode = "pg_receivexlog"
-    elif os.environ.get("pghoard_test_walreceiver"):
-        active_backup_mode = "walreceiver"
-    else:
-        active_backup_mode = "pg_receivexlog"
-
     if compression == "snappy" and not snappy:
         compression = "lzma"
 
+    node = db.user.copy()
+    if slot_name is not None:
+        node["slot"] = slot_name
+
     backup_location = backup_location or os.path.join(str(tmpdir), "backupspool")
     config = {
         "alert_file_dir": os.path.join(str(tmpdir), "alerts"),
@@ -267,7 +280,7 @@ def pghoard_base(
         "pg_bin_directory": db.pgbin,
         "pg_data_directory": db.pgdata,
         "pg_receivexlog": pg_receivexlog_config or {},
-        "nodes": [db.user],
+        "nodes": [node],
         "object_storage": {
             "storage_type": "local",
             "directory": os.path.join(str(tmpdir), "backups"),
@@ -297,6 +310,9 @@ def pghoard_base(
     if transfer_count is not None:
         config["transfer"] = {"thread_count": transfer_count}
 
+    if compression_count is not None:
+        config["compression"]["thread_count"] = compression_count
+
     confpath = os.path.join(str(tmpdir), "config.json")
     with open(confpath, "w") as fp:
         json.dump(config, fp)
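The new pghoard_base keyword arguments (active_backup_mode, slot_name, compression_count) let test modules build site-specific fixtures on top of the base one. As an illustration only, a fixture that also exercises the new slot_name parameter might look like the sketch below; the fixture name and slot name are hypothetical and not part of this commit.

@pytest.yield_fixture
def pghoard_walreceiver_with_slot(db, tmpdir, request):
    # Hypothetical fixture: slot_name ends up as node["slot"] in the
    # generated site configuration (see the node handling above).
    yield from pghoard_base(
        db, tmpdir, request,
        active_backup_mode="walreceiver",
        slot_name="pghoard_test_slot",
        transfer_count=1,
        compression_count=1,
    )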

test/test_basebackup.py

Lines changed: 10 additions & 24 deletions
@@ -16,14 +16,15 @@
 import psycopg2
 import pytest
 
-from pghoard import common, metrics, pgutil
+from pghoard import common, metrics
 from pghoard.basebackup import PGBaseBackup
 from pghoard.common import BaseBackupMode
 from pghoard.restore import Restore, RestoreError
 from pghoard.rohmu import get_transfer
 from pghoard.rohmu.compat import makedirs
 
 from .conftest import PGTester
+from .util import switch_wal
 
 Restore.log_tracebacks = True
 
@@ -378,10 +379,9 @@ def test_basebackups_local_tar_legacy(self, capsys, db, pghoard, tmpdir):
     def test_basebackups_local_tar_exclusive_conflict(self, capsys, db, pghoard, tmpdir):
         if db.pgver >= "9.6":
             pytest.skip("PostgreSQL < 9.6 required for exclusive backup tests")
-        conn_str = pgutil.create_connection_string(db.user)
         need_stop = False
         try:
-            with psycopg2.connect(conn_str) as conn:
+            with db.connect() as conn:
                 conn.autocommit = True
                 cursor = conn.cursor()
                 cursor.execute("SELECT pg_start_backup('conflicting')")  # pylint: disable=used-before-assignment
@@ -390,14 +390,13 @@ def test_basebackups_local_tar_exclusive_conflict(self, capsys, db, pghoard, tmp
             need_stop = False
         finally:
             if need_stop:
-                with psycopg2.connect(conn_str) as conn:
+                with db.connect() as conn:
                     conn.autocommit = True
                     cursor = conn.cursor()
                     cursor.execute("SELECT pg_stop_backup()")
 
     def test_basebackups_local_tar_pgespresso(self, capsys, db, pghoard, tmpdir):
-        conn_str = pgutil.create_connection_string(db.user)
-        with psycopg2.connect(conn_str) as conn:
+        with db.connect() as conn:
             conn.autocommit = True
             cursor = conn.cursor()
             cursor.execute("SELECT 1 FROM pg_available_extensions WHERE name = 'pgespresso' AND default_version >= '1.2'")
@@ -415,8 +414,7 @@ def test_basebackups_replica_local_tar_nonexclusive(self, capsys, recovery_db, p
         self._test_basebackups(capsys, recovery_db, pghoard, tmpdir, BaseBackupMode.local_tar, replica=True)
 
     def test_basebackups_replica_local_tar_pgespresso(self, capsys, recovery_db, pghoard, tmpdir):
-        conn_str = pgutil.create_connection_string(recovery_db.user)
-        with psycopg2.connect(conn_str) as conn:
+        with recovery_db.connect() as conn:
             conn.autocommit = True
             cursor = conn.cursor()
             cursor.execute("SELECT 1 FROM pg_available_extensions WHERE name = 'pgespresso' AND default_version >= '1.2'")
@@ -433,8 +431,7 @@ def test_basebackups_tablespaces(self, capsys, db, pghoard, tmpdir):
         # tablespace could interfere with other tests
         tspath = tmpdir.join("extra-ts").strpath
         os.makedirs(tspath)
-        conn_str = pgutil.create_connection_string(db.user)
-        conn = psycopg2.connect(conn_str)
+        conn = db.connect()
         conn.autocommit = True
         cursor = conn.cursor()
         cursor.execute("CREATE TABLESPACE tstest LOCATION %s", [tspath])
@@ -451,19 +448,9 @@ def test_basebackups_tablespaces(self, capsys, db, pghoard, tmpdir):
         wal_directory = os.path.join(pghoard.config["backup_location"], pghoard.test_site, "xlog_incoming")
         makedirs(wal_directory, exist_ok=True)
         pghoard.receivexlog_listener(pghoard.test_site, db.user, wal_directory)
-        if conn.server_version >= 100000:
-            cursor.execute("SELECT txid_current(), pg_switch_wal()")
-        else:
-            cursor.execute("SELECT txid_current(), pg_switch_xlog()")
-
+        switch_wal(conn)
         self._test_create_basebackup(capsys, db, pghoard, "local-tar")
-
-        if conn.server_version >= 100000:
-            cursor.execute("SELECT txid_current(), pg_switch_wal()")
-            cursor.execute("SELECT txid_current(), pg_switch_wal()")
-        else:
-            cursor.execute("SELECT txid_current(), pg_switch_xlog()")
-            cursor.execute("SELECT txid_current(), pg_switch_xlog()")
+        switch_wal(conn)
 
         backup_out = tmpdir.join("test-restore").strpath
         backup_ts_out = tmpdir.join("test-restore-tstest").strpath
@@ -574,13 +561,12 @@ def test_basebackups_tablespaces(self, capsys, db, pghoard, tmpdir):
         r_db = PGTester(backup_out)
         r_db.user = dict(db.user, host=backup_out)
         r_db.run_pg()
-        r_conn_str = pgutil.create_connection_string(r_db.user)
 
         # Wait for PG to start up
         start_time = time.monotonic()
         while True:
             try:
-                r_conn = psycopg2.connect(r_conn_str)
+                r_conn = r_db.connect()
                 break
             except psycopg2.OperationalError as ex:
                 if "starting up" in str(ex):

test/test_pghoard.py

Lines changed: 13 additions & 59 deletions
@@ -13,8 +13,6 @@
 from pathlib import Path
 from unittest.mock import Mock, patch
 
-import psycopg2
-
 from pghoard import common
 from pghoard.common import (BaseBackupFormat, create_alert_file, delete_alert_file, write_json_file)
 from pghoard.pghoard import PGHoard
@@ -23,6 +21,7 @@
 from pghoard.rohmu import rohmufile
 
 from .base import PGHoardTestCase
+from .util import switch_wal, wait_for_xlog
 
 
 class TestPGHoard(PGHoardTestCase):
@@ -551,10 +550,8 @@ def clean_alert_files():
 
     def test_pause_on_disk_full(self, db, pghoard_separate_volume, caplog):
         pghoard = pghoard_separate_volume
-        conn_str = create_connection_string(db.user)
-        conn = psycopg2.connect(conn_str)
+        conn = db.connect()
         conn.autocommit = True
-        cursor = conn.cursor()
 
         wal_directory = os.path.join(pghoard.config["backup_location"], pghoard.test_site, "xlog_incoming")
         os.makedirs(wal_directory, exist_ok=True)
@@ -566,70 +563,26 @@ def test_pause_on_disk_full(self, db, pghoard_separate_volume, caplog):
         for _ in range(16):
             # Note: do not combine two function call in one select, PG executes it differently and
             # sometimes looks like it generates less WAL files than we wanted
-            cursor.execute("SELECT txid_current()")
-            if conn.server_version >= 100000:
-                cursor.execute("SELECT pg_switch_wal()")
-            else:
-                cursor.execute("SELECT pg_switch_xlog()")
-
-        start = time.monotonic()
-        while True:
-            xlogs = None
-            # At the start, this is not yet defined
-            transfer_agent_state_for_site = pghoard.transfer_agent_state.get(pghoard.test_site)
-            if transfer_agent_state_for_site:
-                xlogs = transfer_agent_state_for_site["upload"]["xlog"]["xlogs_since_basebackup"]
-                if xlogs >= 15:
-                    break
-
-            if time.monotonic() - start > 15:
-                assert False, "Expected at least 15 xlog uploads, got {}".format(xlogs)
-
-            time.sleep(0.1)
+            switch_wal(conn)
+        conn.close()
+
+        wait_for_xlog(pghoard, 15)
         assert "pausing pg_receive(wal|xlog)" in caplog.text
 
     def test_surviving_pg_receivewal_hickup(self, db, pghoard):
-        conn_str = create_connection_string(db.user)
-        conn = psycopg2.connect(conn_str)
-        conn.autocommit = True
-        cursor = conn.cursor()
-
         wal_directory = os.path.join(pghoard.config["backup_location"], pghoard.test_site, "xlog_incoming")
         os.makedirs(wal_directory, exist_ok=True)
 
-        def trigger_new_wal():
-            # Note: do not combine two function call in one select, PG executes it differently and
-            # sometimes looks like it generates less WAL files than we wanted
-            cursor.execute("SELECT txid_current()")
-            if conn.server_version >= 100000:
-                cursor.execute("SELECT pg_switch_wal()")
-            else:
-                cursor.execute("SELECT pg_switch_xlog()")
-
-        def wait_for_xlog(count: int):
-            start = time.monotonic()
-            while True:
-                xlogs = None
-                # At the start, this is not yet defined
-                transfer_agent_state_for_site = pghoard.transfer_agent_state.get(pghoard.test_site)
-                if transfer_agent_state_for_site:
-                    xlogs = transfer_agent_state_for_site["upload"]["xlog"]["xlogs_since_basebackup"]
-                    if xlogs >= count:
-                        break
-
-                if time.monotonic() - start > 15:
-                    assert False, "Expected at least {} xlog uploads, got {}".format(count, xlogs)
-
-                time.sleep(0.1)
-
         pghoard.receivexlog_listener(pghoard.test_site, db.user, wal_directory)
+        conn = db.connect()
+        conn.autocommit = True
 
         # Make sure we have already a few files so pg_receivewal has something to start from when it eventually restarts
         # +1: to finish the current one
         for _ in range(3 + 1):
-            trigger_new_wal()
+            switch_wal(conn)
 
-        wait_for_xlog(3)
+        wait_for_xlog(pghoard, 3)
 
         # stop pg_receivewal so we cannot process new WAL segments
         pghoard.receivexlogs[pghoard.test_site].running = False
@@ -641,11 +594,12 @@ def wait_for_xlog(count: int):
 
         # add more WAL segments
         for _ in range(10):
-            trigger_new_wal()
+            switch_wal(conn)
+        conn.close()
 
         # restart
         pghoard.receivexlog_listener(pghoard.test_site, db.user, wal_directory)
         assert pghoard.receivexlogs[pghoard.test_site].is_alive()
 
         # We should now process all created segments, not only the ones which were created after pg_receivewal was restarted
-        wait_for_xlog(n_xlogs + 10)
+        wait_for_xlog(pghoard, n_xlogs + 10)
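Both test modules above now import switch_wal and wait_for_xlog from test/util.py, presumably the fifth changed file (its diff is not shown on this page). Judging from the inline code they replace, the helpers would look roughly like the sketch below; the actual implementation in the commit may differ.

import time


def switch_wal(connection):
    # Force PostgreSQL to close out the current WAL segment.
    # Note: do not combine the two calls in one SELECT; PG executes it differently
    # and sometimes appears to generate fewer WAL files than wanted.
    cursor = connection.cursor()
    cursor.execute("SELECT txid_current()")
    if connection.server_version >= 100000:
        cursor.execute("SELECT pg_switch_wal()")
    else:
        cursor.execute("SELECT pg_switch_xlog()")
    cursor.close()


def wait_for_xlog(pghoard, count):
    # Poll the transfer agent state until at least `count` WAL segments have been
    # uploaded since the last basebackup, or fail after a 15 second timeout.
    start = time.monotonic()
    while True:
        xlogs = None
        # At the start, the per-site state is not yet defined
        state = pghoard.transfer_agent_state.get(pghoard.test_site)
        if state:
            xlogs = state["upload"]["xlog"]["xlogs_since_basebackup"]
            if xlogs >= count:
                break
        if time.monotonic() - start > 15:
            assert False, "Expected at least {} xlog uploads, got {}".format(count, xlogs)
        time.sleep(0.1)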

test/test_walreceiver.py

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+from pghoard.pghoard import PGHoard
+from pghoard.wal import (get_current_wal_file, get_previous_wal_on_same_timeline, name_for_tli_log_seg, name_to_tli_log_seg)
+
+from .util import switch_wal, wait_for_xlog
+
+
+def get_transfer_agent_upload_xlog_state(pghoard: PGHoard):
+    transfer_agent_state = pghoard.transfer_agent_state.get(pghoard.test_site)
+    if transfer_agent_state is None:
+        return {}
+    return transfer_agent_state["upload"]["xlog"]
+
+
+def stop_walreceiver(pghoard: PGHoard):
+    walreceiver = pghoard.walreceivers.pop(pghoard.test_site)
+    walreceiver.running = False
+    walreceiver.join()
+    return walreceiver.last_flushed_lsn
+
+
+class TestWalReceiver:
+    def test_walreceiver(self, db, pghoard_walreceiver):
+        """
+        Test the happy-path of the wal receiver.
+        """
+        conn = db.connect()
+        conn.autocommit = True
+        server_version = conn.server_version
+
+        pghoard = pghoard_walreceiver
+        node = pghoard.config["backup_sites"][pghoard.test_site]["nodes"][0]
+        # The transfer agent state will be used to check what
+        # was uploaded
+        # Before starting the walreceiver, get the current wal name.
+        wal_name = get_current_wal_file(node)
+        # Start streaming, force a wal rotation, and check the wal has been
+        # archived
+        pghoard.start_walreceiver(pghoard.test_site, node, None)
+        switch_wal(conn)
+        # Check that we uploaded one file, and it is the right one.
+        wait_for_xlog(pghoard, 1)
+        last_flushed_lsn = stop_walreceiver(pghoard)
+        # Record the last flushed lsn
+        state = get_transfer_agent_upload_xlog_state(pghoard)
+        assert state.get("xlogs_since_basebackup") == 1
+        assert state.get("latest_filename") == wal_name
+
+        # Generate some more wal while the walreceiver is not running,
+        # and check that we can fetch it once done using the recorded state
+        for _ in range(3):
+            switch_wal(conn)
+        conn.close()
+        # The last wal file is the previous one, as the current one is not
+        # complete.
+        wal_name = get_current_wal_file(node)
+        tli, log, seg = name_to_tli_log_seg(wal_name)
+        seg, log = get_previous_wal_on_same_timeline(seg, log, server_version)
+
+        previous_wal_name = name_for_tli_log_seg(tli, log, seg)
+
+        pghoard.start_walreceiver(pghoard.test_site, node, last_flushed_lsn)
+        wait_for_xlog(pghoard, 4)
+        last_flushed_lsn = stop_walreceiver(pghoard)
+        state = get_transfer_agent_upload_xlog_state(pghoard)
+        assert state.get("xlogs_since_basebackup") == 4
+        assert state.get("latest_filename") == previous_wal_name
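The closing assertions rely on standard PostgreSQL WAL segment naming: a file name is 24 hex digits encoding the timeline, log and segment numbers, and because the current segment is still being written, the last archived file should be the segment just before it. A worked example, assuming the pghoard.wal helpers behave as their names and the test's usage suggest:

from pghoard.wal import (get_previous_wal_on_same_timeline, name_for_tli_log_seg, name_to_tli_log_seg)

server_version = 120000  # as would be obtained from conn.server_version (PostgreSQL 12 here)
wal_name = "000000010000000000000005"  # tli=1, log=0, seg=5
tli, log, seg = name_to_tli_log_seg(wal_name)
# The current segment is incomplete, so the last archived file should be
# the previous segment on the same timeline: seg 4 on log 0.
seg, log = get_previous_wal_on_same_timeline(seg, log, server_version)
previous_wal_name = name_for_tli_log_seg(tli, log, seg)
assert previous_wal_name == "000000010000000000000004"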
