Introduction to SortShuffleWriter

SortShuffleWriter is the shuffle writer used by Spark's SortShuffleManager. Running inside each map task, it groups shuffle records by their target partition, sorts them, and writes them to local disk, where subsequent shuffle read operations fetch them (over the network, for remote reducers). Its main job is to produce sorted, partitioned output that reducers can read and merge efficiently.
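
For context, here is a minimal, hypothetical job that crosses a shuffle boundary. reduceByKey repartitions records by key, so with the default SortShuffleManager each map task runs a SortShuffleWriter to produce its partitioned, sorted output:

    import org.apache.spark.{SparkConf, SparkContext}

    object ShuffleDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("shuffle-demo").setMaster("local[2]"))
        // reduceByKey introduces a shuffle: on the map side, each task
        // partitions and sorts its records before writing them to disk
        val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
          .reduceByKey(_ + _)
        counts.collect().foreach(println)
        sc.stop()
      }
    }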

The SortShuffleWriter Workflow

The workflow of SortShuffleWriter breaks down into the following steps (a condensed code sketch follows the list):

  1. Group the incoming shuffle records by their target partition, using the partition key.
  2. Sort each partition's data locally.
  3. Write the sorted data to local disk (reducers later fetch it over the network).
  4. Return the write result for subsequent shuffle read operations to use.
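
Putting these steps together, the core of the write path looks roughly like the sketch below. It is a simplification of Spark's sort-based shuffle write path (the no-combine case); dep, context, blockId, outputFile, and blockManager stand in for state the real writer holds. The next section breaks each step down further.

    def write(records: Iterator[Product2[K, V]]): Unit = {
      // steps 1-2: partition the records and sort them, spilling under memory pressure
      val sorter = new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
      sorter.insertAll(records)
      // step 3: write every partition into one data file, recording each partition's length
      val partitionLengths = sorter.writePartitionedFile(blockId, outputFile)
      // step 4: report the output location and sizes for shuffle read
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    }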

How SortShuffleWriter Is Implemented

The implementation of SortShuffleWriter involves the following key steps. The code under each step is a simplified sketch modeled on Spark's sort-shuffle code path, not the verbatim source:

  1. Determine the target partition for each record. The writer obtains the ShuffleDependency from the shuffle handle and uses its Partitioner to map every record's key to a partition index. A simplified sketch (handle, records, and buffer are illustrative names, not the exact Spark internals):

     val dep = handle.dependency.asInstanceOf[ShuffleDependency[K, V, C]]
     val partitioner = dep.partitioner
     records.foreach { case (key, value) =>
       // key: the record's partition key; value: the record's payload
       // partitionId: the partition index this record belongs to
       val partitionId = partitioner.getPartition(key)
       buffer.insert(partitionId, key, value) // buffer records per partition
     }

     When buffered data has to be flushed, a DiskBlockObjectWriter obtained from the BlockManager writes each partition's records out in partition order:

     val writer = blockManager.getDiskWriter(
       blockId, file, serializerInstance, fileBufferSize, writeMetrics)
     try {
       var i = 0
       while (i < numPartitions) {
         partitionedData.iterator(i).foreach { case (k, v) => writer.write(k, v) }
         i += 1
       }
     } finally {
       writer.close()
     }
  2. Sort each partition's data. The write entry point builds an ExternalSorter whose configuration depends on whether map-side combine is enabled; the sorter buffers incoming records, sorting and spilling to disk under memory pressure. Sketch, simplified from Spark's SortShuffleWriter.write:

     override def write(records: Iterator[Product2[K, V]]): Unit = {
       sorter = if (dep.mapSideCombine) {
         // map-side combine: pass the aggregator and key ordering to the sorter
         new ExternalSorter[K, V, C](
           context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
       } else {
         // no map-side combine: records only need to be grouped by partition id
         new ExternalSorter[K, V, V](
           context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
       }
       sorter.insertAll(records) // buffers, sorts, and spills as needed
     }
  3. Write the sorted data to local disk. All partitions go into a single data file, and an index file records each partition's byte offsets so that reducers can fetch just their slice. Sketch, again following Spark's sort-shuffle code path in simplified form:

     val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
     val tmp = Utils.tempFileWith(output) // write to a temp file, commit atomically
     val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
     val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
     shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  4. Return the write result for subsequent shuffle read operations. The writer records its output location and the per-partition lengths in a MapStatus; the driver's MapOutputTracker later hands this to reducers so they know where to fetch each partition from. Sketch:

     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)

     override def stop(success: Boolean): Option[MapStatus] = {
       // on success, hand the MapStatus back to the task runner
       if (success) Option(mapStatus) else None
     }
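
A note on the resulting layout: sort-based shuffle produces one data file plus one index file per map task, rather than one file per reducer as the older hash-based shuffle did, which keeps the number of shuffle files linear in the number of map tasks. The index file holds numPartitions + 1 long offsets, and partition i's bytes are the range between offsets i and i + 1. A conceptual sketch (not Spark's actual code) of resolving that range:

    import java.io.{DataInputStream, FileInputStream}

    // Read partition i's (start, end) byte range from a shuffle index file.
    def partitionRange(indexFile: String, i: Int): (Long, Long) = {
      val in = new DataInputStream(new FileInputStream(indexFile))
      try {
        in.skipBytes(i * 8)       // each offset is an 8-byte Long
        val start = in.readLong() // where partition i begins in the data file
        val end = in.readLong()   // where partition i + 1 begins, i.e. i's end
        (start, end)
      } finally in.close()
    }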