微信搜索lxw1234bigdata | 邀请体验:数阅–数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey

Spark lxw1234@qq.com 44170℃ 15评论

关键字:Spark算子、Spark RDD键值转换、combineByKey、foldByKey

combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

 

该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。

其中的参数:

createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C

mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C

mergeCombiners:合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C

numPartitions:结果RDD分区数,默认保持原有的分区数

partitioner:分区函数,默认为HashPartitioner

mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true

看下面例子:

scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21

scala> rdd1.combineByKey(
     |       (v : Int) => v + "_",   
     |       (c : String, v : Int) => c + "@" + v,  
     |       (c1 : String, c2 : String) => c1 + "$" + c2
     |     ).collect
res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))

其中三个映射函数分别为:
createCombiner: (V) => C
(v : Int) => v + “_” //在每一个V值后面加上字符_,返回C类型(String)
mergeValue: (C, V) => C
(c : String, v : Int) => c + “@” + v //合并C类型和V类型,中间加字符@,返回C(String)
mergeCombiners: (C, C) => C
(c1 : String, c2 : String) => c1 + “$” + c2 //合并C类型和C类型,中间加$,返回C(String)
其他参数为默认值。

最终,将RDD[String,Int]转换为RDD[String,String]。

再看例子:

rdd1.combineByKey(
      (v : Int) => List(v),
      (c : List[Int], v : Int) => v :: c,
      (c1 : List[Int], c2 : List[Int]) => c1 ::: c2
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))

最终将RDD[String,Int]转换为RDD[String,List[Int]]。

foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

 

该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.

直接看例子:

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
scala> rdd1.foldByKey(0)(_+_).collect
res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1)) 
//将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操
//作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即:
//("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)

再看:

scala> rdd1.foldByKey(2)(_+_).collect
res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
//先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函
//数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)

再看乘法操作:

scala> rdd1.foldByKey(0)(_*_).collect
res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
//先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",0*0), ("A",2*0),
//即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,0*0),即:(A,0)
//其他K也一样,最终都得到了V=0

scala> rdd1.foldByKey(1)(_*_).collect
res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
//映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。


在使用foldByKey算子时候,要特别注意映射函数及zeroValue的取值。

 

更多关于Spark算子的介绍,可参考 Spark算子

http://lxw1234.com/archives/tag/spark%E7%AE%97%E5%AD%90

 

 

如果觉得本博客对您有帮助,请 赞助作者

转载请注明:lxw的大数据田地 » Spark算子:RDD键值转换操作(2)–combineByKey、foldByKey

喜欢 (17)
分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(15)个小伙伴在吐槽
  1. 相同的key,zeroValue只会应用到第一个V。用减法试试就知道了
    • 同意,不过我觉的zeroValue更像是带计算结果的初始值
      Coco2016-05-31 22:30 回复
  2. 第一个例子,执行的结果,好像不正确。 (B,1_@2) (A,1_@2) (C,1_)
    匿名2016-04-06 11:25 回复
    • 我在idea中得到的是这个结果,但是在scala终端下得到的是与文中一致的结果,不太明白,你知道怎么回事吗?
      sychen2016-06-15 16:20 回复
    • 这个结果是对的。
      昵称2019-02-15 15:02 回复
  3. 执行结果是正确的: (A,1_$2_) (B,1_$2_) (C,1_) 但就就是不明白 mergeValue: (C, V) => C (c : String, v : Int) => c + “@” + v //合并C类型和V类型,中间加字符@,返回C(String) 怎么发挥作用?
    coolsctv2016-04-12 15:44 回复
    • 同,求解答
      大股东股份】2016-09-14 10:58 回复
  4. 你好,我在scala终端下执行可以得到与文中一样的结果,但是当我把同样的代码复制到idea中后却得到了不同的结果,请问是为什么? var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1))) rdd1.combineByKey( (v : Int) => v + "_", (c : String, v : Int) => c + "@" + v, (c1 : String, c2 : String) => c1 + "$" + c2 ).collect.foreach(println) 得到结果: (B,1_@2) (A,1_@2) (C,1_)
    sychen2016-06-15 16:17 回复
    • 这个跟不同环境默认分区有关系,1个分区这种,肯定没有分区间的combine,也就不会有value中的@关联,分区多的情况系,比如(A,1),(A,2)可能分到不同分区,就会存在分区间使用mergeCombiners函数聚合,产生(A,1_@2)
      小白2019-06-11 15:13 回复
  5. 作者写的不好,确实不好! 看别人解释的: combineByKey函数主要接受了三个函数作为参数,分别为createCombiner、mergeValue、mergeCombiners。这三个函数足以说明它究竟做了什么。理解了这三个函数,就可以很好地理解combineByKey。 要理解combineByKey(),要先理解它在处理数据时是如何处理每个元素的。由于combineByKey()会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的键相同。combineByKey()的处理流程如下: 如果是一个新的元素,此时使用createCombiner()来创建那个键对应的累加器的初始值。(!注意:这个过程会在每个分区第一次出现各个键时发生,而不是在整个RDD中第一次出现一个键时发生。) 如果这是一个在处理当前分区中之前已经遇到键,此时combineByKey()使用mergeValue()将该键的累加器对应的当前值与这个新值进行合并。 3.由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()将各个分区的结果进行合并。
    夏季雨2016-11-28 12:16 回复
    • 对的,第三个函数是对分区的合并
      阿萨德2016-12-28 09:45 回复
  6. 博士你好:在测试foldByKey算子的时候发现计算结果不一致。。val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) rdd1.foldByKey(2)(_+_).collect.foreach(println) 结果为:(B,5)(A,4)(C,3)
    yangxion2017-10-13 00:29 回复
    • foldByKey函数的初始值会应用在rdd的每个分区里面,然后对每个分区的相同key的value从初始值开始累加,所以最终的结果是跟分区有关的,你可以repartition一下初始rdd设置不一样的分区数,看看结果就能明白
      你猜2017-11-24 11:13 回复
      • 正解,相同分区初始值只应用一次,然后value值累加
        小白2019-06-11 15:32 回复
  7. foldByKey 没有完全讲明白。
    昵称2019-02-15 15:16 回复