if (a > b) a else b) res4: Long = 15 第一步map一行包含的单" />

1.2 更多的RDD操作

RDD的transformation和action可以组成起来完成复杂的计算。 比如查找包含最多单词的一行:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

第一步map一行包含的单词数到一个整数, 第二步调用reduce得到最大的单词数。map和reduce的参数都是lambda表达式(closures), 可以调用 Scala/Java库. 例如我们很容易的调用在其它地方声明的方法。 这里我们使用Math.max()函数简化代码:

scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一个通用的数据流模式就是MapReduce,在Hadoop中相当流行. Spark实现MapReduce流很容易:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8

此处我们使用flatMap, map 和 reduceByKey转换来计算文件中每个单词的频度。 为了收集单词频度结果,我们可以调用collect action:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
文章导航