Add postgres cdc guide to doc #3557

Merged · merged 11 commits into from Apr 10, 2025

Changes from all commits
@@ -0,0 +1,210 @@
---
sidebar_label: 'Deduplication strategies'
description: 'Handle duplicates and deleted rows.'
slug: /integrations/clickpipes/postgres/deduplication
title: 'Deduplication strategies (using CDC)'
---

import clickpipes_initial_load from '@site/static/images/integrations/data-ingestion/clickpipes/postgres/postgres-cdc-initial-load.png';
import Image from '@theme/IdealImage';

Updates and deletes replicated from Postgres to ClickHouse result in duplicated rows in ClickHouse due to its data storage structure and the replication process. This page covers why this happens and the strategies to use in ClickHouse to handle duplicates.

## How does data get replicated? {#how-does-data-get-replicated}

### PostgreSQL logical decoding {#PostgreSQL-logical-decoding}

ClickPipes uses [Postgres Logical Decoding](https://www.pgedge.com/blog/logical-replication-evolution-in-chronological-order-clustering-solution-built-around-logical-replication) to consume changes as they happen in Postgres. The Logical Decoding process in Postgres enables clients like ClickPipes to receive changes in a human-readable format, i.e., a series of INSERTs, UPDATEs, and DELETEs.
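
To see logical decoding output for yourself, below is a minimal sketch using the built-in `test_decoding` output plugin. It assumes `wal_level = logical` and a user with the `REPLICATION` privilege; ClickPipes manages its own replication slot rather than one created this way.

```sql
-- Create a throwaway logical replication slot with the test_decoding plugin
SELECT pg_create_logical_replication_slot('demo_slot', 'test_decoding');

-- Make some changes
INSERT INTO users (id, displayname) VALUES (42, 'alice');
UPDATE users SET displayname = 'bob' WHERE id = 42;
DELETE FROM users WHERE id = 42;

-- Read the changes back as human-readable INSERT/UPDATE/DELETE records
SELECT * FROM pg_logical_slot_get_changes('demo_slot', NULL, NULL);

-- Drop the slot so it does not retain WAL indefinitely
SELECT pg_drop_replication_slot('demo_slot');
```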

### ReplacingMergeTree {#replacingmergetree}

ClickPipes maps Postgres tables to ClickHouse using the [ReplacingMergeTree](/engines/table-engines/mergetree-family/replacingmergetree) engine. ClickHouse performs best with append-only workloads, and frequent in-place UPDATEs are not recommended. This is where ReplacingMergeTree is particularly powerful.

With ReplacingMergeTree, updates are modeled as inserts with a newer version (`_peerdb_version`) of the row, while deletes are inserts with a newer version and `_peerdb_is_deleted` marked as true. The ReplacingMergeTree engine deduplicates/merges data in the background, and retains the latest version of the row for a given primary key (id), enabling efficient handling of UPDATEs and DELETEs as versioned inserts.

Below is an example of a CREATE TABLE statement executed by ClickPipes to create the table in ClickHouse.

```sql
CREATE TABLE users
(
    `id` Int32,
    `reputation` String,
    `creationdate` DateTime64(6),
    `displayname` String,
    `lastaccessdate` DateTime64(6),
    `aboutme` String,
    `views` Int32,
    `upvotes` Int32,
    `downvotes` Int32,
    `websiteurl` String,
    `location` String,
    `accountid` Int32,
    `_peerdb_synced_at` DateTime64(9) DEFAULT now64(),
    `_peerdb_is_deleted` Int8,
    `_peerdb_version` Int64
)
ENGINE = ReplacingMergeTree(_peerdb_version)
PRIMARY KEY id
ORDER BY id;
```

### Illustrative example {#illustrative-example}

The illustration below walks through a basic example of synchronization of a table `users` between PostgreSQL and ClickHouse using ClickPipes.

<Image img={clickpipes_initial_load} alt="ClickPipes initial load" size="lg"/>

**Step 1** shows the initial snapshot of the 2 rows in PostgreSQL and ClickPipes performing the initial load of those 2 rows to ClickHouse. As you can observe, both rows are copied as-is to ClickHouse.

**Step 2** shows three operations on the `users` table: inserting a new row, updating an existing row, and deleting another row.

**Step 3** shows how ClickPipes replicates the INSERT, UPDATE, and DELETE operations to ClickHouse as versioned inserts. The UPDATE appears as a new version of the row with ID 2, while the DELETE appears as a new version of ID 1 marked as deleted via `_peerdb_is_deleted`. Because of this, ClickHouse has three additional rows compared to PostgreSQL.
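
To make this concrete, below is a sketch of the raw, un-merged rows you could see in ClickHouse after step 3. The version numbers and the new row's ID are illustrative; in practice `_peerdb_version` is derived from the replication stream.

```sql
SELECT id, _peerdb_is_deleted, _peerdb_version
FROM users
ORDER BY id, _peerdb_version;

-- id=1  _peerdb_is_deleted=0  _peerdb_version=1   (initial load)
-- id=1  _peerdb_is_deleted=1  _peerdb_version=2   (DELETE as a versioned insert)
-- id=2  _peerdb_is_deleted=0  _peerdb_version=1   (initial load)
-- id=2  _peerdb_is_deleted=0  _peerdb_version=2   (UPDATE as a versioned insert)
-- id=3  _peerdb_is_deleted=0  _peerdb_version=1   (new INSERT)
```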

As a result, running a simple query like `SELECT count(*) FROM users;` may produce different results in ClickHouse and PostgreSQL. According to the [ClickHouse merge documentation](/merges#replacing-merges), outdated row versions are eventually discarded during the merge process. However, the timing of this merge is unpredictable, meaning queries in ClickHouse may return inconsistent results until it occurs.
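
One way to observe the effect of a merge without waiting for the background process is to force one with `OPTIMIZE TABLE ... FINAL`. This is expensive on large tables and is shown here only as an illustration, not as a recommended production pattern:

```sql
-- Force an unscheduled merge that discards outdated row versions
OPTIMIZE TABLE users FINAL;

-- Counts line up with Postgres again once deleted rows are filtered out
-- (rows marked deleted remain as rows until filtered)
SELECT count(*) FROM users WHERE _peerdb_is_deleted = 0;
```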

How can we ensure identical query results in both ClickHouse and PostgreSQL?

### Deduplicate using FINAL Keyword {#deduplicate-using-final-keyword}

The recommended way to deduplicate data in ClickHouse queries is to use the [FINAL modifier](/sql-reference/statements/select/from#final-modifier). This ensures only the deduplicated rows are returned.

Let's look at how to apply it to three different queries.

_Take note of the WHERE clause in the following queries, used to filter out deleted rows._

- **Simple count query**: Count the number of posts.

This is the simplest query you can run to check that synchronization is working correctly. The two queries should return the same count.

```sql
-- PostgreSQL
SELECT count(*) FROM posts;

-- ClickHouse
SELECT count(*) FROM posts FINAL WHERE _peerdb_is_deleted=0;
```

> **Review comment (Member):**
>
> ```sql
> SELECT count(*)
> FROM posts
> FINAL
> -- highlight-next-line
> WHERE _peerdb_is_deleted=0;
> ```
>
> This can be used to highlight within code blocks: https://docusaurus.io/docs/markdown-features/code-blocks#highlighting-with-comments

- **Simple aggregation with JOIN**: Top 10 users who have accumulated the most views.

An example of an aggregation joining two tables. Having duplicates here would greatly affect the result of the `sum` function.

```sql
-- PostgreSQL
SELECT
    sum(p.viewcount) AS viewcount,
    p.owneruserid AS user_id,
    u.displayname AS display_name
FROM posts p
LEFT JOIN users u ON u.id = p.owneruserid
-- highlight-next-line
WHERE p.owneruserid > 0
GROUP BY user_id, display_name
ORDER BY viewcount DESC
LIMIT 10;

-- ClickHouse
SELECT
    sum(p.viewcount) AS viewcount,
    p.owneruserid AS user_id,
    u.displayname AS display_name
FROM posts AS p
FINAL
LEFT JOIN users AS u
FINAL ON (u.id = p.owneruserid) AND (u._peerdb_is_deleted = 0)
-- highlight-next-line
WHERE (p.owneruserid > 0) AND (p._peerdb_is_deleted = 0)
GROUP BY
    user_id,
    display_name
ORDER BY viewcount DESC
LIMIT 10;
```

#### FINAL setting {#final-setting}

Rather than adding the FINAL modifier to each table name in the query, you can use the [FINAL setting](/operations/settings/settings#final) to apply it automatically to all tables in the query.

This setting can be applied either per query or for an entire session.

```sql
-- Per query FINAL setting
SELECT count(*) FROM posts SETTINGS final = 1;

-- Set FINAL for the session
SET final = 1;
SELECT count(*) FROM posts;
```

#### Row policy {#row-policy}

An easy way to hide the redundant `_peerdb_is_deleted = 0` filter is to use a [row policy](/docs/operations/access-rights#row-policy-management). Below is an example that creates a row policy to exclude the deleted rows from all queries on the `votes` table.

```sql
-- Apply row policy to all users
CREATE ROW POLICY cdc_policy ON votes FOR SELECT USING _peerdb_is_deleted = 0 TO ALL;
```

> Row policies are applied to a list of users and roles. In this example, it is applied to all users and roles. This can be adjusted to only specific users or roles.
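
For example, to scope the policy to a single role rather than all users (the role name below is hypothetical):

```sql
-- Apply the row policy only to a hypothetical reporting role
CREATE ROW POLICY cdc_policy_reporting ON votes FOR SELECT USING _peerdb_is_deleted = 0 TO reporting_role;
```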

### Query as you would in Postgres {#query-like-with-postgres}

Migrating an analytical dataset from PostgreSQL to ClickHouse often requires modifying application queries to account for differences in data handling and query execution.

This section will explore techniques for deduplicating data while keeping the original queries unchanged.

#### Views {#views}

[Views](/sql-reference/statements/create/view#normal-view) are a great way to hide the FINAL keyword from the query, as they do not store any data and simply perform a read from another table on each access.

Below is an example of creating views for each table of our database in ClickHouse with the FINAL keyword and filter for the deleted rows.

```sql
CREATE VIEW posts_view AS SELECT * FROM posts FINAL WHERE _peerdb_is_deleted=0;
CREATE VIEW users_view AS SELECT * FROM users FINAL WHERE _peerdb_is_deleted=0;
CREATE VIEW votes_view AS SELECT * FROM votes FINAL WHERE _peerdb_is_deleted=0;
CREATE VIEW comments_view AS SELECT * FROM comments FINAL WHERE _peerdb_is_deleted=0;
```

Then, we can query the views using the same query we would use in PostgreSQL.

```sql
-- Most viewed posts
SELECT
    sum(viewcount) AS viewcount,
    owneruserid
FROM posts_view
WHERE owneruserid > 0
GROUP BY owneruserid
ORDER BY viewcount DESC
LIMIT 10;
```

#### Refreshable materialized view {#refreshable-material-view}

Another approach is to use a [Refreshable Materialized View](/materialized-view/refreshable-materialized-view), which enables you to schedule query execution for deduplicating rows and storing the results in a destination table. With each scheduled refresh, the destination table is replaced with the latest query results.

The key advantage of this method is that the query using the FINAL keyword runs only once during the refresh, eliminating the need for subsequent queries on the destination table to use FINAL.

However, a drawback is that the data in the destination table is only as up-to-date as the most recent refresh. That said, for many use cases, refresh intervals ranging from several minutes to a few hours may be sufficient.

```sql
-- Create deduplicated posts table
CREATE TABLE deduplicated_posts AS posts;

-- Create the materialized view and schedule it to refresh every hour
CREATE MATERIALIZED VIEW deduplicated_posts_mv REFRESH EVERY 1 HOUR TO deduplicated_posts AS
SELECT * FROM posts FINAL WHERE _peerdb_is_deleted=0;
```

Then, you can query the table `deduplicated_posts` normally.

```sql
SELECT
    sum(viewcount) AS viewcount,
    owneruserid
FROM deduplicated_posts
WHERE owneruserid > 0
GROUP BY owneruserid
ORDER BY viewcount DESC
LIMIT 10;
```
4 changes: 2 additions & 2 deletions docs/integrations/data-ingestion/clickpipes/postgres/faq.md
@@ -1,5 +1,5 @@
---
sidebar_label: 'ClickPipes for Postgres FAQ'
sidebar_label: 'FAQ'
description: 'Frequently asked questions about ClickPipes for Postgres.'
slug: /integrations/clickpipes/postgres/faq
sidebar_position: 2
@@ -40,7 +40,7 @@ Yes! ClickPipes for Postgres offers two ways to connect to databases in private
- Works across all regions

2. **AWS PrivateLink**
- Available in three AWS regions:
- us-east-1
- us-east-2
- eu-central-1
16 changes: 4 additions & 12 deletions docs/integrations/data-ingestion/clickpipes/postgres/index.md
@@ -1,5 +1,5 @@
---
sidebar_label: 'ClickPipes for Postgres'
sidebar_label: 'Ingesting Data from Postgres to ClickHouse'
description: 'Seamlessly connect your Postgres to ClickHouse Cloud.'
slug: /integrations/clickpipes/postgres
title: 'Ingesting Data from Postgres to ClickHouse (using CDC)'
@@ -134,9 +134,7 @@ You can configure the Advanced settings if needed. A brief description of each s
7. You can select the tables you want to replicate from the source Postgres database. While selecting the tables, you can also choose to rename the tables in the destination ClickHouse database as well as exclude specific columns.

:::warning

If you are defining an Ordering Key in ClickHouse differently from the Primary Key in Postgres, please don't forget to read all the [considerations](https://docs.peerdb.io/mirror/ordering-key-different) around it!

If you are defining an Ordering Key in ClickHouse differently from the Primary Key in Postgres, please don’t forget to read all the [considerations](/integrations/clickpipes/postgres/ordering_keys) around it!
:::

### Review permissions and start the ClickPipe {#review-permissions-and-start-the-clickpipe}
@@ -147,12 +145,6 @@ You can configure the Advanced settings if needed. A brief description of each s

## What's next? {#whats-next}

Once you've moved data from Postgres to ClickHouse, the next obvious question is how to model your data in ClickHouse to make the most of it. Please refer to this page on [ClickHouse Data Modeling Tips for Postgres users](https://docs.peerdb.io/bestpractices/clickhouse_datamodeling) to help you model data in ClickHouse.
Once you've moved data from Postgres to ClickHouse, the next obvious question is how to query and model your data in ClickHouse to make the most of it. Please refer to the [migration guide](/migrations/postgres/overview) for a step-by-step approach to migrating from PostgreSQL to ClickHouse. Alongside the migration guide, make sure to check the pages about [Deduplication strategies (using CDC)](/integrations/clickpipes/postgres/deduplication) and [Ordering Keys](/integrations/clickpipes/postgres/ordering_keys) to understand how to handle duplicates and customize ordering keys when using CDC.

Also, please refer to the [ClickPipes for Postgres FAQ](./postgres/faq) for more information about common issues and how to resolve them.

:::info

[This](https://docs.peerdb.io/bestpractices/clickhouse_datamodeling) is especially important as ClickHouse differs from Postgres, and you might encounter some surprises. This guide helps address potential pitfalls and ensures you can take full advantage of ClickHouse.

:::
Finally, please refer to the ["ClickPipes for Postgres FAQ"](/integrations/clickpipes/postgres/faq) page for more information about common issues and how to resolve them.
@@ -0,0 +1,54 @@
---
sidebar_label: 'Ordering keys'
description: 'How to define custom ordering keys.'
slug: /integrations/clickpipes/postgres/ordering_keys
title: 'Ordering keys'
---

Ordering Keys (a.k.a. sorting keys) define how data is sorted on disk and indexed for a table in ClickHouse. When replicating from Postgres, ClickPipes sets the Postgres primary key of a table as the ordering key for the corresponding table in ClickHouse. In most cases, the Postgres primary key serves as a sufficient ordering key, as ClickHouse is already optimized for fast scans, and custom ordering keys are often not required.

As described in the [migration guide](/migrations/postgres/data-modeling-techniques), for larger use cases you should include additional columns beyond the Postgres primary key in the ClickHouse ordering key to optimize queries.

By default with CDC, choosing an ordering key different from the Postgres primary key can cause data deduplication issues in ClickHouse. This happens because the ordering key in ClickHouse serves a dual role: it controls data indexing and sorting while acting as the deduplication key. The easiest way to address this issue is by defining refreshable materialized views.

## Use Refreshable Materialized Views {#use-refreshable-materialized-views}

A simple way to define custom ordering keys (ORDER BY) is using [refreshable materialized views](/materialized-view/refreshable-materialized-view) (MVs). These allow you to periodically (e.g., every 5 or 10 minutes) copy the entire table with the desired ordering key.

Below is an example of a Refreshable MV with a custom ORDER BY and required deduplication:

```sql
CREATE MATERIALIZED VIEW posts_final
REFRESH EVERY 10 SECOND ENGINE = ReplacingMergeTree(_peerdb_version)
ORDER BY (owneruserid, id) -- custom ordering key, with the Postgres primary key appended
AS
SELECT * FROM posts FINAL -- FINAL deduplicates the rows
WHERE _peerdb_is_deleted = 0; -- and this filters out deleted rows
```
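
Queries can then hit `posts_final` directly. Since each refresh rewrites the table with already-deduplicated, delete-filtered results, no FINAL modifier or `_peerdb_is_deleted` filter is needed at query time:

```sql
-- Query the refreshed copy directly; no FINAL needed
SELECT count(*) FROM posts_final;
```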

## Custom ordering keys without refreshable materialized views {#custom-ordering-keys-without-refreshable-materialized-views}

If refreshable materialized views don't work due to the scale of data, here are a few recommendations you can follow to define custom ordering keys on larger tables and overcome deduplication-related issues.

### Choose ordering key columns that don't change for a given row {#choose-ordering-key-columns-that-dont-change-for-a-given-row}

When including additional columns in the ordering key for ClickHouse (besides the primary key from Postgres), we recommend selecting columns that don't change for each row. This helps prevent data consistency and deduplication issues with ReplacingMergeTree.

For example, in a multi-tenant SaaS application, using (`tenant_id`, `id`) as the ordering key is a good choice. These columns uniquely identify each row, and `tenant_id` remains constant for an `id` even if other columns change. Since deduplication by `id` aligns with deduplication by (`tenant_id`, `id`), it helps avoid data [deduplication issues](https://docs.peerdb.io/mirror/ordering-key-different) that could arise if `tenant_id` were to change.
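
As a sketch, a table with such an ordering key could look like the following (the table and columns here are hypothetical, for illustration only):

```sql
-- Hypothetical multi-tenant table: the ordering key extends the immutable
-- tenant_id with the Postgres primary key (id)
CREATE TABLE events
(
    `tenant_id` Int32,
    `id` Int64,
    `payload` String,
    `_peerdb_synced_at` DateTime64(9) DEFAULT now64(),
    `_peerdb_is_deleted` Int8,
    `_peerdb_version` Int64
)
ENGINE = ReplacingMergeTree(_peerdb_version)
ORDER BY (tenant_id, id);
```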

### Set Replica Identity on Postgres tables to custom ordering key {#set-replica-identity-on-postgres-tables-to-custom-ordering-key}

For Postgres CDC to function as expected, it is important to modify the `REPLICA IDENTITY` on tables to include the ordering key columns. This is essential for handling DELETEs accurately.

If the `REPLICA IDENTITY` does not include the ordering key columns, Postgres CDC will not capture the values of columns other than the primary key; this is a limitation of Postgres logical decoding. All ordering key columns besides the Postgres primary key will arrive as NULL. This affects deduplication, meaning the previous version of the row may not be deduplicated with the latest deleted version (where `_peerdb_is_deleted` is set to 1).

In the above example with `owneruserid` and `id`, if the primary key does not already include `owneruserid`, you need to have a `UNIQUE INDEX` on (`owneruserid`, `id`) and set it as the `REPLICA IDENTITY` for the table. This ensures that Postgres CDC captures the necessary column values for accurate replication and deduplication.

Below is an example of how to do this on the posts table. Make sure to apply this to all tables with modified ordering keys.

```sql
-- Create a UNIQUE INDEX on (owneruserid, id)
CREATE UNIQUE INDEX posts_unique_owneruserid_idx ON posts(owneruserid, id);
-- Set REPLICA IDENTITY to use this index
ALTER TABLE posts REPLICA IDENTITY USING INDEX posts_unique_owneruserid_idx;
```
@@ -1,5 +1,5 @@
---
title: 'ClickPipes for Postgres: Schema Changes Propagation Support'
title: 'Schema Changes Propagation Support'
slug: /integrations/clickpipes/postgres/schema-changes
description: 'Page describing schema change types detectable by ClickPipes in the source tables'
---
Expand All @@ -10,4 +10,4 @@ ClickPipes for Postgres can detect schema changes in the source tables. It can p
| ----------------------------------------------------------------------------------- | ------------------------------------- |
| Adding a new column (`ALTER TABLE ADD COLUMN ...`) | Propagated automatically, all rows after the change will have all columns filled |
| Adding a new column with a default value (`ALTER TABLE ADD COLUMN ... DEFAULT ...`) | Propagated automatically, all rows after the change will have all columns filled but existing rows will not show the DEFAULT value without a full table refresh |
| Dropping an existing column (`ALTER TABLE DROP COLUMN ...`) | Detected, but not propagated. All rows after the change will have NULL for the dropped columns |
@@ -1,5 +1,5 @@
---
title: 'ClickPipes for Postgres: Handling TOAST Columns'
title: 'Handling TOAST Columns'
description: 'Learn how to handle TOAST columns when replicating data from PostgreSQL to ClickHouse.'
slug: /integrations/clickpipes/postgres/toast
---