Using the Apache Spark Runner

The Apache Spark Runner can be used to execute Beam pipelines using Apache Spark. The Spark Runner can execute Spark pipelines just like a native Spark application: you can deploy a self-contained application for local mode, run on Spark's Standalone resource manager, or run on a cluster using YARN or Mesos.

The Spark Runner executes Beam pipelines on top of Apache Spark, providing support as follows:

The Beam Capability Matrix documents the capabilities currently supported by the Spark Runner.

_Note:_ support for the Beam Model in streaming is currently experimental; follow the mailing list for status updates.

Spark Runner prerequisites and setup

The Spark runner currently supports Spark’s 1.6 branch, and more specifically any version greater than 1.6.0.

You can add a dependency on the latest version of the Spark runner by adding to your pom.xml the following:
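A minimal sketch of such a dependency, assuming the standard Beam Maven coordinates (org.apache.beam:beam-runners-spark) and a beam.version property defined elsewhere in your pom.xml:

```xml
<!-- Sketch only: artifact coordinates and version property are assumptions,
     check the Beam release notes for the coordinates matching your version. -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>${beam.version}</version>
</dependency>
```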


Deploying Spark with your application

In some cases, such as running in local mode/Standalone, your (self-contained) application would be required to pack Spark by explicitly adding the following dependencies in your pom.xml:
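Such dependencies might look like the following sketch, assuming Spark 1.6-era artifacts built against Scala 2.10 and a spark.version property defined elsewhere in your pom.xml:

```xml
<!-- Sketch only: the Scala suffix (_2.10) and version property are assumptions
     that must match the Spark build you run against. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>${spark.version}</version>
</dependency>
```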



And shading the application jar using the maven shade plugin:
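A minimal shade-plugin configuration along these lines would produce the additional -shaded jar used below (the exact configuration, e.g. relocations or service-file transformers, depends on your project):

```xml
<!-- Sketch only: a minimal maven-shade-plugin setup that attaches a jar
     with the "shaded" classifier during the package phase. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <shadedArtifactAttached>true</shadedArtifactAttached>
    <shadedClassifierName>shaded</shadedClassifierName>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```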


After running mvn package, run ls target and you should see (assuming your artifactId is beam-examples and the version is 1.0.0):
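With the artifactId and version assumed above, the listing would include both the original and the shaded jar, along the lines of:

```
beam-examples-1.0.0.jar
beam-examples-1.0.0-shaded.jar
```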


To run against a Standalone cluster simply run:

spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkRunner

Running on a pre-deployed Spark cluster

Deploying your Beam pipeline on a cluster that already has a Spark deployment (Spark classes are available in container classpath) does not require any additional dependencies. For more details on the different deployment modes see: Standalone, YARN, or Mesos.

Pipeline options for the Spark Runner

When executing your pipeline with the Spark Runner, you should consider the following pipeline options.

| Field | Description | Default Value |
| --- | --- | --- |
| runner | The pipeline runner to use. This option allows you to determine the pipeline runner at runtime. | Set to SparkRunner to run using Spark. |
| sparkMaster | The URL of the Spark Master. This is the equivalent of setting SparkConf#setMaster(String) and can either be local[x] to run locally with x cores, spark://host:port to connect to a Spark Standalone cluster, mesos://host:port to connect to a Mesos cluster, or yarn to connect to a YARN cluster. | local[4] |
| storageLevel | The StorageLevel to use when caching RDDs in batch pipelines. The Spark Runner automatically caches RDDs that are evaluated repeatedly. This is a batch-only property, as streaming pipelines in Beam are stateful, which requires Spark DStream's StorageLevel to be MEMORY_ONLY. | MEMORY_ONLY |
| batchIntervalMillis | The StreamingContext's batchDuration, setting Spark's batch interval. | 1000 |
| enableSparkMetricSinks | Enable reporting metrics to Spark's metrics Sinks. | true |

Additional notes

Using spark-submit

When submitting a Spark application to a cluster, it is common (and recommended) to use the spark-submit script provided with the Spark installation. The PipelineOptions described above are not meant to replace spark-submit, but to complement it. Any of the above options can be passed as application arguments, and a --master set via spark-submit takes precedence. For more on how to use spark-submit in general, check out the Spark documentation.
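As an illustration, the pipeline options above could be appended after the application jar so that spark-submit passes them through as application arguments (the class name, master URL, and storageLevel value here are placeholders, not recommendations):

```
spark-submit --class com.beam.examples.BeamPipeline \
  --master spark://HOST:PORT \
  target/beam-examples-1.0.0-shaded.jar \
  --runner=SparkRunner \
  --storageLevel=MEMORY_AND_DISK
```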

Monitoring your job

You can monitor a running Spark job using the Spark Web Interfaces. By default, this is available at port 4040 on the driver node; if you run Spark on your local machine that would be http://localhost:4040. Spark also has a history server for viewing jobs after the fact, and metrics are also available via a REST API. Spark provides a metrics system that allows reporting Spark metrics to a variety of Sinks. The Spark Runner reports user-defined Beam Aggregators using this same metrics system and currently supports GraphiteSink and CSVSink; providing support for additional Sinks supported by Spark is straightforward.

Streaming Execution

If your pipeline uses an UnboundedSource, the Spark Runner automatically sets streaming mode. Forcing streaming mode is mostly used for testing and is not recommended.

Using a provided SparkContext and StreamingListeners

If you would like to execute your Spark job with a provided SparkContext, such as when using the spark-jobserver, or use StreamingListeners, you can’t use SparkPipelineOptions (the context or a listener cannot be passed as a command-line argument anyway). Instead, you should use SparkContextOptions which can only be used programmatically and is not a common PipelineOptions implementation.
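A sketch of the programmatic approach, assuming the Beam Spark runner's SparkContextOptions accessors (setUsesProvidedSparkContext, setProvidedSparkContext) and an externally managed JavaSparkContext named jsc:

```java
// Sketch only: assumes Beam's Spark runner classes are on the classpath and
// that jsc is a JavaSparkContext provided externally (e.g. by spark-jobserver).
SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);  // do not let the runner create its own context
options.setProvidedSparkContext(jsc);       // reuse the externally managed context
Pipeline pipeline = Pipeline.create(options);
```

Because these options hold live objects rather than strings, they cannot be expressed on the command line, which is why SparkContextOptions is programmatic-only.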