2 独立应用

下面我们想说一下怎样使用Spark API编写一个独立的应用程序。 这里使用Scala (SBT构建工具)和Java举例。 (Python官方文档中有,译者未翻译)

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

这个程序统计Spark README文件中包含字符ab的行数。 注意你需要用你实际的Spark路径替换 YOUR_SPARK_HOME。 不像上面的Spark shell的例子, 我们初始化一个SparkContext 作为程序的一部分.

我们将一个SparkConf对象传给SparkContext的构造函数, 它包含了我们程序的信息。

我们的程序依赖Spark API,所以我们包含一个sbt配置文件:simple.sbt 指明Spark是一个依赖, 这个文件也增加了Spark依赖的仓库(repository):

name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.1"

为了保证sbt工作正常,我们需要将SimpleApp.scala和simple.sbt放入典型的sbt项目布局的文件夹中。 如此一来我们将应用代码可以打包成一个jar文件, 然后使用spark-submit脚本来运行此程序。

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit 
  --class "SimpleApp" 
  --master local[4] 
  target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23

或者使用Java

/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD logData = sc.textFile(logFile).cache();
    long numAs = logData.filter(new Function() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();
    long numBs = logData.filter(new Function() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();
    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}

这个程序统计Spark README文件中包含字符ab的行数。. 注意你需要用你实际的Spark路径替换 YOUR_SPARK_HOME。 不像上面的Spark shell的例子, 我们需要一个JavaSparkContext对象. 我们也创建了RDD (JavaRDD)然后运行transformations. 最后我们传递给Spark一个function对象, 这个function对象是一个匿名类,继承于 spark.api.java.function.Function. Spark开发指南描述了细节. (译者注: 这是Java 7的语法, 通过Java 8 Lambda表达式,上面的代码和scala一样的简化)

为了编译此程序,我们需要写一个Maven pom.xml文件, 增加Spark作为依赖. 注意Spark artifact带有Scala的版本.

project>
  groupId>edu.berkeleygroupId>
  artifactId>simple-projectartifactId>
  modelVersion>4.0.0modelVersion>
  name>Simple Projectname>
  packaging>jarpackaging>
  version>1.0version>
  dependencies>
    dependency> 
      groupId>org.apache.sparkgroupId>
      artifactId>spark-core_2.10artifactId>
      version>1.1.1version>
    dependency>
  dependencies>
project>

使用Maven项目的布局:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在,我们使用Maven打包并使用./bin/spark-submit执行此程序.

# Package a jar containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit 
  --class "SimpleApp" 
  --master local[4] 
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
文章导航