Spark2.x中如何用源码剖析SortShuffleWriter具体实现
SortShuffleWriter的基本介绍
SortShuffleWriter是Spark中用于实现SortShuffleManager的一部分,负责将Shuffle数据按照分区键进行分组,并将数据写入磁盘或网络中,供后续Shuffle Read操作使用。SortShuffleWriter的主要功能是对Shuffle数据进行排序,以便能够按照分区读取并合并。
SortShuffleWriter的工作流程
SortShuffleWriter的工作流程可以分为以下几个步骤:
- 将待写入的Shuffle数据按照分区键进行分组。
- 对每个分区的数据进行本地排序。
- 将排序后的数据写入磁盘或网络中。
- 返回写入结果,供后续的Shuffle Read操作使用。
SortShuffleWriter的实现原理
SortShuffleWriter的具体实现涉及以下几个关键步骤:
- 获取每个分区的数据。
- 排序每个分区的数据。
- 将排序后的数据写入磁盘或网络中。
- 返回写入结果,供后续的Shuffle Read操作使用。
val output = new DataOutputStream(bufferedOutputStream(writer))
val sorter = new sort.ShuffleSorter(inputSerializer, output, fileBufferSize, classLoader)
try {
while (i < ns) {
partitionedData.iterator(i).foreach { case (k, v) => writer.write(k, v) }
} finally {
writer.close()
}
val dep = rdd.context.getShuffleDependency[Int, VD, C](rdd)
val partitioner = dep.partitioner.asInstanceOf[Partitioner]
val buckets = new Aggregator[->(Int, C), (Any, Any), Any] {
def zero: (Any, Any) = (null, null)
// partitionIndex: 数据所归属的分区索引
// key: 数据的分区键
// value: 数据的实际值
def seqOp = (c: (Any, Any), kv: (Int, Any)) => {
val (key, value) = kv
partitioner match {
case hashPartitioner: HashPartitioner => hashPartitioner.getPartition(key)
case _ => partitioner.getPartition(key)
}
val writer = blockManager.getDiskWriter(blockId, file, serializer, bufferSize, writeMetrics)
taskComp =>
val taskAttemptContext: TaskAttemptContextImpl = saveImpl.asInstanceOf[TaskAttemptContextImpl]
val partitioner: Partitioner = taskAttemptContext.getPartitioner()
val keyComparator: Comparator[_ >: K] = taskAttemptContext.getKeyOrdering().
taskComp.reportTaskCompletion(')(')(');
val run = new SortShuffleRunHandle[ ]}
def write(records: Iterator[Product2[K, V]]): Unit = withTaskCompletionListener[Unit] {
taskComp =>
taskComp.reportTaskCompletion(')(')(');
try {
val serializeStream = serializer.newInstance(')(')(').serializeStream(')(')(');
val env = SparkEnv.get(')(')(');
val user = env.driverAccumulatorManager(')(')(');
val job = SparkEnv.get(')(')(');
val metricIter = new InputOutputMetrics(None, jobIDs, systemMetrics(')(')(');
val TaskMetricsUpdater = new TaskMetricsUpdater()
def write(records: Iterator[Product2[K, V]]): Unit = withTaskCompletionListener[Unit] {
dependecy = dep.asInstanceOf[base.ShuffleDependency[K, V, C]].doPostMergedCombine();
val fetchers = ShuffleManager.createFetcher();
val mergeSpillFiles= SparkEnv.get()*.getShuffleSpillManager().getSpillFileDetails();
val output = SparkEnv.get()*.shuffleOutput(()")(')('+dep.partitionId(�if(outputUUID)-1*dep.numPartitions(�iterator(".map(").map(").toList;)
猜您想看
-
怎样解决苹果手机上的快捷方式问题?
如何解决苹果手...
2023年04月27日 -
电脑耳机插上没反应怎么办
问题描述当电脑...
2023年06月26日 -
怎样进行基于linuxthreads2.0.1线程源码分析attr.c
引言对于一个开...
2023年07月22日 -
Excel-converter中如何用代码实现java中excel数据导入导出
一、Excel...
2023年07月23日 -
如何使用 OpenWrt 路由器设置静态路由?
如何使用Ope...
2023年04月17日 -
手机无法正常充电,怎么办?
手机无法正常充...
2023年04月28日