Skip to content

Commit e17f88e

Browse files
committed
allow different queue ID sizes
1 parent f63b170 commit e17f88e

File tree

4 files changed

+73
-108
lines changed

4 files changed

+73
-108
lines changed

src/Simplex/Messaging/Agent/Store/Postgres/Migrations/Util.hs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@ AS $$
1616
DECLARE
1717
result BYTEA := state;
1818
i INTEGER;
19-
len INTEGER := octet_length(value);
19+
state_len INTEGER := octet_length(state);
20+
value_len INTEGER := octet_length(value);
21+
max_len INTEGER := GREATEST(state_len, value_len);
22+
byte_val INTEGER;
2023
BEGIN
21-
IF octet_length(state) != len THEN
22-
RAISE EXCEPTION 'Inputs must be equal length (% != %)', octet_length(state), len;
24+
IF value_len > state_len THEN
25+
result := result || repeat('\x00'::BYTEA, value_len - state_len);
2326
END IF;
24-
FOR i IN 0..len-1 LOOP
25-
result := set_byte(result, i, get_byte(state, i) # get_byte(value, i));
27+
FOR i IN 0..max_len-1 LOOP
28+
byte_val := CASE WHEN i < value_len THEN get_byte(value, i) ELSE 0 END;
29+
result := set_byte(result, i, get_byte(result, i) # byte_val);
2630
END LOOP;
2731
RETURN result;
2832
END;

src/Simplex/Messaging/Agent/Store/Postgres/Migrations/agent_postgres_schema.sql

Lines changed: 46 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -15,91 +15,32 @@ SET row_security = off;
1515
CREATE SCHEMA smp_agent_test_protocol_schema;
1616

1717

18+
SET default_table_access_method = heap;
19+
1820

19-
CREATE FUNCTION smp_agent_test_protocol_schema.on_rcv_queue_delete() RETURNS trigger
20-
LANGUAGE plpgsql
21-
AS $$
22-
BEGIN
23-
IF OLD.rcv_service_assoc != 0 AND OLD.deleted = 0 THEN
24-
PERFORM update_aggregates(OLD.user_id, OLD.host, OLD.port, -1, OLD.rcv_id);
25-
END IF;
26-
RETURN OLD;
27-
END;
28-
$$;
29-
30-
31-
32-
CREATE FUNCTION smp_agent_test_protocol_schema.on_rcv_queue_insert() RETURNS trigger
33-
LANGUAGE plpgsql
34-
AS $$
35-
BEGIN
36-
IF NEW.rcv_service_assoc != 0 AND NEW.deleted = 0 THEN
37-
PERFORM update_aggregates(NEW.user_id, NEW.host, NEW.port, 1, NEW.rcv_id);
38-
END IF;
39-
RETURN NEW;
40-
END;
41-
$$;
42-
43-
44-
45-
CREATE FUNCTION smp_agent_test_protocol_schema.on_rcv_queue_update() RETURNS trigger
46-
LANGUAGE plpgsql
47-
AS $$
48-
BEGIN
49-
IF OLD.rcv_service_assoc != 0 AND OLD.deleted = 0 THEN
50-
IF NOT (NEW.rcv_service_assoc != 0 AND NEW.deleted = 0) THEN
51-
PERFORM update_aggregates(OLD.user_id, OLD.host, OLD.port, -1, OLD.rcv_id);
52-
END IF;
53-
ELSIF NEW.rcv_service_assoc != 0 AND NEW.deleted = 0 THEN
54-
PERFORM update_aggregates(NEW.user_id, NEW.host, NEW.port, 1, NEW.rcv_id);
55-
END IF;
56-
RETURN NEW;
57-
END;
58-
$$;
59-
60-
61-
62-
CREATE FUNCTION smp_agent_test_protocol_schema.update_aggregates(p_user_id bigint, p_host text, p_port text, p_change bigint, p_rcv_id bytea) RETURNS void
63-
LANGUAGE plpgsql
64-
AS $$
65-
BEGIN
66-
UPDATE client_services
67-
SET service_queue_count = service_queue_count + p_change,
68-
service_queue_ids_hash = xor_combine(service_queue_ids_hash, public.digest(p_rcv_id, 'md5'))
69-
WHERE user_id = p_user_id AND host = p_host AND port = p_port;
70-
END;
71-
$$;
72-
73-
74-
75-
CREATE FUNCTION smp_agent_test_protocol_schema.xor_combine(state bytea, value bytea) RETURNS bytea
76-
LANGUAGE plpgsql IMMUTABLE STRICT
77-
AS $$
78-
DECLARE
79-
result BYTEA := state;
80-
i INTEGER;
81-
len INTEGER := octet_length(value);
82-
BEGIN
83-
IF octet_length(state) != len THEN
84-
RAISE EXCEPTION 'Inputs must be equal length (% != %)', octet_length(state), len;
85-
END IF;
86-
FOR i IN 0..len-1 LOOP
87-
result := set_byte(result, i, get_byte(state, i) # get_byte(value, i));
88-
END LOOP;
89-
RETURN result;
90-
END;
91-
$$;
92-
93-
94-
95-
CREATE AGGREGATE smp_agent_test_protocol_schema.xor_aggregate(bytea) (
96-
SFUNC = smp_agent_test_protocol_schema.xor_combine,
97-
STYPE = bytea,
98-
INITCOND = '\x00000000000000000000000000000000'
21+
CREATE TABLE smp_agent_test_protocol_schema.client_notices (
22+
client_notice_id bigint NOT NULL,
23+
protocol text NOT NULL,
24+
host text NOT NULL,
25+
port text NOT NULL,
26+
entity_id bytea NOT NULL,
27+
server_key_hash bytea,
28+
notice_ttl bigint,
29+
created_at bigint NOT NULL,
30+
updated_at bigint NOT NULL
9931
);
10032

10133

102-
SET default_table_access_method = heap;
34+
35+
ALTER TABLE smp_agent_test_protocol_schema.client_notices ALTER COLUMN client_notice_id ADD GENERATED ALWAYS AS IDENTITY (
36+
SEQUENCE NAME smp_agent_test_protocol_schema.client_notices_client_notice_id_seq
37+
START WITH 1
38+
INCREMENT BY 1
39+
NO MINVALUE
40+
NO MAXVALUE
41+
CACHE 1
42+
);
43+
10344

10445

10546
CREATE TABLE smp_agent_test_protocol_schema.client_services (
@@ -535,6 +476,8 @@ CREATE TABLE smp_agent_test_protocol_schema.rcv_queues (
535476
link_priv_sig_key bytea,
536477
link_enc_fixed_data bytea,
537478
queue_mode text,
479+
to_subscribe smallint DEFAULT 0 NOT NULL,
480+
client_notice_id bigint,
538481
rcv_service_assoc smallint DEFAULT 0 NOT NULL
539482
);
540483

@@ -816,6 +759,11 @@ ALTER TABLE smp_agent_test_protocol_schema.xftp_servers ALTER COLUMN xftp_server
816759

817760

818761

762+
ALTER TABLE ONLY smp_agent_test_protocol_schema.client_notices
763+
ADD CONSTRAINT client_notices_pkey PRIMARY KEY (client_notice_id);
764+
765+
766+
819767
ALTER TABLE ONLY smp_agent_test_protocol_schema.commands
820768
ADD CONSTRAINT commands_pkey PRIMARY KEY (command_id);
821769

@@ -996,6 +944,10 @@ ALTER TABLE ONLY smp_agent_test_protocol_schema.xftp_servers
996944

997945

998946

947+
CREATE UNIQUE INDEX idx_client_notices_entity ON smp_agent_test_protocol_schema.client_notices USING btree (protocol, host, port, entity_id);
948+
949+
950+
999951
CREATE INDEX idx_commands_conn_id ON smp_agent_test_protocol_schema.commands USING btree (conn_id);
1000952

1001953

@@ -1124,6 +1076,10 @@ CREATE UNIQUE INDEX idx_rcv_queue_id ON smp_agent_test_protocol_schema.rcv_queue
11241076

11251077

11261078

1079+
CREATE INDEX idx_rcv_queues_client_notice_id ON smp_agent_test_protocol_schema.rcv_queues USING btree (client_notice_id);
1080+
1081+
1082+
11271083
CREATE UNIQUE INDEX idx_rcv_queues_link_id ON smp_agent_test_protocol_schema.rcv_queues USING btree (host, port, link_id);
11281084

11291085

@@ -1132,6 +1088,10 @@ CREATE UNIQUE INDEX idx_rcv_queues_ntf ON smp_agent_test_protocol_schema.rcv_que
11321088

11331089

11341090

1091+
CREATE INDEX idx_rcv_queues_to_subscribe ON smp_agent_test_protocol_schema.rcv_queues USING btree (to_subscribe);
1092+
1093+
1094+
11351095
CREATE INDEX idx_server_certs_host_port ON smp_agent_test_protocol_schema.client_services USING btree (host, port);
11361096

11371097

@@ -1208,18 +1168,6 @@ CREATE INDEX idx_snd_queues_host_port ON smp_agent_test_protocol_schema.snd_queu
12081168

12091169

12101170

1211-
CREATE TRIGGER tr_rcv_queue_delete AFTER DELETE ON smp_agent_test_protocol_schema.rcv_queues FOR EACH ROW EXECUTE FUNCTION smp_agent_test_protocol_schema.on_rcv_queue_delete();
1212-
1213-
1214-
1215-
CREATE TRIGGER tr_rcv_queue_insert AFTER INSERT ON smp_agent_test_protocol_schema.rcv_queues FOR EACH ROW EXECUTE FUNCTION smp_agent_test_protocol_schema.on_rcv_queue_insert();
1216-
1217-
1218-
1219-
CREATE TRIGGER tr_rcv_queue_update AFTER UPDATE ON smp_agent_test_protocol_schema.rcv_queues FOR EACH ROW EXECUTE FUNCTION smp_agent_test_protocol_schema.on_rcv_queue_update();
1220-
1221-
1222-
12231171
ALTER TABLE ONLY smp_agent_test_protocol_schema.client_services
12241172
ADD CONSTRAINT client_services_host_port_fkey FOREIGN KEY (host, port) REFERENCES smp_agent_test_protocol_schema.servers(host, port) ON UPDATE CASCADE ON DELETE RESTRICT;
12251173

@@ -1345,6 +1293,11 @@ ALTER TABLE ONLY smp_agent_test_protocol_schema.rcv_messages
13451293

13461294

13471295

1296+
ALTER TABLE ONLY smp_agent_test_protocol_schema.rcv_queues
1297+
ADD CONSTRAINT rcv_queues_client_notice_id_fkey FOREIGN KEY (client_notice_id) REFERENCES smp_agent_test_protocol_schema.client_notices(client_notice_id) ON UPDATE RESTRICT ON DELETE SET NULL;
1298+
1299+
1300+
13481301
ALTER TABLE ONLY smp_agent_test_protocol_schema.rcv_queues
13491302
ADD CONSTRAINT rcv_queues_conn_id_fkey FOREIGN KEY (conn_id) REFERENCES smp_agent_test_protocol_schema.connections(conn_id) ON DELETE CASCADE;
13501303

src/Simplex/Messaging/Notifications/Server/Store/ntf_server_schema.sql

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,17 @@ CREATE FUNCTION ntf_server.xor_combine(state bytea, value bytea) RETURNS bytea
111111
DECLARE
112112
result BYTEA := state;
113113
i INTEGER;
114-
len INTEGER := octet_length(value);
114+
state_len INTEGER := octet_length(state);
115+
value_len INTEGER := octet_length(value);
116+
max_len INTEGER := GREATEST(state_len, value_len);
117+
byte_val INTEGER;
115118
BEGIN
116-
IF octet_length(state) != len THEN
117-
RAISE EXCEPTION 'Inputs must be equal length (% != %)', octet_length(state), len;
119+
IF value_len > state_len THEN
120+
result := result || repeat('\x00'::BYTEA, value_len - state_len);
118121
END IF;
119-
FOR i IN 0..len-1 LOOP
120-
result := set_byte(result, i, get_byte(state, i) # get_byte(value, i));
122+
FOR i IN 0..max_len-1 LOOP
123+
byte_val := CASE WHEN i < value_len THEN get_byte(value, i) ELSE 0 END;
124+
result := set_byte(result, i, get_byte(result, i) # byte_val);
121125
END LOOP;
122126
RETURN result;
123127
END;

src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -365,13 +365,17 @@ CREATE FUNCTION smp_server.xor_combine(state bytea, value bytea) RETURNS bytea
365365
DECLARE
366366
result BYTEA := state;
367367
i INTEGER;
368-
len INTEGER := octet_length(value);
368+
state_len INTEGER := octet_length(state);
369+
value_len INTEGER := octet_length(value);
370+
max_len INTEGER := GREATEST(state_len, value_len);
371+
byte_val INTEGER;
369372
BEGIN
370-
IF octet_length(state) != len THEN
371-
RAISE EXCEPTION 'Inputs must be equal length (% != %)', octet_length(state), len;
373+
IF value_len > state_len THEN
374+
result := result || repeat('\x00'::BYTEA, value_len - state_len);
372375
END IF;
373-
FOR i IN 0..len-1 LOOP
374-
result := set_byte(result, i, get_byte(state, i) # get_byte(value, i));
376+
FOR i IN 0..max_len-1 LOOP
377+
byte_val := CASE WHEN i < value_len THEN get_byte(value, i) ELSE 0 END;
378+
result := set_byte(result, i, get_byte(result, i) # byte_val);
375379
END LOOP;
376380
RETURN result;
377381
END;

0 commit comments

Comments
 (0)