[SUPPORT] MOR table behavior for Spark Bulk insert to COW #12133

Open
geserdugarov opened this issue Oct 21, 2024 · 16 comments
Labels
behavior-unexpected priority:critical production down; pipelines stalled; Need help asap. spark-sql writer-core Issues relating to core transactions/write actions

Comments

@geserdugarov
Contributor

geserdugarov commented Oct 21, 2024

I've already created an issue, HUDI-8394, but I want to highlight and discuss this problem here.
I suppose this is a critical issue with current master when using:

  • bulk insert operation,
  • hoodie.datasource.write.row.writer.enable = false,
  • simple bucket index.

Describe the problem you faced

When I bulk insert into a COW table, I see both parquet and log files in the file system, which is MOR table behavior.

I've checked that the table type is COW.

cat ./.hoodie/hoodie.properties 
# ...
# hoodie.table.type=COPY_ON_WRITE       <-- COW table
# ...

But the files do not match a COW table:

ll ./dt\=2021-01-05/
# total 456
# drwxr-xr-x 2 d00838679 d00838679   4096 Oct 19 15:33 ./
# drwxrwxr-x 4 d00838679 d00838679   4096 Oct 19 15:32 ../
# -rw-r--r-- 1 d00838679 d00838679 435346 Oct 19 15:32 00000001-4a79-47b3-918c-05f8b90e8b14-0_1-14-12_20241019083242289.parquet               <-- base file
# -rw-r--r-- 1 d00838679 d00838679   3412 Oct 19 15:32 .00000001-4a79-47b3-918c-05f8b90e8b14-0_1-14-12_20241019083242289.parquet.crc
# -rw-r--r-- 1 d00838679 d00838679    978 Oct 19 15:33 .00000001-4a79-47b3-918c-05f8b90e8b14-0_20241019083307134.log.1_0-30-31                <-- log file as for MOR table
# -rw-r--r-- 1 d00838679 d00838679     16 Oct 19 15:33 ..00000001-4a79-47b3-918c-05f8b90e8b14-0_20241019083307134.log.1_0-30-31.crc
# -rw-r--r-- 1 d00838679 d00838679     96 Oct 19 15:32 .hoodie_partition_metadata
# -rw-r--r-- 1 d00838679 d00838679     12 Oct 19 15:32 ..hoodie_partition_metadata.crc
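
To make this check repeatable, here is a minimal Python sketch that flags log files in a directory listing of a COW table. The two regular expressions are inferred only from the file names shown in this thread, so treat them as an assumption rather than Hudi's official naming parser; `classify` and `unexpected_log_files` are hypothetical helper names, not Hudi APIs.

```python
import re

# Patterns inferred from the listings in this thread (assumption, not Hudi's own parser):
#   base file: <fileId>_<writeToken>_<instantTime>.parquet
#   log file:  .<fileId>_<instantTime>.log.<version>_<writeToken>
BASE_FILE = re.compile(r"^[0-9a-f-]+_\d+-\d+-\d+_\d+\.parquet$")
LOG_FILE = re.compile(r"^\.[0-9a-f-]+_\d+\.log\.\d+_\d+-\d+-\d+$")


def classify(name):
    """Return 'base', 'log', or 'other' for a single file name."""
    if BASE_FILE.match(name):
        return "base"
    if LOG_FILE.match(name):
        return "log"
    return "other"


def unexpected_log_files(names, table_type):
    """List log files that should not exist when the table type is COPY_ON_WRITE."""
    if table_type != "COPY_ON_WRITE":
        return []
    return [n for n in names if classify(n) == "log"]


listing = [
    "00000001-4a79-47b3-918c-05f8b90e8b14-0_1-14-12_20241019083242289.parquet",
    ".00000001-4a79-47b3-918c-05f8b90e8b14-0_1-14-12_20241019083242289.parquet.crc",
    ".00000001-4a79-47b3-918c-05f8b90e8b14-0_20241019083307134.log.1_0-30-31",
    "..00000001-4a79-47b3-918c-05f8b90e8b14-0_20241019083307134.log.1_0-30-31.crc",
    ".hoodie_partition_metadata",
]
for bad in unexpected_log_files(listing, "COPY_ON_WRITE"):
    print("unexpected log file:", bad)
```

For the listing above, only the `.log.1_0-30-31` entry is reported; the `.crc` checksum files and the partition metadata file are classified as `other`.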

To Reproduce

To reproduce, the existing test "Test Bulk Insert Into Bucket Index Table" can be modified and used:

test("Test Bulk Insert Into Bucket Index Table") {
  withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert", "hoodie.bulkinsert.shuffle.parallelism" -> "1") {
    withTempDir { tmp =>
      val tableName = generateTableName
      // Create a partitioned table
      spark.sql(
        s"""
            |create table $tableName (
            |  id int,
            |  dt string,
            |  name string,
            |  price double,
            |  ts long
            |) using hudi
            | tblproperties (
            | primaryKey = 'id,name',
            | type = 'cow',
            | preCombineField = 'ts',
            | hoodie.index.type = 'BUCKET',
            | hoodie.index.bucket.engine = 'SIMPLE',
            | hoodie.bucket.index.num.buckets = '2',
            | hoodie.bucket.index.hash.field = 'id,name',
            | hoodie.datasource.write.row.writer.enable = 'false')
            | partitioned by (dt)
            | location '${tmp.getCanonicalPath}'
            """.stripMargin)
      spark.sql(
        s"""
            | insert into $tableName values
            | (5, 'a1,1', 10, 1000, "2021-01-05")
            """.stripMargin)
      spark.sql(
        s"""
            | insert into $tableName values
            | (9, 'a3,3', 30, 3000, "2021-01-05")
            """.stripMargin)
    }
  }
}

Expected behavior

For a COW table, only parquet files should be created.

Environment Description

  • Hudi version : current master

  • Spark version : 3.4

@geserdugarov
Contributor Author

Currently, for this case BucketIndexBulkInsertPartitioner is used:

public Option<WriteHandleFactory> getWriteHandleFactory(int idx) {
  return doAppend.get(idx) ? Option.of(new AppendHandleFactory()) :
      Option.of(new SingleFileHandleCreateFactory(FSUtils.createNewFileId(getFileIdPfx(idx), 0), this.preserveHoodieMetadata));
}

The first insert uses SingleFileHandleCreateFactory, but the second insert uses AppendHandleFactory and creates a log file.
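
The selection described above can be pictured with a small Python model. This is only a sketch under a stated assumption: that `doAppend` is true for a bucket when that bucket already has a file group from an earlier write. The thread does not show how `doAppend` is populated, and the function names below are hypothetical, not Hudi code.

```python
def choose_handle_factory(bucket_has_file_group):
    """Mirror the ternary in getWriteHandleFactory: append if the bucket exists, else create."""
    if bucket_has_file_group:
        return "AppendHandleFactory"          # writes a log file
    return "SingleFileHandleCreateFactory"    # writes a new parquet base file


def simulate_bulk_inserts(bucket_ids):
    """Replay a sequence of bulk inserts, each targeting one bucket id."""
    existing_buckets = set()   # assumption: tracks buckets that already hold a file group
    chosen = []
    for bucket_id in bucket_ids:
        chosen.append(choose_handle_factory(bucket_id in existing_buckets))
        existing_buckets.add(bucket_id)
    return chosen


# Two inserts that hash to the same bucket, as in the reproduction above.
print(simulate_bulk_inserts([0, 0]))
```

Under this assumption, the second write to the same bucket always takes the append path, regardless of the table type, which matches the log file observed in the listing.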

I don't understand how bulk insert into a COW table with a simple bucket index should work by design. When we insert data that should update previous data, should we create a new parquet file with the new data and call inline compaction (due to the COW table type)? Or should we merge and write the data to a new parquet file, in which case it is no longer a bulk insert?

@geserdugarov geserdugarov changed the title [SUPPORT] MOR table behavior for Spark bulk insert to COW [SUPPORT] MOR table behavior for Spark Bulk insert to COW Oct 21, 2024
@danny0405
Contributor

Bulk_insert should only be executed once, IMO. For a second update, you should use the upsert operation instead.

@ad1happy2go
Collaborator

@geserdugarov That's surprising. Ideally, we should never have log files, regardless of which operation you use.

@ad1happy2go ad1happy2go added writer-core Issues relating to core transactions/write actions spark-sql behavior-unexpected labels Oct 23, 2024
@github-project-automation github-project-automation bot moved this to ⏳ Awaiting Triage in Hudi Issue Support Oct 23, 2024
@ad1happy2go
Collaborator

@geserdugarov I tried exactly your code with current master, and it works as expected. I don't see any log files.
image

Can you please check once more? Thanks.

@ad1happy2go ad1happy2go moved this from ⏳ Awaiting Triage to 👤 User Action in Hudi Issue Support Oct 23, 2024
@geserdugarov
Contributor Author

geserdugarov commented Oct 23, 2024

@ad1happy2go, sorry, I wrote the wrong Spark version at first.
When I tried to build Hudi with -Dspark3.5, I faced:

An exception or error caused a run to abort: org.apache.spark.sql.catalyst.plans.logical.InsertIntoStatement.copy$default$7()Z 
java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.InsertIntoStatement.copy$default$7()Z
	at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.rebaseInsertIntoStatement(HoodieSpark3CatalystPlanUtils.scala:70)
	at org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.rebaseInsertIntoStatement$(HoodieSpark3CatalystPlanUtils.scala:69)
	at org.apache.spark.sql.HoodieSpark34CatalystPlanUtils$.rebaseInsertIntoStatement(HoodieSpark34CatalystPlanUtils.scala:33)
	at org.apache.spark.sql.hudi.analysis.HoodieAnalysis$AdaptIngestionTargetLogicalRelations$$anonfun$$nestedInanonfun$apply$1$1.applyOrElse(HoodieAnalysis.scala:314)
...

As you can see, HoodieSpark34CatalystPlanUtils is called in the stack trace. I didn't investigate the reason and switched to -Dspark3.4 to work around it.

I'm using Ubuntu 22.04 and run the modified test in IntelliJ IDEA. To prevent the data files from being removed, I placed a breakpoint right after the second insert and checked the filesystem in debug mode.

I rechecked the mentioned test on commit 66597e5 and still see:

/tmp/spark-bee1984c-74df-4067-8e14-09a61e02e0c8$ tree -a
.
├── dt=2021-01-05
│   ├── 00000001-9e90-410e-bdf3-4ea189ba93ac-0_1-14-12_20241023121839934.parquet
│   ├── .00000001-9e90-410e-bdf3-4ea189ba93ac-0_1-14-12_20241023121839934.parquet.crc
│   ├── .00000001-9e90-410e-bdf3-4ea189ba93ac-0_20241023121845855.log.1_0-30-31
│   ├── ..00000001-9e90-410e-bdf3-4ea189ba93ac-0_20241023121845855.log.1_0-30-31.crc
│   ├── .hoodie_partition_metadata
│   └── ..hoodie_partition_metadata.crc

@geserdugarov
Contributor Author

geserdugarov commented Oct 23, 2024

I've simplified the provided test to:

test("Test MOR as COW") {
  withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
    spark.sql(
      s"""
         |create table mor_as_cow (
         |  id int,
         |  dt int
         |) using hudi
         | tblproperties (
         | primaryKey = 'id',
         | type = 'cow',
         | preCombineField = 'dt',
         | hoodie.index.type = 'BUCKET',
         | hoodie.index.bucket.engine = 'SIMPLE',
         | hoodie.bucket.index.num.buckets = '2',
         | hoodie.datasource.write.row.writer.enable = 'false')
         | location '/tmp/mor_as_cow'
        """.stripMargin)
    spark.sql(s"insert into mor_as_cow values (5, 10)")
    spark.sql(s"insert into mor_as_cow values (9, 30)")
  }
}

As a result:

tree -a /tmp/mor_as_cow/
./mor_as_cow/
├── 00000000-5b7e-4294-a60d-686ea422d0cc-0_0-14-12_20241023123912660.parquet
├── .00000000-5b7e-4294-a60d-686ea422d0cc-0_0-14-12_20241023123912660.parquet.crc
├── .00000000-5b7e-4294-a60d-686ea422d0cc-0_20241023123917968.log.1_0-30-31
├── ..00000000-5b7e-4294-a60d-686ea422d0cc-0_20241023123917968.log.1_0-30-31.crc

@geserdugarov
Contributor Author

geserdugarov commented Oct 25, 2024

@ad1happy2go I've prepared a local Spark 3.5.3 cluster and reproduced this bug using PySpark. The script is available here:
https://github.com/geserdugarov/test-hudi-issues/blob/main/HUDI-8394/write-COW-get-MOR.py
To build the Hudi Spark bundle, I used commit a7512a2.

After

INSERT INTO cow_or_mor VALUES (5, 10);
INSERT INTO cow_or_mor VALUES (9, 30);

for

SELECT * FROM cow_or_mor;

I got:

('5', '', '00000000-dad4-4358-aaad-767a76e43e70-0_0-14-12_20241025153558259.parquet', 5, 10)

We see only one row and miss the second one with id=9, because it was written to a log file even though the table is declared as COW:

tree -a /tmp/write-COW-get-MOR
# .
# ├── 00000000-dad4-4358-aaad-767a76e43e70-0_0-14-12_20241025153558259.parquet
# ├── .00000000-dad4-4358-aaad-767a76e43e70-0_0-14-12_20241025153558259.parquet.crc
# ├── .00000000-dad4-4358-aaad-767a76e43e70-0_20241025153606721.log.1_0-30-28
# ├── ..00000000-dad4-4358-aaad-767a76e43e70-0_20241025153606721.log.1_0-30-28.crc
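
One way to picture why the second row disappears is the toy Python model below. It is a conceptual sketch, not Hudi's reader: `FileGroup`, `read_base_only`, and `read_merged` are hypothetical names, and real log merging also involves the precombine field and record payloads. The only behavior taken from this thread is that the query on the COW table returned the base-file row and not the row stored in the log file.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class FileGroup:
    # id -> dt for records stored in the parquet base file
    base_records: Dict[int, int] = field(default_factory=dict)
    # id -> dt for records stored in log files
    log_records: Dict[int, int] = field(default_factory=dict)


def read_base_only(fg: FileGroup) -> List[Tuple[int, int]]:
    """Model of the observed COW query: log files are not consulted."""
    return sorted(fg.base_records.items())


def read_merged(fg: FileGroup) -> List[Tuple[int, int]]:
    """Model of a read that overlays log records on top of the base file."""
    merged = {**fg.base_records, **fg.log_records}
    return sorted(merged.items())


# State after the two inserts in this thread: id=5 in the base file, id=9 in the log file.
fg = FileGroup(base_records={5: 10}, log_records={9: 30})
print(read_base_only(fg))  # [(5, 10)]
print(read_merged(fg))     # [(5, 10), (9, 30)]
```

The base-only read returns a single row, which corresponds to the query result above; the row with id=9 is only visible to a reader that merges log files.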

@ad1happy2go
Collaborator

@geserdugarov
Did you check hoodie.properties inside write-COW-get-MOR?
Did you get a chance to try with the latest master?

@ad1happy2go
Collaborator

@geserdugarov I tried your code with the current master but can't see any issue. Can you please try once more and confirm? Thanks a lot.

Code

create table mor_as_cow (
           id int,
           dt int
         ) using hudi
          tblproperties (
          primaryKey = 'id',
          type = 'cow',
          preCombineField = 'dt',
          hoodie.index.type = 'BUCKET',
          hoodie.index.bucket.engine = 'SIMPLE',
          hoodie.bucket.index.num.buckets = '2',
          hoodie.datasource.write.row.writer.enable = 'false',
          hoodie.datasource.write.operation = 'bulk_insert')
          location '/tmp/mot_as_cow';

insert into mor_as_cow values (5, 10);

insert into mor_as_cow values (9, 30);
image

@ad1happy2go ad1happy2go added the priority:critical production down; pipelines stalled; Need help asap. label Nov 6, 2024
@ad1happy2go
Collaborator

@geserdugarov I also tried building the code at the commit you mentioned, 66597e5, but still see the expected, correct behavior.

@geserdugarov
Contributor Author

geserdugarov commented Nov 6, 2024

@ad1happy2go, I will recheck on current master.
However, I see that you set
hoodie.datasource.write.operation = 'bulk_insert'
in the table properties. This config shouldn't be stored in table properties, since it relates only to the writer. You can set the property for the current writer with
SET hoodie.datasource.write.operation=bulk_insert

@geserdugarov
Contributor Author

geserdugarov commented Nov 6, 2024

The issue is reproduced on 3d81ea0:

tree -a
.
├── 00000000-6344-45e3-abbb-b78fd08cfc12-0_0-13-11_20241106134327326.parquet
├── .00000000-6344-45e3-abbb-b78fd08cfc12-0_0-13-11_20241106134327326.parquet.crc
├── .00000000-6344-45e3-abbb-b78fd08cfc12-0_20241106134336247.log.1_0-29-27
├── ..00000000-6344-45e3-abbb-b78fd08cfc12-0_20241106134336247.log.1_0-29-27.crc
├── .hoodie

Hoodie table properties:

cat ./.hoodie/hoodie.properties
#Updated at 2024-11-06T06:43:32.140Z
#Wed Nov 06 13:43:32 NOVT 2024
hoodie.table.precombine.field=dt
hoodie.table.version=8
hoodie.database.name=default
hoodie.table.initial.version=8
hoodie.datasource.write.hive_style_partitioning=true
hoodie.table.metadata.partitions.inflight=
hoodie.table.checksum=2880991016
hoodie.table.keygenerator.type=NON_PARTITION
hoodie.table.create.schema={"type"\:"record","name"\:"cow_or_mor_record","namespace"\:"hoodie.cow_or_mor","fields"\:[{"name"\:"id","type"\:["int","null"]},{"name"\:"dt","type"\:["int","null"]}]}
hoodie.archivelog.folder=archived
hoodie.table.name=cow_or_mor
hoodie.record.merge.strategy.id=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
hoodie.compaction.payload.class=org.apache.hudi.common.model.DefaultHoodieRecordPayload
hoodie.table.type=COPY_ON_WRITE                                              <-- COW
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=files
hoodie.timeline.layout.version=1
hoodie.record.merge.mode=EVENT_TIME_ORDERING
hoodie.table.recordkey.fields=id
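
To script the table type check, a minimal Python sketch follows. It assumes hoodie.properties is a plain Java-style properties file with one key=value pair per line; `parse_properties` and `table_type` are hypothetical helpers written for this thread, not part of Hudi, and the sample is a shortened copy of the properties above.

```python
from typing import Dict


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple Java-style properties, skipping blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first '=' only, so values may themselves contain '='.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def table_type(text: str) -> str:
    """Return hoodie.table.type, or 'UNKNOWN' when the key is absent."""
    return parse_properties(text).get("hoodie.table.type", "UNKNOWN")


sample = """\
#Updated at 2024-11-06T06:43:32.140Z
hoodie.table.name=cow_or_mor
hoodie.table.type=COPY_ON_WRITE
hoodie.table.recordkey.fields=id
"""
print(table_type(sample))  # COPY_ON_WRITE
```

In practice the text would come from reading `.hoodie/hoodie.properties` under the table location.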

Code:

spark.sql("CREATE TABLE cow_or_mor ("
          "  id int,"
          "  dt int"
          ") USING HUDI "
          "TBLPROPERTIES ("
          "  'primaryKey' = 'id',"
          "  'type' = 'cow',"
          "  'preCombineField' = 'dt',"
          "  'hoodie.index.type' = 'BUCKET',"
          "  'hoodie.index.bucket.engine' = 'SIMPLE',"
          "  'hoodie.bucket.index.num.buckets' = '2',"
          "  'hoodie.datasource.write.row.writer.enable' = 'false'"
          ") LOCATION '" + tmp_dir_path + "';")

spark.sql("SET hoodie.datasource.write.operation=bulk_insert")
spark.sql("INSERT INTO cow_or_mor VALUES (5, 10);")
spark.sql("INSERT INTO cow_or_mor VALUES (9, 30);")

@ad1happy2go
Collaborator

ad1happy2go commented Nov 6, 2024

@geserdugarov Thanks a lot. That's a good catch. I am able to reproduce this issue with the above code. We will work on this.

@ad1happy2go
Collaborator

@geserdugarov I have also confirmed the same behaviour with Hudi 0.15.X and Spark master. It only happens with the bucket index, so I am wondering whether this is expected.
@danny0405 Are you aware of this by any chance?

@danny0405
Contributor

bulk_insert is designed to be executed for bootstrap purposes, because the whole pipeline just ignores any updates and does a one-shot write of parquet files. If you do another bulk_insert on top of the first one, it will be seen as an update and trigger a log file write.

@geserdugarov
Contributor Author

> Bulk_insert should only be executed once, IMO. For a second update, you should use the upsert operation instead.

Following this logic, I propose a fix in #12245. We need to wait for the CI results to be sure that it doesn't break other specific cases.

Projects
Status: 👤 User Action
Development

No branches or pull requests

3 participants