.github/workflows/spark_sql_test.yml (0 additions, 4 deletions)
@@ -59,10 +59,6 @@ jobs:
- {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
- {name: "sql_hive-2", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.ExtendedHiveTest"}
- {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n org.apache.spark.tags.SlowHiveTest"}
# Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946
exclude:
- spark-version: {short: '4.0', full: '4.0.1', java: 17}
module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
fail-fast: false
name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{ matrix.spark-version.full }}/java-${{ matrix.spark-version.java }}
runs-on: ${{ matrix.os }}
CometShuffleManager.scala (31 additions, 14 deletions)
@@ -58,7 +58,37 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
    */
   private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]()

-  private lazy val shuffleExecutorComponents = loadShuffleExecutorComponents(conf)
+  // Lazily initialized to avoid accessing SparkEnv.get during ShuffleManager construction,
+  // which can hang when SparkEnv is not fully initialized (e.g., during Hive metastore
+  // operations). The field is only initialized when getWriter/getReader is called (during
+  // task execution), at which point SparkEnv should be fully available.
+  @volatile private var _shuffleExecutorComponents: ShuffleExecutorComponents = _
+
+  private def shuffleExecutorComponents: ShuffleExecutorComponents = {
+    if (_shuffleExecutorComponents == null) {
+      synchronized {
+        if (_shuffleExecutorComponents == null) {
+          val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
+          val extraConfigs =
+            conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap
+          // SparkEnv.get should be available when getWriter/getReader is called (during
+          // task execution), but check for null so we fail fast instead of hanging.
+          val env = SparkEnv.get
+          if (env == null) {
+            throw new IllegalStateException(
+              "SparkEnv.get is null during shuffleExecutorComponents initialization. " +
+                "This may indicate a timing issue with SparkEnv initialization.")
+          }
+          executorComponents.initializeExecutor(
+            conf.getAppId,
+            env.executorId,
+            extraConfigs.asJava)
+          _shuffleExecutorComponents = executorComponents
+        }
+      }
+    }
+    _shuffleExecutorComponents
+  }

   override val shuffleBlockResolver: IndexShuffleBlockResolver = {
     // The patch versions of Spark 3.4 have different constructor signatures:
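The replacement above is the classic double-checked locking idiom: a `@volatile` field plus a re-check under `synchronized`, so the component is built at most once, on first use rather than at manager construction. A minimal standalone sketch of the same idiom (`ExpensiveComponent` and `build()` are hypothetical stand-ins for `ShuffleExecutorComponents` and its initialization, not names from this PR):

```scala
final class ExpensiveComponent

final class LazyHolder {
  @volatile private var _component: ExpensiveComponent = _

  // Placeholder for the real initialization work.
  private def build(): ExpensiveComponent = new ExpensiveComponent

  def component: ExpensiveComponent = {
    // Fast path: once initialized, the volatile read avoids taking the lock.
    if (_component == null) {
      synchronized {
        // Re-check under the lock: another thread may have won the race.
        if (_component == null) {
          _component = build()
        }
      }
    }
    _component
  }
}
```

Compared with the old `lazy val` (which also defers and synchronizes initialization), the explicit variant adds the `SparkEnv.get` null check, so a timing problem surfaces as a descriptive `IllegalStateException` rather than the hang described in the comment.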
@@ -253,19 +283,6 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {

 object CometShuffleManager extends Logging {

-  /**
-   * Loads executor components for shuffle data IO.
-   */
-  private def loadShuffleExecutorComponents(conf: SparkConf): ShuffleExecutorComponents = {
-    val executorComponents = ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
-    val extraConfigs = conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap
-    executorComponents.initializeExecutor(
-      conf.getAppId,
-      SparkEnv.get.executorId,
-      extraConfigs.asJava)
-    executorComponents
-  }
-
   def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
     // We cannot bypass sorting if we need to do map-side aggregation.
     if (dep.mapSideCombine) {
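The second hunk is truncated mid-method. For readability, here is a hedged sketch of the overall bypass decision, modeled on upstream Spark's `SortShuffleWriter.shouldBypassMergeSort`; the `spark.shuffle.sort.bypassMergeThreshold` config (default 200) comes from upstream Spark and does not appear in this diff:

```scala
import org.apache.spark.{ShuffleDependency, SparkConf}

// Sketch of the bypass-merge-sort decision, modeled on upstream Spark's
// SortShuffleWriter.shouldBypassMergeSort. Shown only to make the truncated
// hunk above readable; not the literal Comet implementation.
object BypassMergeSortSketch {
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    if (dep.mapSideCombine) {
      // Map-side aggregation requires the sort-based shuffle path.
      false
    } else {
      // Bypass only when writing one output file per partition is cheap enough.
      val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
}
```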