Spark2.x中如何用源码剖析SortShuffleWriter具体实现
SortShuffleWriter的基本介绍
SortShuffleWriter是Spark中用于实现SortShuffleManager的一部分,负责将Shuffle数据按照分区键进行分组,并将数据写入磁盘或网络中,供后续Shuffle Read操作使用。SortShuffleWriter的主要功能是对Shuffle数据进行排序,以便能够按照分区读取并合并。
SortShuffleWriter的工作流程
SortShuffleWriter的工作流程可以分为以下几个步骤:
- 将待写入的Shuffle数据按照分区键进行分组。
- 对每个分区的数据进行本地排序。
- 将排序后的数据写入磁盘或网络中。
- 返回写入结果,供后续的Shuffle Read操作使用。
SortShuffleWriter的实现原理
SortShuffleWriter的具体实现涉及以下几个关键步骤:
- 获取每个分区的数据。
- 排序每个分区的数据。
- 将排序后的数据写入磁盘或网络中。
- 返回写入结果,供后续的Shuffle Read操作使用。
val output = new DataOutputStream(bufferedOutputStream(writer))
val sorter = new sort.ShuffleSorter(inputSerializer, output, fileBufferSize, classLoader)
try {
while (i < ns) {
partitionedData.iterator(i).foreach { case (k, v) => writer.write(k, v) }
} finally {
writer.close()
}
val dep = rdd.context.getShuffleDependency[Int, VD, C](rdd)
val partitioner = dep.partitioner.asInstanceOf[Partitioner]
val buckets = new Aggregator[->(Int, C), (Any, Any), Any] {
def zero: (Any, Any) = (null, null)
// partitionIndex: 数据所归属的分区索引
// key: 数据的分区键
// value: 数据的实际值
def seqOp = (c: (Any, Any), kv: (Int, Any)) => {
val (key, value) = kv
partitioner match {
case hashPartitioner: HashPartitioner => hashPartitioner.getPartition(key)
case _ => partitioner.getPartition(key)
}
val writer = blockManager.getDiskWriter(blockId, file, serializer, bufferSize, writeMetrics)
taskComp =>
val taskAttemptContext: TaskAttemptContextImpl = saveImpl.asInstanceOf[TaskAttemptContextImpl]
val partitioner: Partitioner = taskAttemptContext.getPartitioner()
val keyComparator: Comparator[_ >: K] = taskAttemptContext.getKeyOrdering().
taskComp.reportTaskCompletion(')(')(');
val run = new SortShuffleRunHandle[ ]}
def write(records: Iterator[Product2[K, V]]): Unit = withTaskCompletionListener[Unit] {
taskComp =>
taskComp.reportTaskCompletion(')(')(');
try {
val serializeStream = serializer.newInstance(')(')(').serializeStream(')(')(');
val env = SparkEnv.get(')(')(');
val user = env.driverAccumulatorManager(')(')(');
val job = SparkEnv.get(')(')(');
val metricIter = new InputOutputMetrics(None, jobIDs, systemMetrics(')(')(');
val TaskMetricsUpdater = new TaskMetricsUpdater()
def write(records: Iterator[Product2[K, V]]): Unit = withTaskCompletionListener[Unit] {
dependecy = dep.asInstanceOf[base.ShuffleDependency[K, V, C]].doPostMergedCombine();
val fetchers = ShuffleManager.createFetcher();
val mergeSpillFiles= SparkEnv.get()*.getShuffleSpillManager().getSpillFileDetails();
val output = SparkEnv.get()*.shuffleOutput(()")(')('+dep.partitionId(�if(outputUUID)-1*dep.numPartitions(�iterator(".map(").map(").toList;)
猜您想看
-
C语言中怎么求最大公约数
一、什么是最大...
2023年05月26日 -
java8中时间API怎么用
一、介绍Jav...
2023年05月25日 -
如何在网易云音乐上动态更新你的听歌记录?
一、登录网易云...
2023年05月15日 -
如何在PHP中使用Kafka进行消息队列
使用Kafk...
2023年05月05日 -
MYSQL 8 日志系统到底比MYSQL 5.X好在哪里
一、MySQL...
2023年05月22日 -
Redis列表类型的常用命令整理
Redis是一...
2023年07月22日