Spark2.x中如何用源码剖析SortShuffleWriter具体实现
SortShuffleWriter的基本介绍
SortShuffleWriter是Spark中用于实现SortShuffleManager的一部分,负责将Shuffle数据按照分区键进行分组,并将数据写入磁盘或网络中,供后续Shuffle Read操作使用。SortShuffleWriter的主要功能是对Shuffle数据进行排序,以便能够按照分区读取并合并。
SortShuffleWriter的工作流程
SortShuffleWriter的工作流程可以分为以下几个步骤:
- 将待写入的Shuffle数据按照分区键进行分组。
- 对每个分区的数据进行本地排序。
- 将排序后的数据写入磁盘或网络中。
- 返回写入结果,供后续的Shuffle Read操作使用。
SortShuffleWriter的实现原理
SortShuffleWriter的具体实现涉及以下几个关键步骤:
- 获取每个分区的数据。
- 排序每个分区的数据。
- 将排序后的数据写入磁盘或网络中。
- 返回写入结果,供后续的Shuffle Read操作使用。
val output = new DataOutputStream(bufferedOutputStream(writer))
val sorter = new sort.ShuffleSorter(inputSerializer, output, fileBufferSize, classLoader)
try {
while (i < ns) {
partitionedData.iterator(i).foreach { case (k, v) => writer.write(k, v) }
} finally {
writer.close()
}
val dep = rdd.context.getShuffleDependency[Int, VD, C](rdd)
val partitioner = dep.partitioner.asInstanceOf[Partitioner]
val buckets = new Aggregator[->(Int, C), (Any, Any), Any] {
def zero: (Any, Any) = (null, null)
// partitionIndex: 数据所归属的分区索引
// key: 数据的分区键
// value: 数据的实际值
def seqOp = (c: (Any, Any), kv: (Int, Any)) => {
val (key, value) = kv
partitioner match {
case hashPartitioner: HashPartitioner => hashPartitioner.getPartition(key)
case _ => partitioner.getPartition(key)
}
val writer = blockManager.getDiskWriter(blockId, file, serializer, bufferSize, writeMetrics)
taskComp =>
val taskAttemptContext: TaskAttemptContextImpl = saveImpl.asInstanceOf[TaskAttemptContextImpl]
val partitioner: Partitioner = taskAttemptContext.getPartitioner()
val keyComparator: Comparator[_ >: K] = taskAttemptContext.getKeyOrdering().
taskComp.reportTaskCompletion(')(')(');
val run = new SortShuffleRunHandle[ ]}
def write(records: Iterator[Product2[K, V]]): Unit = withTaskCompletionListener[Unit] {
taskComp =>
taskComp.reportTaskCompletion(')(')(');
try {
val serializeStream = serializer.newInstance(')(')(').serializeStream(')(')(');
val env = SparkEnv.get(')(')(');
val user = env.driverAccumulatorManager(')(')(');
val job = SparkEnv.get(')(')(');
val metricIter = new InputOutputMetrics(None, jobIDs, systemMetrics(')(')(');
val TaskMetricsUpdater = new TaskMetricsUpdater()
def write(records: Iterator[Product2[K, V]]): Unit = withTaskCompletionListener[Unit] {
dependecy = dep.asInstanceOf[base.ShuffleDependency[K, V, C]].doPostMergedCombine();
val fetchers = ShuffleManager.createFetcher();
val mergeSpillFiles= SparkEnv.get()*.getShuffleSpillManager().getSpillFileDetails();
val output = SparkEnv.get()*.shuffleOutput(()")(')('+dep.partitionId(�if(outputUUID)-1*dep.numPartitions(�iterator(".map(").map(").toList;)
猜您想看
-
Linux环境下的云服务集成
1. 云服务定...
2024年05月30日 -
使用PHP和jQuery实现实时数据更新的技巧
使用PHP和j...
2023年05月14日 -
springcloud-sleuth源码怎么解析2-TraceFilter
TraceFi...
2023年07月23日 -
如何使用iPhone上的导航工具优化路径规划
如何使用iPh...
2023年05月05日 -
VM安装的虚拟机怎么在局域网内互相访问
一、局域网内V...
2023年05月25日 -
如何在Linux中使用Vim编辑器
如何在Linu...
2023年05月05日