Commit 4a81fdf

Merge pull request #142 from SwissBorg/unique-indices
Partitioned schema - unique index on (persistence_id, sequence_number).
2 parents 2928ec1 + c04fd4f · commit 4a81fdf

6 files changed: +123, -11 lines

README.md

Lines changed: 18 additions & 6 deletions

@@ -113,20 +113,21 @@ Example partition names: `j_myActor_0`, `j_myActor_1`, `j_worker_0` etc.
 Keep in mind that the default maximum length for a table name in Postgres is 63 bytes, so you should avoid any non-ascii characters in your `persistenceId`s and keep the `prefix` reasonably short.
 
 > :warning: Once any of the partitioning settings under the `postgres-journal.tables.journal.partitions` branch is settled, you should never change it. Otherwise you might end up with PostgresExceptions caused by table name or range conflicts.
+## Migration
 
-## Migration from akka-persistence-jdbc 4.0.0
+### Migration from akka-persistence-jdbc 4.0.0
 It is possible to migrate existing journals from Akka Persistence JDBC 4.0.0.
 Since we decided to extract metadata from the serialized payload and store it in a separate column, it is not possible to migrate the existing journal and snapshot store using plain SQL scripts.
 
-### How migration works
+#### How migration works
 Each journal event and snapshot has to be read, deserialized, metadata and tags must be extracted, and then everything stored in the new table.
 
 We provide you with an optional artifact, `akka-persistence-postgres-migration`, that brings to your project the necessary classes to automate the above process.
 
 **Important**: Our util classes neither drop nor update any old data. Original tables will still be there, but renamed with an `old_` prefix. It's up to you when to drop them.
 
-### How to use plugin provided migrations
-#### Add akka-persistence-migration to your project
+#### How to use plugin provided migrations
+##### Add akka-persistence-migration to your project
 Add the following to your `build.sbt`
 ```
 libraryDependencies += "com.swissborg" %% "akka-persistence-postgres-migration" % "0.4.1"
@@ -141,7 +142,7 @@ For a maven project add:
 ```
 to your `pom.xml`.
 
-#### Create and run migrations:
+##### Create and run migrations:
 ```scala
 import akka.persistence.postgres.migration.journal.Jdbc4JournalMigration
 import akka.persistence.postgres.migration.snapshot.Jdbc4SnapshotStoreMigration
@@ -155,9 +156,20 @@ _ <- new Jdbc4SnapshotStoreMigration(config).run()
 
 It's your choice whether you want to trigger migration manually or (recommended) leverage a database version control system of your choice (e.g. Flyway).
 
-### Examples
+#### Examples
 An example Flyway-based migration can be found in the demo app: https://github.com/mkubala/demo-akka-persistence-postgres/blob/master/src/main/scala/com/github/mkubala/FlywayMigrationExample.scala
 
+### Migration from akka-persistence-postgres 0.4.0 to 0.5.0
+New indices need to be created on each partition. To avoid locking production databases for too long, this should be done in 2 steps:
+1. manually create indices CONCURRENTLY,
+2. deploy the new release with migration scripts.
+
+#### Manually create indices CONCURRENTLY
+Execute the DDL statements produced by the [sample migration script](scripts/migratrion-0.5.0/partitioned/1-add-indices-manually.sql); adapt the top-level variables to match your journal configuration before executing.
+
+#### Deploy new release with migration scripts
+See the [sample Flyway migration script](scripts/migratrion-0.5.0/partitioned/2-add-indices-flyway.sql) and adapt the top-level variables to match your journal configuration.
+
 ## Contributing
 We are also always looking for contributions and new ideas, so if you'd like to join the project, check out the [open issues](https://github.com/SwissBorg/akka-persistence-postgres/issues), or post your own suggestions!

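The `Create and run migrations` snippet above is cut off by the diff context; judging from the imports and the hunk header (`_ <- new Jdbc4SnapshotStoreMigration(config).run()`), the full README example presumably chains both migrations, roughly as in the sketch below. The `migrate` wrapper is hypothetical; `config` and any additional implicits the migration classes require are assumed to be supplied by the caller.

```scala
import akka.persistence.postgres.migration.journal.Jdbc4JournalMigration
import akka.persistence.postgres.migration.snapshot.Jdbc4SnapshotStoreMigration

import scala.concurrent.{ ExecutionContext, Future }

// Sketch only: `config` is the application's com.typesafe.config.Config; the real
// constructors may also need other implicits (e.g. an ActorSystem) in scope.
def migrate(config: com.typesafe.config.Config)(implicit ec: ExecutionContext): Future[Unit] =
  for {
    _ <- new Jdbc4JournalMigration(config).run()       // journal first
    _ <- new Jdbc4SnapshotStoreMigration(config).run() // then the snapshot store
  } yield ()
```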
core/src/main/scala/akka/persistence/postgres/db/DbErrors.scala

Lines changed: 15 additions & 0 deletions

@@ -12,6 +12,7 @@ object DbErrors {
   import ExtendedPostgresProfile.api._
 
   val PgDuplicateTable: String = "42P07"
+  val PgUniqueViolation: String = "23505"
 
   def withHandledPartitionErrors(logger: Logger, partitionDetails: String)(dbio: DBIOAction[_, NoStream, Effect])(
       implicit ec: ExecutionContext): DBIOAction[Unit, NoStream, Effect] =
@@ -26,4 +27,18 @@ object DbErrors {
         logger.debug(s"Created missing journal partition for $partitionDetails")
         DBIO.successful(())
     }
+
+  def withHandledIndexErrors(logger: Logger, indexDetails: String)(dbio: DBIOAction[_, NoStream, Effect])(implicit
+      ec: ExecutionContext): DBIOAction[Unit, NoStream, Effect] =
+    dbio.asTry.flatMap {
+      case Failure(ex: SQLException) if ex.getSQLState == PgUniqueViolation =>
+        logger.debug(s"Index $indexDetails already exists")
+        DBIO.successful(())
+      case Failure(ex) =>
+        logger.error(s"Cannot create index $indexDetails", ex)
+        DBIO.failed(ex)
+      case Success(_) =>
+        logger.debug(s"Created missing index $indexDetails")
+        DBIO.successful(())
+    }
 }

core/src/main/scala/akka/persistence/postgres/journal/dao/PartitionedJournalDao.scala

Lines changed: 8 additions & 4 deletions

@@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import akka.persistence.postgres.JournalRow
 import akka.persistence.postgres.config.JournalConfig
-import akka.persistence.postgres.db.DbErrors.withHandledPartitionErrors
+import akka.persistence.postgres.db.DbErrors.{ withHandledIndexErrors, withHandledPartitionErrors }
 import akka.serialization.Serialization
 import akka.stream.Materializer
 import slick.jdbc.JdbcBackend.Database
@@ -69,9 +69,13 @@ class PartitionedJournalDao(db: Database, journalConfig: JournalConfig, serializ
           val name = s"${partitionPrefix}_$partitionNumber"
           val minRange = partitionNumber * partitionSize
           val maxRange = minRange + partitionSize
-          withHandledPartitionErrors(logger, s"ordering between $minRange and $maxRange") {
-            sqlu"""CREATE TABLE IF NOT EXISTS #${schema + name} PARTITION OF #${schema + journalTableCfg.tableName} FOR VALUES FROM (#$minRange) TO (#$maxRange)"""
-          }
+          val partitionName = s"${schema + name}"
+          val indexName = s"${name}_persistence_sequence_idx"
+          withHandledPartitionErrors(logger, s"$partitionName (ordering between $minRange and $maxRange)") {
+            sqlu"""CREATE TABLE IF NOT EXISTS #$partitionName PARTITION OF #${schema + journalTableCfg.tableName} FOR VALUES FROM (#$minRange) TO (#$maxRange)"""
+          }.andThen(withHandledIndexErrors(logger, s"$indexName for partition $partitionName") {
+            sqlu"""CREATE UNIQUE INDEX IF NOT EXISTS #$indexName ON #$partitionName USING BTREE (#${journalTableCfg.columnNames.persistenceId}, #${journalTableCfg.columnNames.sequenceNumber});"""
+          })
         }
       }
       DBIO

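Since Slick's `#$`/`#${...}` interpolation splices values into the statement as literal text, each newly created partition now gets two statements: the partition itself, followed by its unique index. For a hypothetical journal with schema prefix `public.`, partition name `j_worker_0`, a partition size of 10000 and the default column names (all assumptions for illustration, not values from the commit), the generated DDL would look roughly like:

```sql
-- Illustrative only: schema, partition name, range and column names are assumed defaults.
CREATE TABLE IF NOT EXISTS public.j_worker_0 PARTITION OF public.journal
    FOR VALUES FROM (0) TO (10000);
CREATE UNIQUE INDEX IF NOT EXISTS j_worker_0_persistence_sequence_idx
    ON public.j_worker_0 USING BTREE (persistence_id, sequence_number);
```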
core/src/test/resources/schema/postgres/partitioned-schema.sql

Lines changed: 0 additions & 1 deletion

@@ -42,7 +42,6 @@ CREATE SEQUENCE journal_ordering_seq OWNED BY public.journal.ordering;
 
 CREATE EXTENSION IF NOT EXISTS intarray WITH SCHEMA public;
 CREATE INDEX journal_tags_idx ON public.journal USING GIN (tags public.gin__int_ops);
-CREATE INDEX journal_persistence_sequence_idx ON public.journal USING BTREE (persistence_id, sequence_number);
 
 DROP TABLE IF EXISTS public.tags;
 
scripts/migratrion-0.5.0/partitioned/1-add-indices-manually.sql

Lines changed: 41 additions & 0 deletions

@@ -0,0 +1,41 @@
+DO $$
+DECLARE
+    -- adapt those to match journal configuration
+    v_journal_table_name constant text := 'journal';
+    v_schema constant text := 'public';
+    v_column_persistence_id text = 'persistence_id';
+    v_column_sequence_number text = 'sequence_number';
+    -- do not change values below
+    v_persistence_seq_idx constant text := '_persistence_sequence_idx';
+    v_rec record;
+    v_sql text;
+BEGIN
+    FOR v_rec IN
+        -- get list of partitions
+        SELECT
+            child.relname AS child
+        FROM
+            pg_inherits
+            JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
+            JOIN pg_class child ON pg_inherits.inhrelid = child.oid
+            JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
+        WHERE
+            parent.relname = v_journal_table_name AND
+            nmsp_parent.nspname = v_schema
+    LOOP
+        PERFORM
+        FROM
+            pg_indexes
+        WHERE
+            schemaname = v_schema
+            AND tablename = v_rec.child
+            AND indexname = v_rec.child || v_persistence_seq_idx;
+        IF NOT FOUND THEN
+            -- unique btree on (persistence_id, sequence_number)
+            v_sql := 'CREATE UNIQUE INDEX CONCURRENTLY ' || quote_ident(v_rec.child || v_persistence_seq_idx) || ' ON ' || quote_ident(v_schema) || '.' || quote_ident(v_rec.child) || ' USING BTREE (' || quote_ident(v_column_persistence_id) || ',' || quote_ident(v_column_sequence_number) || ');';
+            RAISE notice 'Run DDL: %', v_sql;
+        END IF;
+
+    END LOOP;
+END;
+$$;
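Note that this script only prints the DDL via `RAISE notice` and never executes it: `CREATE INDEX CONCURRENTLY` cannot run inside the transaction that wraps a `DO` block, so the printed statements are meant to be run by hand afterwards, one at a time and outside a transaction. For a partition named `j_worker_0` with the default settings (an assumption for illustration), the statement to run would be along the lines of:

```sql
-- Run each printed statement manually, outside of a transaction:
CREATE UNIQUE INDEX CONCURRENTLY j_worker_0_persistence_sequence_idx
    ON public.j_worker_0 USING BTREE (persistence_id, sequence_number);
```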
scripts/migratrion-0.5.0/partitioned/2-add-indices-flyway.sql

Lines changed: 41 additions & 0 deletions

@@ -0,0 +1,41 @@
+-- Ensure indexes exist on partitions; the actual indexes are created manually before migration using the CONCURRENTLY option.
+-- This block is needed to avoid missing indexes if a new partition was created between the manual index creation and the
+-- actual migration. We cannot create indexes CONCURRENTLY here, as it is not possible to create indexes CONCURRENTLY
+-- inside a transaction, and functions are executed inside a transaction.
+DO $$
+DECLARE
+    -- adapt those to match journal configuration
+    v_journal_table_name constant text := 'journal';
+    v_column_persistence_id text = 'persistence_id';
+    v_column_sequence_number text = 'sequence_number';
+    -- do not change values below
+    v_persistence_seq_idx constant text := '_persistence_sequence_idx';
+    -- detect which schema flyway uses
+    v_schema constant text := (select trim(both '"' from split_part(setting,',',1)) FROM pg_settings WHERE name = 'search_path');
+    v_rec record;
+    v_sql text;
+BEGIN
+    FOR v_rec IN
+        -- get list of partitions
+        SELECT
+            child.relname AS child
+        FROM
+            pg_inherits
+            JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
+            JOIN pg_class child ON pg_inherits.inhrelid = child.oid
+            JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
+        WHERE
+            parent.relname = v_journal_table_name AND
+            nmsp_parent.nspname = v_schema
+    LOOP
+        -- unique btree on (persistence_id, sequence_number)
+        v_sql := 'CREATE UNIQUE INDEX IF NOT EXISTS ' || quote_ident(v_rec.child || v_persistence_seq_idx) || ' ON ' || quote_ident(v_schema) || '.' || quote_ident(v_rec.child) || ' USING BTREE (' || quote_ident(v_column_persistence_id) || ',' || quote_ident(v_column_sequence_number) || ');';
+        RAISE notice 'Running DDL: %', v_sql;
+        EXECUTE v_sql;
+
+    END LOOP;
+END;
+$$;
+
+-- drop global, non-unique index
+DROP INDEX IF EXISTS journal_persistence_id_sequence_number_idx;

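If the second script is applied through Flyway, as the README recommends, it would typically be checked in as a versioned migration under Flyway's default location (`classpath:db/migration`), using Flyway's `V<version>__<description>.sql` naming convention. The version number and file name below are placeholders, not part of the commit:

```
src/main/resources/db/migration/V5__add_journal_partition_indices.sql
```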