- 
                Notifications
    You must be signed in to change notification settings 
- Fork 19
feat: add support for add_column with backfill #91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| ACTION NEEDED The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification. For details on the error please inspect the "PR Title Check" action. | 
        
          
                ...base_2.12/src/main/java/com/lancedb/lance/spark/extention/ExtendedDataSourceV2Strategy.scala
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | NamedReference segmentId = Expressions.column(LanceConstant.ROW_ADDRESS); | ||
| SortValue sortValue = | ||
| new SortValue(segmentId, SortDirection.ASCENDING, NullOrdering.NULLS_FIRST); | ||
| return new SortValue[] {sortValue}; | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should both of two keys, _frag_id and _rowaddr, be sorted here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The physical plan is:
== Physical Plan ==
CommandResult <empty>
   +- AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$1782/1752894940@6f7b8ae1, com.lancedb.lance.spark.write.AddColumnsWrite@6c8d8b60
      +- AdaptiveSparkPlan isFinalPlan=true
         +- == Final Plan ==
            *(2) Sort [_rowaddr#11L ASC NULLS FIRST], false, 0
            +- AQEShuffleRead coalesced
               +- ShuffleQueryStage 0
                  +- Exchange hashpartitioning(_fragid#12, 200), REPARTITION_BY_COL, [plan_id=84]
                     +- *(1) Project [_rowaddr#11L, _fragid#12, id#8, (id#8 * 100) AS new_col#39]
                        +- *(1) ColumnarToRow
                           +- BatchScan add_column_table[id#8, _rowaddr#11L, _fragid#12] class com.lancedb.lance.spark.read.LanceScan RuntimeFilters: []
         +- == Initial Plan ==
            Sort [_rowaddr#11L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(_fragid#12, 200), REPARTITION_BY_COL, [plan_id=68]
               +- Project [_rowaddr#11L, _fragid#12, id#8, (id#8 * 100) AS new_col#39]
                  +- BatchScan add_column_table[id#8, _rowaddr#11L, _fragid#12] class com.lancedb.lance.spark.read.LanceScan RuntimeFilters: []The sort operator is after distribution. So I think we can only sort data by _rowaddr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's say the data is first repartitioned by _fragid, which guarantees that all rows from the same fragment end up in the same partition.
But if the sort doesn’t include _fragid, rows inside that partition may get interleaved? E.g., (1,0), (2,0), (1,1), (2,2)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But if the sort doesn’t include _fragid, rows inside that partition may get interleaved? E.g., (1,0), (2,0), (1,1), (2,2)
_rowaddr is u64 and is composed by:
 frag_id (32) + row_index(32)So I think rows will not get interleaved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The physical plan is:
== Physical Plan == CommandResult <empty> +- AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$1782/1752894940@6f7b8ae1, com.lancedb.lance.spark.write.AddColumnsWrite@6c8d8b60 +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) Sort [_rowaddr#11L ASC NULLS FIRST], false, 0 +- AQEShuffleRead coalesced +- ShuffleQueryStage 0 +- Exchange hashpartitioning(_fragid#12, 200), REPARTITION_BY_COL, [plan_id=84] +- *(1) Project [_rowaddr#11L, _fragid#12, id#8, (id#8 * 100) AS new_col#39] +- *(1) ColumnarToRow +- BatchScan add_column_table[id#8, _rowaddr#11L, _fragid#12] class com.lancedb.lance.spark.read.LanceScan RuntimeFilters: [] +- == Initial Plan == Sort [_rowaddr#11L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(_fragid#12, 200), REPARTITION_BY_COL, [plan_id=68] +- Project [_rowaddr#11L, _fragid#12, id#8, (id#8 * 100) AS new_col#39] +- BatchScan add_column_table[id#8, _rowaddr#11L, _fragid#12] class com.lancedb.lance.spark.read.LanceScan RuntimeFilters: []The sort operator is after distribution. So I think we can only sort data by _rowaddr.
If the view has 3 new columns, namely new_col1, new_col2, new_col3, and we only want to do backfill for new_col1 and new_col2, will new_col3 be excluded from shuffle stage?
| Dataset<Row> df2 = result.withColumn("new_col", functions.expr("id * 100")); | ||
|  | ||
| // Write back with backfill option | ||
| df2.write() | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a bit hesitated about this user experience... we are clearly inventing something new here which is great, but overloading the overwrite mode feels wrong to me. So far the "norm" is that we can invent something in SQL, and then eventually dataframe operations will catch up. For example there was MERGE INTO SQL, and in Spark 4.0 now there is merge DataFrame operations. So to me having a SQL extensions feels "official" compared to using write overwrite that feels much more like a hack.
df2.createOrReplaceTempView("backfill_data")
spark.sql("ALTER TABLE ADD COLUMNS col1, col2 AS SELECT * FROM backfill_data")I guess that brings the complexity of needing to add the SQL extensions and related parsers, but we probably need that anyway for features like compaction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another thing is that compared to this, there is technically also the add column using function experience, which would be harder to express in DataFrame, but we can invent SQL like:
spark.sql("ALTER TABLE ADD COLUMNS col1 AS my_udf(col2)")There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, that custom DSL/SQL will deliver a better user experience. For now, changing the semantics of overwrite/mergeinto seems okay to me.
For the UDF scenario, if the computed column only depends on the current dataset (and is 1:1 row-aligned), the UDF approach is more intuitive. It's worth noting that there are also cases where we need to combine external data to compute new columns — e.g., join with external tables / lookup services -- where the DataFrame style offers greater expressiveness.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I greatly appreciate your suggestion. I also think using SQL extensions feels more official.
| } | ||
|  | ||
| @Override | ||
| public SortOrder[] requiredOrdering() { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't the row address naturally be ascending within a fragment? I don't think we need to force ordering it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't the row address naturally be ascending within a fragment? I don't think we need to force ordering it?
The data in the original table may be processed in a distributed manner, which can change the order of the records. So it is necessary to force order it by _rowaddr.
555bd7f    to
    33a412c      
    Compare
  
    f2d0827    to
    fb84266      
    Compare
  
    | @jackye1995 @qidian99 This PR is ready. Could you please review it? Thank you. | 
9553e73    to
    d65a315      
    Compare
  
    | .withColumn("new_col1", functions.expr("id * 100")) | ||
| .withColumn("new_col2", functions.expr("id * 2")); | ||
|  | ||
| df2.createOrReplaceTempView("tmp_view"); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we create the temporary view using pure Spark SQL syntax—e.g., CREATE VIEW [view_id] AS [query]—to verify that users can leverage this functionality entirely through Spark SQL?
Additionally, it would be helpful to add another test case where some of the columns targeted for backfill already exist, to ensure the error handling behaves as expected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also maybe another case where the source view/table is not aligned with lance dataset. E.g., 10 records in lance but 9 records in the view, and expect error to be thrown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@qidian99 Thanks for your suggestions.
- I add test case for pure Spark SQL and added columns already exist.
- If the newly added record does not match the original dataset, two scenarios must be considered:
- a) If the new column is nullable (e.g., String), the missing rows will contain null.
- b) If the new column is non-nullable (e.g., Int32), an error will be raised.
| NamedReference segmentId = Expressions.column(LanceConstant.ROW_ADDRESS); | ||
| SortValue sortValue = | ||
| new SortValue(segmentId, SortDirection.ASCENDING, NullOrdering.NULLS_FIRST); | ||
| return new SortValue[] {sortValue}; | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The physical plan is:
== Physical Plan == CommandResult <empty> +- AppendData org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$1782/1752894940@6f7b8ae1, com.lancedb.lance.spark.write.AddColumnsWrite@6c8d8b60 +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(2) Sort [_rowaddr#11L ASC NULLS FIRST], false, 0 +- AQEShuffleRead coalesced +- ShuffleQueryStage 0 +- Exchange hashpartitioning(_fragid#12, 200), REPARTITION_BY_COL, [plan_id=84] +- *(1) Project [_rowaddr#11L, _fragid#12, id#8, (id#8 * 100) AS new_col#39] +- *(1) ColumnarToRow +- BatchScan add_column_table[id#8, _rowaddr#11L, _fragid#12] class com.lancedb.lance.spark.read.LanceScan RuntimeFilters: [] +- == Initial Plan == Sort [_rowaddr#11L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(_fragid#12, 200), REPARTITION_BY_COL, [plan_id=68] +- Project [_rowaddr#11L, _fragid#12, id#8, (id#8 * 100) AS new_col#39] +- BatchScan add_column_table[id#8, _rowaddr#11L, _fragid#12] class com.lancedb.lance.spark.read.LanceScan RuntimeFilters: []The sort operator is after distribution. So I think we can only sort data by _rowaddr.
If the view has 3 new columns, namely new_col1, new_col2, new_col3, and we only want to do backfill for new_col1 and new_col2, will new_col3 be excluded from shuffle stage?
| import org.apache.spark.sql.connector.catalog._ | ||
| import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} | ||
|  | ||
| case class ExtendedDataSourceV2Strategy(session: SparkSession) extends SparkStrategy | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider renaming to LanceDataSourceV2Strategy, or LanceSparkStrategy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the view has 3 new columns, namely new_col1, new_col2, new_col3, and we only want to do backfill for new_col1 and new_col2, will new_col3 be excluded from shuffle stage?
I made an optimization to AddColumnsBackfillExec: if some columns dont not need to be added, a new Project is introduced so that only the columns being added are shuffled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider renaming to LanceDataSourceV2Strategy, or LanceSparkStrategy?
Good suggestion. I renamed it to LanceDataSourceV2Strategy
31d8010    to
    97cd432      
    Compare
  
    97cd432    to
    b089dde      
    Compare
  
    | Looks like we are making a lot of progress, let me know whenever this is ready for another pass! | 
| 
 @jackye1995 I have test this feature in a spark cluster. I think this PR is ready. Could you please review it again? Thanks a lot. | 
Related issue: #32
To add column with backfill data, some config should be set.