|
7 | 7 | Copyright (c) 2015-2017, Postgres Professional
|
8 | 8 | """
|
9 | 9 |
|
| 10 | +import functools |
10 | 11 | import json
|
11 | 12 | import math
|
| 13 | +import multiprocessing |
12 | 14 | import os
|
| 15 | +import random |
13 | 16 | import re
|
14 | 17 | import subprocess
|
| 18 | +import sys |
15 | 19 | import threading
|
16 | 20 | import time
|
17 | 21 | import unittest
|
18 |
| -import functools |
19 | 22 |
|
20 | 23 | from distutils.version import LooseVersion
|
21 | 24 | from testgres import get_new_node, get_pg_version
|
@@ -85,10 +88,17 @@ def set_trace(self, con, command="pg_debug"):
|
85 | 88 | p = subprocess.Popen([command], stdin=subprocess.PIPE)
|
86 | 89 | p.communicate(str(pid).encode())
|
87 | 90 |
|
88 |
| - def start_new_pathman_cluster(self, allow_streaming=False, test_data=False): |
| 91 | + def start_new_pathman_cluster(self, |
| 92 | + allow_streaming=False, |
| 93 | + test_data=False, |
| 94 | + enable_partitionrouter=False): |
| 95 | + |
89 | 96 | node = get_new_node()
|
90 | 97 | node.init(allow_streaming=allow_streaming)
|
91 | 98 | node.append_conf("shared_preload_libraries='pg_pathman'\n")
|
| 99 | + if enable_partitionrouter: |
| 100 | + node.append_conf("pg_pathman.enable_partitionrouter=on\n") |
| 101 | + |
92 | 102 | node.start()
|
93 | 103 | node.psql('create extension pg_pathman')
|
94 | 104 |
|
@@ -1065,6 +1075,57 @@ def test_update_node_plan1(self):
|
1065 | 1075 | node.psql('postgres', 'DROP SCHEMA test_update_node CASCADE;')
|
1066 | 1076 | node.psql('postgres', 'DROP EXTENSION pg_pathman CASCADE;')
|
1067 | 1077 |
|
| 1078 | + def test_concurrent_updates(self): |
| 1079 | + ''' |
| 1080 | + Test whether conncurrent updates work correctly between |
| 1081 | + partitions. |
| 1082 | + ''' |
| 1083 | + |
| 1084 | + create_sql = ''' |
| 1085 | + CREATE TABLE test1(id INT, b INT NOT NULL); |
| 1086 | + INSERT INTO test1 |
| 1087 | + SELECT i, i FROM generate_series(1, 100) i; |
| 1088 | + SELECT create_range_partitions('test1', 'b', 1, 5); |
| 1089 | + ''' |
| 1090 | + |
| 1091 | + with self.start_new_pathman_cluster(enable_partitionrouter=True) as node: |
| 1092 | + node.safe_psql(create_sql) |
| 1093 | + |
| 1094 | + pool = multiprocessing.Pool(processes=4) |
| 1095 | + for count in range(1, 200): |
| 1096 | + pool.apply_async(make_updates, (node, count, )) |
| 1097 | + |
| 1098 | + pool.close() |
| 1099 | + pool.join() |
| 1100 | + |
| 1101 | + # check all data is there and not duplicated |
| 1102 | + with node.connect() as con: |
| 1103 | + for i in range(1, 100): |
| 1104 | + row = con.execute("select count(*) from test1 where id = %d" % i)[0] |
| 1105 | + self.assertEqual(row[0], 1) |
| 1106 | + |
| 1107 | + self.assertEqual(node.execute("select count(*) from test1")[0][0], 100) |
| 1108 | + |
| 1109 | + |
| 1110 | +def make_updates(node, count): |
| 1111 | + update_sql = ''' |
| 1112 | + BEGIN; |
| 1113 | + UPDATE test1 SET b = trunc(random() * 100 + 1) WHERE id in (%s); |
| 1114 | + COMMIT; |
| 1115 | + ''' |
| 1116 | + |
| 1117 | + with node.connect() as con: |
| 1118 | + for i in range(count): |
| 1119 | + rows_to_update = random.randint(20, 50) |
| 1120 | + ids = set([str(random.randint(1, 100)) for i in range(rows_to_update)]) |
| 1121 | + con.execute(update_sql % ','.join(ids)) |
| 1122 | + |
1068 | 1123 |
|
1069 | 1124 | if __name__ == "__main__":
|
1070 |
| - unittest.main() |
| 1125 | + if len(sys.argv) > 1: |
| 1126 | + suite = unittest.TestLoader().loadTestsFromName(sys.argv[1], |
| 1127 | + module=sys.modules[__name__]) |
| 1128 | + else: |
| 1129 | + suite = unittest.TestLoader().loadTestsFromTestCase(Tests) |
| 1130 | + |
| 1131 | + unittest.TextTestRunner(verbosity=2, failfast=True).run(suite) |
0 commit comments