fix: Use binary(16) for UUID type to ensure Spark compatibility #2881
base: main
Conversation
PyArrow's pa.uuid() type creates Python Arrow metadata that differs from Java Arrow's UUID metadata, causing incompatibility with Spark. Python and Rust Arrow implementations don't recognize Java's UUID metadata.
Changes:
- Change the UUIDType Arrow schema conversion from pa.uuid() to pa.binary(16)
- Add an integration test verifying the UUID round-trip between PyIceberg and Spark (a sketch of what such a test could look like is shown below)
- Update existing tests to expect pa.binary(16) instead of pa.uuid()
- Fix test_uuid_partitioning to properly convert bytes to UUID strings
- Bump the Iceberg version to 1.10.1, which includes the Java-side UUID fix
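For illustration, a minimal sketch of what such a round-trip test could look like; the fixture names (session_catalog, spark) and the table identifier are assumptions, not the actual test added by this PR:

import uuid

import pyarrow as pa

from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, UUIDType


def test_uuid_roundtrip_spark(session_catalog, spark) -> None:
    # Hypothetical table identifier; the real test may use a different one.
    identifier = "default.test_uuid_roundtrip"
    schema = Schema(NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False))
    table = session_catalog.create_table(identifier, schema=schema)

    # With this change, UUID columns are written as 16-byte binary values.
    values = [uuid.uuid4().bytes for _ in range(4)]
    table.append(pa.table({"uuid_col": pa.array(values, type=pa.binary(16))}))

    # Spark should be able to read the same rows back.
    assert spark.table(identifier).count() == 4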
This seems to be a revert of #2007
I think this is a new issue with Spark, Iceberg, and UUID. The previous fix (apache/iceberg#13324) and its Spark 4 backport (apache/iceberg#13573) are already included in 1.10.0.
I've included the stacktrace from Spark for debugging.
EDIT:
The Java-side UUID fix in 1.10.1 is actually apache/iceberg#14027
I had to run make test-integration-rebuild to update the Docker image cache.
The new stacktrace is:
> result = table.scan(row_filter=EqualTo("uuid_col", uuid.UUID("00000000-0000-0000-0000-000000000000").bytes)).to_arrow()
tests/integration/test_writes/test_writes.py:2588:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pyiceberg/table/__init__.py:2027: in to_arrow
).to_table(self.plan_files())
pyiceberg/io/pyarrow.py:1730: in to_table
first_batch = next(batches)
pyiceberg/io/pyarrow.py:1781: in to_record_batches
for batches in executor.map(batches_for_task, tasks):
../../.pyenv/versions/3.12.11/lib/python3.12/concurrent/futures/_base.py:619: in result_iterator
yield _result_or_cancel(fs.pop())
../../.pyenv/versions/3.12.11/lib/python3.12/concurrent/futures/_base.py:317: in _result_or_cancel
return fut.result(timeout)
../../.pyenv/versions/3.12.11/lib/python3.12/concurrent/futures/_base.py:456: in result
return self.__get_result()
../../.pyenv/versions/3.12.11/lib/python3.12/concurrent/futures/_base.py:401: in __get_result
raise self._exception
../../.pyenv/versions/3.12.11/lib/python3.12/concurrent/futures/thread.py:59: in run
result = self.fn(*self.args, **self.kwargs)
pyiceberg/io/pyarrow.py:1778: in batches_for_task
return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
pyiceberg/io/pyarrow.py:1818: in _record_batches_from_scan_tasks_and_deletes
for batch in batches:
pyiceberg/io/pyarrow.py:1600: in _task_to_record_batches
fragment_scanner = ds.Scanner.from_fragment(
pyarrow/_dataset.pyx:3792: in pyarrow._dataset.Scanner.from_fragment
???
pyarrow/_dataset.pyx:3547: in pyarrow._dataset._populate_builder
???
pyarrow/_compute.pyx:2884: in pyarrow._compute._bind
???
pyarrow/error.pxi:155: in pyarrow.lib.pyarrow_internal_check_status
???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> ???
E pyarrow.lib.ArrowNotImplementedError: Function 'equal' has no kernel matching input types (extension<arrow.uuid>, extension<arrow.uuid>)
pyarrow/error.pxi:92: ArrowNotImplementedError
def visit_uuid(self, _: UUIDType) -> pa.DataType:
-    return pa.uuid()
+    return pa.binary(16)
Not sure if this is the right fix.
We explicitly changed binary(16) to uuid in this PR:
https://github.com/apache/iceberg-python/pull/2007/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR687
The current change reverts it
I believe that my other comment clarifies this change.
Which comment? I would prefer to keep it UUID and fix this on the Java side.
Python and Rust Arrow implementations don't recognize Java's UUID metadata.
Most implementations don't really look at the Arrow/Parquet/etc. logical annotations, so they treat uuid (which is a fixed[16] with a UUID label on it) as plain fixed[16] and cast it to a type that's compatible with the query engine. Spark has shown to be problematic because it doesn't have a native UUID type, but it handles it internally as a string.
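To illustrate (assuming PyArrow ≥ 18, where pa.uuid() is available), the uuid type really is just a fixed 16-byte binary with an extension label on top:

import pyarrow as pa

uuid_type = pa.uuid()
print(uuid_type)               # extension<arrow.uuid>
print(uuid_type.storage_type)  # fixed_size_binary[16]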
I was referring to this comment #2881 (comment), which points to issue #2372 where I explain the problem. In that issue, your suggestion was to change to fixed[16].
@kevinjqliu added the error we're getting now in the other comment. The problem is filtering a Parquet file with UUID, not reading it.
E pyarrow.lib.ArrowNotImplementedError: Function 'equal' has no kernel matching input types (extension<arrow.uuid>, extension<arrow.uuid>)
pyarrow/error.pxi:92: ArrowNotImplementedError
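For context, a minimal sketch that reproduces this kernel error outside of PyIceberg (assuming PyArrow ≥ 18; building the uuid-typed array via ExtensionArray.from_storage is just one way to do it):

import uuid

import pyarrow as pa
import pyarrow.compute as pc

storage = pa.array([uuid.uuid4().bytes], type=pa.binary(16))  # fixed_size_binary[16]
uuid_arr = pa.ExtensionArray.from_storage(pa.uuid(), storage)

# Comparing plain binary(16) values works...
print(pc.equal(storage, storage))

# ...but the arrow.uuid extension type has no 'equal' kernel:
try:
    pc.equal(uuid_arr, uuid_arr)
except pa.ArrowNotImplementedError as exc:
    print(exc)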
I updated the PR body to add the filtering error.
assert len(table.scan().to_arrow()) == 4

result = df.where("uuid_col = '00000000-0000-0000-0000-000000000000'")
assert result.count() == 1
I ran this test on the current main branch with 1.10.1 and this is the stacktrace. It is different from the stacktrace in #2007:
E pyspark.errors.exceptions.connect.SparkException: Job aborted due to stage failure: Task 0 in stage 142.0 failed 1 times, most recent failure: Lost task 0.0 in stage 142.0 (TID 287) (fcaa97ba83c2 executor driver): java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.nio.ByteBuffer (java.util.UUID and java.nio.ByteBuffer are in module java.base of loader 'bootstrap')
E at java.base/java.nio.ByteBuffer.compareTo(ByteBuffer.java:267)
E at java.base/java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:52)
E at java.base/java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
E at org.apache.iceberg.types.Comparators$NullSafeChainedComparator.compare(Comparators.java:306)
E at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:352)
E at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:79)
E at org.apache.iceberg.expressions.ExpressionVisitors$BoundExpressionVisitor.predicate(ExpressionVisitors.java:162)
E at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:390)
E at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:409)
E at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eval(ParquetMetricsRowGroupFilter.java:103)
E at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter.shouldRead(ParquetMetricsRowGroupFilter.java:73)
E at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:108)
E at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
E at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
E at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:126)
E at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:43)
E at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:141)
E at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:148)
E at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:186)
E at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:72)
E at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:72)
E at scala.Option.exists(Option.scala:406)
E at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:103)
E at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
E at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
E at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
E at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
E at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
E at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
E at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
E at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
E at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
E at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
E at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
E at org.apache.spark.scheduler.Task.run(Task.scala:147)
E at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
E at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
E at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
E at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
E at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
E at java.base/java.lang.Thread.run(Thread.java:840)
E
E Driver stacktrace:
E
E JVM stacktrace:
E org.apache.spark.SparkException
E at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
E at scala.Option.getOrElse(Option.scala:201)
E at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
E at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
E at scala.collection.immutable.List.foreach(List.scala:334)
E at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
E at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
E at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
E at scala.Option.foreach(Option.scala:437)
E at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
E at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
E at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
E at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
E at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
E at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
E at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
E at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:201)
E at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:260)
E at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:257)
E at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:197)
E at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.$anonfun$processAsArrowBatches$2(SparkConnectPlanExecution.scala:155)
E at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:163)
E at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
E at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:125)
E at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
E at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
E at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)
E at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
E at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
E at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:125)
E at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:295)
E at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:124)
E at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
E at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:78)
E at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:237)
E at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.processAsArrowBatches(SparkConnectPlanExecution.scala:154)
E at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:78)
E at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handlePlan(ExecuteThreadRunner.scala:314)
E at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
E at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
E at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
E at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
E at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
E at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
E at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
E at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)
E at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
E at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
E at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
E at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
E at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
E at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:347)
E Caused by: java.lang.ClassCastException: class java.util.UUID cannot be cast to class java.nio.ByteBuffer (java.util.UUID and java.nio.ByteBuffer are in module java.base of loader 'bootstrap')
E at java.nio.ByteBuffer.compareTo(ByteBuffer.java:267)
E at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:52)
E at java.util.Comparators$NaturalOrderComparator.compare(Comparators.java:47)
E at org.apache.iceberg.types.Comparators$NullSafeChainedComparator.compare(Comparators.java:306)
E at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:352)
E at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eq(ParquetMetricsRowGroupFilter.java:79)
E at org.apache.iceberg.expressions.ExpressionVisitors$BoundExpressionVisitor.predicate(ExpressionVisitors.java:162)
E at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:390)
E at org.apache.iceberg.expressions.ExpressionVisitors.visitEvaluator(ExpressionVisitors.java:409)
E at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter$MetricsEvalVisitor.eval(ParquetMetricsRowGroupFilter.java:103)
E at org.apache.iceberg.parquet.ParquetMetricsRowGroupFilter.shouldRead(ParquetMetricsRowGroupFilter.java:73)
E at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:108)
E at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
E at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
E at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:126)
E at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:43)
E at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:141)
E at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:148)
E at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:186)
E at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:72)
E at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:72)
E at scala.Option.exists(Option.scala:406)
E at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:103)
E at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
E at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
E at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(:-1)
E at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(:-1)
E at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(:-1)
E at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
E at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
E at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
E at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
E at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
E at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
E at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
E at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
E at org.apache.spark.scheduler.Task.run(Task.scala:147)
E at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
E at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
E at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
E at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
E at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
E at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
E at java.lang.Thread.run(Thread.java:840)
.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py:1882: SparkException
I also downloaded the two data files:
➜ Downloads parquet schema 00000-0-562a1d32-f5da-4e09-836a-b7d0d4e737e6.parquet
{
"type" : "record",
"name" : "schema",
"fields" : [ {
"name" : "uuid_col",
"type" : [ "null", {
"type" : "string",
"logicalType" : "uuid"
} ],
"default" : null
} ]
}
➜ Downloads parquet schema 00000-284-30c509c4-f8e9-46fb-a1f1-169e1c928e00-0-00001.parquet
{
"type" : "record",
"name" : "table",
"fields" : [ {
"name" : "uuid_col",
"type" : {
"type" : "string",
"logicalType" : "uuid"
}
} ]
}
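If it helps, the physical Parquet types (rather than the Avro-converted view from the parquet CLI above) can also be checked from Python with something like the sketch below; the file name is the first file downloaded above:

import pyarrow.parquet as pq

pf = pq.ParquetFile("00000-0-562a1d32-f5da-4e09-836a-b7d0d4e737e6.parquet")
# The low-level Parquet schema shows the physical type and logical annotation;
# a spec-compliant UUID column should appear as FIXED_LEN_BYTE_ARRAY(16)
# with a UUID logical type.
print(pf.schema)        # Parquet-level schema (physical + logical types)
print(pf.schema_arrow)  # the Arrow view of the same schema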
The stacktrace has changed because of the fix I made in the Java implementation. That PR (apache/iceberg#14027) has more details about the problem, and in issue #2372 I explain the problem from the PyIceberg side and why we are changing back to binary(16).
The Parquet files look wrong, and I'm not sure what happened there. UUID should annotate FIXED_LEN_BYTE_ARRAY:
The pa.binary(16) change fixed the comparison issue but broke Parquet spec compliance by removing the UUID logical type annotation. We can go back to UUID in the visitor and raise an exception with a better message when the user tries to filter a UUID column, since PyArrow does not support filtering on the UUID extension type.
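A rough sketch of what that guard could look like; the function and argument names here are illustrative, not actual PyIceberg internals:

from pyiceberg.types import IcebergType, UUIDType


def _raise_if_uuid_filter(field_type: IcebergType, column_name: str) -> None:
    # Fail fast with a clear message instead of surfacing PyArrow's
    # "no kernel matching input types (extension<arrow.uuid>, ...)" error.
    if isinstance(field_type, UUIDType):
        raise NotImplementedError(
            f"Filtering on UUID column '{column_name}' is not supported: "
            "PyArrow has no comparison kernels for the arrow.uuid extension type."
        )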
😄
Closes #2372
Rationale for this change
PyArrow's pa.uuid() type creates Python Arrow metadata that differs from Java Arrow's UUID metadata, causing incompatibility with Spark. Python and Rust Arrow implementations don't recognize Java's UUID metadata for filtering. Reading works, but filtering returns this error:
pyarrow.lib.ArrowNotImplementedError: Function 'equal' has no kernel matching input types (extension<arrow.uuid>, extension<arrow.uuid>)
Are these changes tested?
Yes
Are there any user-facing changes?
No