From cfe9b8eca3f2f0ef5ad3ffd4de6be6c47b8dd3fb Mon Sep 17 00:00:00 2001
From: shekharrajak
Date: Sun, 28 Dec 2025 14:34:34 +0530
Subject: [PATCH] Fix CometShuffleManager hang by deferring SparkEnv access

---
 .github/workflows/spark_sql_test.yml      |  4 --
 .../shuffle/CometShuffleManager.scala     | 45 +++++++++++++------
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml
index 3d7aa2e2f9..d143ef83a0 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -59,10 +59,6 @@ jobs:
           - {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
           - {name: "sql_hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"}
           - {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
-        # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946
-        exclude:
-          - spark-version: {short: '4.0', full: '4.0.1', java: 17}
-            module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
       fail-fast: false
     name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.spark-version.java }}
     runs-on: ${{ matrix.os }}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
index 927e309325..aa47dfa166 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
@@ -58,7 +58,37 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
    */
   private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()
 
-  private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
+  // Lazy initialization to avoid accessing SparkEnv.get during ShuffleManager construction,
+  // which can cause hangs when SparkEnv is not fully initialized (e.g., during Hive metastore ops)
+  // This is only initialized when getWriter/getReader is called (during task execution),
+  // at which point SparkEnv should be fully available
+  @volatile private var _shuffleExecutorComponents: ShuffleExecutorComponents = _
+
+  private def shuffleExecutorComponents: ShuffleExecutorComponents = {
+    if (_shuffleExecutorComponents == null) {
+      synchronized {
+        if (_shuffleExecutorComponents == null) {
+          val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
+          val extraConfigs =
+            conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap
+          // SparkEnv.get should be available when getWriter/getReader is called
+          // (during task execution), but check for null to avoid hangs
+          val env = SparkEnv.get
+          if (env == null) {
+            throw new IllegalStateException(
+              "SparkEnv.get is null during shuffleExecutorComponents initialization. " +
+                "This may indicate a timing issue with SparkEnv initialization.")
+          }
+          executorComponents.initializeExecutor(
+            conf.getAppId,
+            env.executorId,
+            extraConfigs.asJava)
+          _shuffleExecutorComponents = executorComponents
+        }
+      }
+    }
+    _shuffleExecutorComponents
+  }
 
   override val shuffleBlockResolver: IndexShuffleBlockResolver = {
     // The patch versions of Spark 3.4 have different constructor signatures:
@@ -253,19 +283,6 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
 
 object CometShuffleManager extends Logging {
 
-  /**
-   * Loads executor components for shuffle data IO.
-   */
-  private def loadShuffleExecutorComponents(conf: SparkConf): ShuffleExecutorComponents = {
-    val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
-    val extraConfigs = conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap
-    executorComponents.initializeExecutor(
-      conf.getAppId,
-      SparkEnv.get.executorId,
-      extraConfigs.asJava)
-    executorComponents
-  }
-
   def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
     // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {