Duplicated rows in Parquet files #163

Closed
2 tasks done
tomtaylor opened this issue Jan 15, 2024 · 24 comments · Fixed by #207
Comments

@tomtaylor

tomtaylor commented Jan 15, 2024

What happens?

We run a nightly process to dump some Postgres tables into Parquet files. Sometimes we see a handful of rows duplicated in the output. In last night's example, we saw 33 rows out of 5,396,400 with a duplicate copy. This might be unavoidable with PER_THREAD_OUTPUT enabled against a moving data set, but it might be worth documenting this.

Does it also mean some rows might be missing?

To Reproduce

I think this might be very difficult to reproduce, but our script looks something like this:

FORCE INSTALL postgres_scanner FROM 'http://nightly-extensions.duckdb.org'; 
INSTALL httpfs; 
LOAD httpfs;
SET s3_endpoint='storage.googleapis.com'; 
SET s3_access_key_id='id';
SET s3_secret_access_key='key';
ATTACH 'dbname=foo' AS pg (TYPE postgres);
USE pg;
BEGIN;
COPY (SELECT * FROM pg.table1) TO 's3://bucket/table1.parquet' (FORMAT 'parquet', CODEC 'ZSTD', PER_THREAD_OUTPUT true);
COPY (SELECT * FROM pg.table2) TO 's3://bucket/table2.parquet' (FORMAT 'parquet', CODEC 'ZSTD', PER_THREAD_OUTPUT true);
COMMIT;

Then:

SELECT id, filename, ROW_NUMBER() OVER (PARTITION BY id) FROM read_parquet('data_*.parquet', filename = true) QUALIFY ROW_NUMBER() OVER (PARTITION BY id) > 1;
┌──────────┬────────────────┬─────────────────────────────────────┐
│    id    │    filename    │ row_number() OVER (PARTITION BY id) │
│  int64   │    varchar     │                int64                │
├──────────┼────────────────┼─────────────────────────────────────┤
│ 60449480 │ data_4.parquet │                                   2 │
│ 60725890 │ data_4.parquet │                                   2 │
│ 61009724 │ data_4.parquet │                                   2 │
│ 60844642 │ data_0.parquet │                                   2 │
│ 53617707 │ data_4.parquet │                                   2 │
│ 60574594 │ data_4.parquet │                                   2 │
│ 56486342 │ data_4.parquet │                                   2 │
│ 60034575 │ data_4.parquet │                                   2 │
│ 60574565 │ data_0.parquet │                                   2 │
│ 60698777 │ data_3.parquet │                                   2 │
│ 61080027 │ data_4.parquet │                                   2 │
│ 60261247 │ data_4.parquet │                                   2 │
│ 61079630 │ data_0.parquet │                                   2 │
│ 60386713 │ data_4.parquet │                                   2 │
│ 60008204 │ data_2.parquet │                                   2 │
│ 60261152 │ data_4.parquet │                                   2 │
│ 60983239 │ data_4.parquet │                                   2 │
│ 61092457 │ data_3.parquet │                                   2 │
│ 60856837 │ data_1.parquet │                                   2 │
│ 59246489 │ data_0.parquet │                                   2 │
│ 60224537 │ data_3.parquet │                                   2 │
│ 60569503 │ data_4.parquet │                                   2 │
│ 60905359 │ data_0.parquet │                                   2 │
│ 60859433 │ data_4.parquet │                                   2 │
│ 60255325 │ data_4.parquet │                                   2 │
│ 60341075 │ data_0.parquet │                                   2 │
│ 60968139 │ data_0.parquet │                                   2 │
│ 60574631 │ data_0.parquet │                                   2 │
│ 60560326 │ data_2.parquet │                                   2 │
│ 60927674 │ data_3.parquet │                                   2 │
│ 61092552 │ data_4.parquet │                                   2 │
│ 60574652 │ data_0.parquet │                                   2 │
│ 61051974 │ data_1.parquet │                                   2 │
├──────────┴────────────────┴─────────────────────────────────────┤
│ 33 rows                                               3 columns │
└─────────────────────────────────────────────────────────────────┘

OS:

Linux

PostgreSQL Version:

14.7

DuckDB Version:

0.9.2

DuckDB Client:

CLI

Full Name:

Tom Taylor

Affiliation:

Breakroom

Have you tried this on the latest main branch?

  • I agree

Have you tried the steps to reproduce? Do they include all relevant data and configuration? Does the issue you report still appear there?

  • I agree
@Mytherin
Contributor

Thanks for the report!

In principle all Postgres threads should run in the same transaction snapshot as explained here. However, not all Postgres servers support this and this feature is disabled for e.g. AWS Aurora. If you were to repeat the run with SET pg_debug_show_queries=true; you should see statements like SET TRANSACTION SNAPSHOT '00000004-0000B303-1'. If those are missing, you might be running on an unsupported version for transaction snapshots which could explain the duplicate rows. You could run the query SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver) which is the one DuckDB runs to obtain snapshot information - if that doesn't work then this might not be supported.

As a work-around you can then disable multi-threading (using e.g. SET threads=1 or SET pg_connection_limit=1).
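For reference, a minimal sketch of the suggested checks and the workaround, reusing the pg alias and dbname from the script above:

-- Make the Postgres scanner print the queries it sends to Postgres,
-- so any SET TRANSACTION SNAPSHOT '...' statements become visible.
SET pg_debug_show_queries = true;
ATTACH 'dbname=foo' AS pg (TYPE postgres);

-- The capability check DuckDB itself runs on the Postgres side:
--   SELECT pg_is_in_recovery(), pg_export_snapshot(),
--          (SELECT count(*) FROM pg_stat_wal_receiver);

-- Workaround: restrict scanning to a single thread / single Postgres connection.
SET threads = 1;
SET pg_connection_limit = 1;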

@tomtaylor
Author

Thanks - we're running on regular Postgres (hosted by Crunchybridge) and I can see those statements executing when I turn on the debug option.

SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver);
 pg_is_in_recovery | pg_export_snapshot  | count 
-------------------+---------------------+-------
 f                 | 00000058-01D28CC8-1 |     0
(1 row)

This job runs nightly and possibly overlaps with a full logical backup. Is there anything that could interact there?

@tomtaylor
Author

tomtaylor commented Jan 17, 2024

Interestingly, I just took last night's dump, which was a single-threaded operation that output a single Parquet file for each table, and found the same problem:

SELECT id, ROW_NUMBER() OVER (PARTITION BY id) FROM read_parquet('table.parquet') QUALIFY ROW_NUMBER() OVER (PARTITION BY id) > 1;
┌──────────┬─────────────────────────────────────┐
│    id    │ row_number() OVER (PARTITION BY id) │
│  int64   │                int64                │
├──────────┼─────────────────────────────────────┤
│ 61122606 │                                   2 │
│ 61150142 │                                   2 │
│ 61009086 │                                   2 │
│ 58830824 │                                   2 │
│ 60938241 │                                   2 │
│ 60944502 │                                   2 │
│ 61145921 │                                   2 │
│ 61181739 │                                   2 │
│ 60944526 │                                   2 │
│ 61131057 │                                   2 │
│ 60944530 │                                   2 │
│ 59597315 │                                   2 │
│ 61135999 │                                   2 │
│ 61150647 │                                   2 │
│ 61150686 │                                   2 │
│ 60944567 │                                   2 │
│ 61157702 │                                   2 │
│ 61181778 │                                   2 │
│ 60922176 │                                   2 │
│ 60944457 │                                   2 │
│ 60921690 │                                   2 │
│ 61129287 │                                   2 │
│ 61150691 │                                   2 │
│ 60370107 │                                   2 │
│ 61132247 │                                   2 │
│ 60366945 │                                   2 │
│ 60938573 │                                   2 │
│ 60944543 │                                   2 │
│ 59268777 │                                   2 │
│ 60938539 │                                   2 │
│ 60938545 │                                   2 │
│ 60923318 │                                   2 │
│ 60938808 │                                   2 │
│ 61130376 │                                   2 │
│ 60938569 │                                   2 │
│ 61181722 │                                   2 │
│ 61161559 │                                   2 │
│ 60944418 │                                   2 │
├──────────┴─────────────────────────────────────┤
│ 38 rows                              2 columns │
└────────────────────────────────────────────────┘

Performing a similar query against production produces zero results.

SELECT
  id,
  row_number
FROM (
  SELECT
    id,
    row_number() OVER (PARTITION BY id) AS row_number
  FROM
    table) t
WHERE
  row_number > 1

@Mytherin
Contributor

Mytherin commented Jan 17, 2024

That's interesting. How many rows are in the table, and what kind of operations are running over the system in parallel (only insertions, or also updates/deletes)?

For sanity - could you perhaps try writing to a DuckDB table and checking if that produces the same problem, to ensure this is not triggering an issue in the Parquet writer?

@tomtaylor
Author

Sure thing - what's the best way to execute that to achieve the same effect? I know how to do

FORCE INSTALL postgres_scanner FROM 'http://nightly-extensions.duckdb.org'; 
ATTACH 'dbname=database' AS pg (TYPE postgres, READ_ONLY);
CREATE TABLE table AS SELECT * FROM pg.table;

But I'm not sure if that tests the same path?

@Mytherin
Contributor

Yes, that tests the same path from the Postgres reader's point of view, so that would be great. Perhaps try running with a single thread again to isolate as much as possible?
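For reference, a sketch of that single-threaded test built from the commands above (local_copy is a placeholder table name; dbname=database and pg.table are the placeholders already used in this thread):

SET threads = 1;
SET pg_connection_limit = 1;
ATTACH 'dbname=database' AS pg (TYPE postgres, READ_ONLY);
-- Materialize into a native DuckDB table instead of writing Parquet,
-- then look for duplicated ids.
CREATE TABLE local_copy AS SELECT * FROM pg.table;
SELECT id, count(*) AS copies FROM local_copy GROUP BY id HAVING count(*) > 1;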

@tomtaylor
Author

tomtaylor commented Jan 18, 2024

I just ran that end to end twice into a DuckDB database and dumped 5,037,494 rows, followed by 5,044,745 rows, with no duplicates on either run. To confirm, I downloaded last night's single-threaded (by setting the PG connections to 1) Parquet file, and it contains 2 duplicate rows.

The data is moving constantly with inserts/deletes/updates. I estimate 20% of rows change on a daily basis. But it sounds like the Parquet writer might be at fault here?

@Mytherin
Contributor

That is possible. It could also be some interaction between the Postgres scanner and the Parquet writer. Could you try dumping the contents of the DuckDB file to Parquet and seeing if there are still no duplicate rows?
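For reference, a minimal sketch of that isolation step, assuming the data already sits in a DuckDB table (local_copy is a placeholder name):

-- Export the already-materialized DuckDB table to Parquet,
-- bypassing the Postgres scanner entirely.
COPY local_copy TO 'local_copy.parquet' (FORMAT 'parquet', CODEC 'ZSTD');

-- Re-check the exported file for duplicated ids.
SELECT id, count(*) AS copies
FROM read_parquet('local_copy.parquet')
GROUP BY id HAVING count(*) > 1;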

@tomtaylor
Author

I had to SET preserve_insertion_order = false to prevent out of memory issues, but there are no duplicates in the exported Parquet file.

@tomtaylor
Author

@Mytherin any more thoughts on this? 🙏

@Mytherin
Contributor

Mytherin commented Feb 1, 2024

Could you try SET preserve_insertion_order = false when doing the copy to Parquet from Postgres directly and check if there are still duplicates? SET preserve_insertion_order = false writes data in a substantially different manner, so it's possible that there's a problem in the insertion-order-preserving Parquet write path and that disabling it circumvented the bug.
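For reference, a sketch of that variant applied to the original export script (bucket and table names are the placeholders from the issue description):

-- Disable insertion-order preservation before copying straight from Postgres.
SET preserve_insertion_order = false;
COPY (SELECT * FROM pg.table1) TO 's3://bucket/table1.parquet'
  (FORMAT 'parquet', CODEC 'ZSTD', PER_THREAD_OUTPUT true);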

@tomtaylor
Author

These have been running with SET preserve_insertion_order = false for quite a while now, as we needed to do this to prevent hitting memory limits on the instance we run this job on.

@tomtaylor
Author

I don't know if it's useful, but I'd be happy to send you one of the sample Parquet files privately.

@tomtaylor
Author

Unfortunately this is still present in DuckDB 0.10.0.

@Mytherin
Contributor

Could you send over one of the broken Parquet files ([email protected])?

@Mytherin
Contributor

Another thing I can think of - could you perhaps export the result of postgres_query('SELECT * FROM tbl') to Parquet instead of querying the table using the Postgres scanner? There might be something funky going on with the ctids we are using (e.g. we might not be grabbing the correct locks to prevent rows from being moved around, perhaps causing duplicate rows to appear in that manner).
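For reference, a minimal sketch of that alternative, reusing the pg alias and a table name from the original script; postgres_query's exact signature may differ between extension versions, and here it is assumed to take the attached database name plus the query:

-- Push the whole query down to Postgres instead of scanning by ctid ranges.
COPY (SELECT * FROM postgres_query('pg', 'SELECT * FROM table1'))
  TO 'table1.parquet' (FORMAT 'parquet', CODEC 'ZSTD');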

@noppaz

noppaz commented Apr 8, 2024

I have this issue as well. From what I can tell it is only an issue with rows that were updated during the COPY process. I debugged the queries that are executed on Postgres from DuckDB and found this:

When multiple connections are used, the first connection will start a transaction:

  1. BEGIN TRANSACTION READ ONLY (defaults to READ COMMITTED)
  2. SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver)
  3. Start COPYing tables with ctid predicates

All other connections use:

  1. BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY
  2. SET TRANSACTION SNAPSHOT 'some-snapshot-id'
  3. Start COPYing tables with ctid predicates

A read committed transaction can see updates from other transactions, so I think this is what is causing the issue. Furthermore, this will be a problem even if we limit threads, and also if we limit the number of Postgres connections with SET pg_connection_limit=1, since this will then only use the first read committed transaction. If all the connections were using repeatable read isolation, I assume this wouldn't be a problem anymore, and multiple connections would use the same transaction snapshot.
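Restated as the Postgres-side statement sequences (the snapshot id is copied from the earlier psql output purely as an illustration, and the COPY steps over ctid ranges are elided):

-- First connection (READ COMMITTED, so it can observe rows changed by
-- concurrent transactions between chunk reads):
BEGIN TRANSACTION READ ONLY;
SELECT pg_is_in_recovery(), pg_export_snapshot(), (SELECT count(*) FROM pg_stat_wal_receiver);
-- ...COPY of ctid ranges follows...

-- All other connections (pinned to the exported snapshot):
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY;
SET TRANSACTION SNAPSHOT '00000058-01D28CC8-1';
-- ...COPY of ctid ranges follows...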

@Mytherin
Contributor

Mytherin commented Apr 8, 2024

That's interesting - thanks for investigating! Considering the problem also occurred in single-threaded operation, the isolation level could well be the culprit. I've pushed a PR that switches the main transaction to use repeatable read here - #207. Perhaps you could give that a shot and see if it resolves the issue?

@noppaz

noppaz commented Apr 8, 2024

Nice @Mytherin, it's in my interest to get a potential fix out asap, so I'm happy to test it out. What's the easiest way to test your PR? Or did you mean waiting for the nightly build?

@Mytherin
Contributor

Mytherin commented Apr 8, 2024

You should be able to fetch the artifacts from the PR itself - see https://github.com/duckdb/postgres_scanner/actions/runs/8601970047?pr=207 (scroll down to "Artifacts").

@noppaz

noppaz commented Apr 8, 2024

Thanks a lot @Mytherin, I was able to load the unsigned extension and try this out.

I ran a single connection without any threading to maximize the likelihood of changes being applied during the different COPY statements with ctid ranges, and concurrently ran a script to update a bunch of rows in Postgres. I was able to verify that with the current extension in 0.10.1 duplicates from the different transactions are created, but with the new extension from the PR this is not the case.

Let me know if you need anything else here, I'd really like to help push this out asap.
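For anyone wanting to reproduce this, a sketch of the kind of Postgres-side update load described above (table and column names are hypothetical):

-- Run repeatedly in a separate Postgres session while the DuckDB COPY is
-- in progress; each UPDATE creates new row versions with new ctids,
-- which is what exposes the duplicates under READ COMMITTED.
UPDATE table1
SET updated_at = now()
WHERE id IN (SELECT id FROM table1 ORDER BY random() LIMIT 1000);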

@tomtaylor
Author

Thanks so much for spotting the underlying issue here @noppaz!

@Mytherin
Contributor

Mytherin commented Apr 8, 2024

Thanks - I’ve merged the PR. I can push out a new version of the extension tomorrow after the nightly builds.

@Mytherin
Contributor

Mytherin commented Apr 9, 2024

I've published the nightly - FORCE INSTALL postgres_scanner should get you the new version in v0.10.1 (potentially pending some Cloudflare caches being cleared).
