SortShuffleWriter 的基本介绍

SortShuffleWriter 是 Spark 中用于实现 SortShuffleManager 的一部分,负责将 Shuffle 数据按照分区键进行分组,并将数据写入磁盘或网络中,供后续 Shuffle Read 操作使用。SortShuffleWriter 的主要功能是对 Shuffle 数据进行排序,以便能够按照分区读取并合并。

SortShuffleWriter 的工作流程

SortShuffleWriter 的工作流程可以分为以下几个步骤:

  1. 将待写入的 Shuffle 数据按照分区键进行分组。
  2. 对每个分区的数据进行本地排序。
  3. 将排序后的数据写入磁盘或网络中。
  4. 返回写入结果,供后续的 Shuffle Read 操作使用。

SortShuffleWriter 的实现原理

SortShuffleWriter 的具体实现涉及以下几个关键步骤:

  1. 获取每个分区的数据。
  2. val dep = rdd.context.getShuffleDependency[Int, VD, C](rdd)
    val partitioner = dep.partitioner.asInstanceOf[Partitioner]
    val buckets = new Aggregator[->(Int, C), (Any, Any), Any] {
      def zero: (Any, Any) = (null, null)
      // partitionIndex: 数据所归属的分区索引
      // key: 数据的分区键
      // value: 数据的实际值
      def seqOp = (c: (Any, Any), kv: (Int, Any)) => {
        val (key, value) = kv
        partitioner match {
          case hashPartitioner: HashPartitioner => hashPartitioner.getPartition(key)
          case _ => partitioner.getPartition(key)
        }
    val writer = blockManager.getDiskWriter(blockId, file, serializer, bufferSize, writeMetrics)
      val output = new DataOutputStream(bufferedOutputStream(writer))
      val sorter = new sort.ShuffleSorter(inputSerializer, output, fileBufferSize, classLoader)
    try {
      while (i < ns) {
        partitionedData.iterator(i).foreach { case (k, v) => writer.write(k, v) } 
    } finally {
      writer.close()
    }
    Scala
  3. 排序每个分区的数据。
  4. def write(records: Iterator[Product2[K, V]]): Unit = withTaskCompletionListener[Unit] { taskComp =>
      val taskAttemptContext: TaskAttemptContextImpl = saveImpl.asInstanceOf[TaskAttemptContextImpl]
      val partitioner: Partitioner = taskAttemptContext.getPartitioner()
      val keyComparator: Comparator[_ >: K] = taskAttemptContext.getKeyOrdering().
      taskComp.reportTaskCompletion(')(')(');
      val run = new SortShuffleRunHandle[ ]}
    Scala
  5. 将排序后的数据写入磁盘或网络中。
  6. def write(records: Iterator[Product2[K, V]]): Unit = withTaskCompletionListener[Unit] { taskComp =>
      taskComp.reportTaskCompletion(')(')(');
      try {
        val serializeStream = serializer.newInstance(')(')(').serializeStream(')(')(');
        val env = SparkEnv.get(')(')(');
        val user = env.driverAccumulatorManager(')(')(');
        val job = SparkEnv.get(')(')(');
        val metricIter = new InputOutputMetrics(None, jobIDs, systemMetrics(')(')(');
        val TaskMetricsUpdater = new TaskMetricsUpdater()
    Scala
  7. 返回写入结果,供后续的 Shuffle Read 操作使用。
  8. val dependecy = dep.asInstanceOf[base.ShuffleDependency[K, V, C]].doPostMergedCombine(); val fetchers = ShuffleManager.createFetcher(); val mergeSpillFiles= SparkEnv.get()*.getShuffleSpillManager().getSpillFileDetails(); val output = SparkEnv.get()*.shuffleOutput(()")(')('+dep.partitionId(�if(outputUUID)-1*dep.numPartitions(�iterator(".map(").map(").toList;)
    Scala