Skip to content

Latest commit

 

History

History

beam

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 

Beam Initialization Actions

While Apache Beam is primarily an SDK, jobs running under its portability framework require a job service to run correctly. This directory contains a setup script to properly configure beam job services.

WARNING: The Beam portability framework is under active development and should not be used in a production context. It is also not supported on clusters running in high availability mode.

Due to the current development status of Beam's portability framework, you are responsible for building and maintaining their own Beam artifacts manually. Instructions are included below.

Using this initialization action

⚠️ NOTICE: See best practices of using initialization actions in production.

Building Beam Artifacts

You will generate two categories of artifacts for this initialization action:

| Job Service | This micro service runs on the master node, accepting new beam jobs on port 8099. It is configured to submit Beam jobs to Apache Flink. | | Worker Container Images | These docker images run language-specific code on worker nodes. |

When building manually, substitute the following terms into build commands. In bash, set environment variables using export <term>=<value>.

| BEAM_JOB_SERVICE_DESTINATION | A Cloud Storage directory path accessible by both the build machine and the cluster to be created. | | BEAM_CONTAINER_IMAGE_DESTINATION | A Docker repository path prefix accessible by both the build machine and the cluster to be created. | | BEAM_SOURCE_VERSION | A tag, branch, or commit hash in the Beam source repositories to build artifacts from. (default: master) |

Automated Build

You can invoke a helper script from util directory to build Beam artifacts.

bash ./util/build-beam-artifacts.sh <BEAM_JOB_SERVICE_DESTINATION> <BEAM_CONTAINER_IMAGE_DESTINATION> [<BEAM_SOURCE_VERSION>]

Manual Build

To get started, clone the beam source code into a working directory.

git clone https://github.com/apache/beam.git
cd beam
git checkout ${BEAM_SOURCE_VERSION}

Build the Job Service

Next, build a standalone job service jar.

./gradlew :beam-runners-flink_2.11-job-server:shadowJar

Then, upload the jar to a Cloud Storage path that clusters can access during initialization.

gsutil cp \
  ./runners/flink/job-server/build/libs/beam-runners-flink_2.11-job-server-*-SNAPSHOT.jar \
  <BEAM_JOB_SERVICE_DESTINATION>/beam-runners-flink_2.11-job-server-latest-SNAPSHOT.jar

Build the Worker Container Images

Build the docker images used by Beam worker tasks.

./gradlew docker

Docker images names are generated in the following format: <USER>-docker-apache.bintray.io/beam/<LANGUAGE>.

Rename and push the images to a docker repository path that clusters can access during initialization. As a best practice, tag the images with the BEAM_SOURCE_VERSION they were generated from.

docker tag \
  <USER>-docker-apache.bintray.io/beam/<LANGUAGE> \
  <BEAM_CONTAINER_IMAGE_DESTINATION>/<LANGUAGE>:<BEAM_SOURCE_VERSION>
docker push <BEAM_CONTAINER_IMAGE_DESTINATION>/<LANGUAGE>:<BEAM_SOURCE_VERSION>

Create a Beam Cluster

You create a Beam cluster by calling the Cloud Dataproc clusters create command with the following initialization actions:

  • docker/docker.sh
  • flink/flink.sh
  • beam/beam.sh

The Beam beam and flink/flink.sh initialization actions use the following metadata variables:

Metadata Key Default Description
beam-job-service-snapshot v2.6.0 The Cloud Storage path of your JobService jar (see above)
beam-artifacts-gcs-path <cluster staging bucket> A cluster-writeable GCS path to store beam artifacts under
beam-image-enable-pull false When set to true, the init action will attempt to pull beam worker images for efficient access later
beam-image-version master The image version to use when selecting a tagged image
beam-image-repository apache.bintray.io/beam The image repository root to pull images from. As of September 12th, 2018, these images have not been published yet. Therefore it is recommended that you build and store their own images when using this init action.
flink-start-yarn-session true Run a flink session in YARN on startup.
flink-snapshot-url <none> URL to a Flink snapshot.

You should explicitly set the Beam and Flink metadata variables (use a script as shown later).

REGION=<region>
CLUSTER_NAME="$1"
INIT_ACTIONS="gs://goog-dataproc-initialization-actions-${REGION}/docker/docker.sh"
INIT_ACTIONS+=",gs://goog-dataproc-initialization-actions-${REGION}/flink/flink.sh"
INIT_ACTIONS+=",gs://goog-dataproc-initialization-actions-${REGION}/beam/beam.sh"
FLINK_SNAPSHOT="https://archive.apache.org/dist/flink/flink-1.5.3/flink-1.5.3-bin-hadoop28-scala_2.11.tgz"
METADATA="beam-job-service-snapshot=<...>"
METADATA+=",beam-image-enable-pull=true"
METADATA+=",beam-image-repository=<...>"
METADATA+=",beam-image-version=latest"
METADATA+=",flink-start-yarn-session=true"
METADATA+=",flink-snapshot-url=${FLINK_SNAPSHOT}"

gcloud dataproc clusters create "${CLUSTER_NAME}" \
    --initialization-actions "${INIT_ACTIONS}" \
    --image-version "1.2" \
    --metadata "${METADATA}"

The Beam Job Service runs on port 8099 of the master node. You can submit portable Beam jobs against this port. For example, to run the Go Wordcount example on the master node, upload the wordcount job binary, and then run:

./wordcount \
    --runner flink \
    --endpoint localhost:8099 \
    --experiments beam_fn_api \
    --output=<out> \
    --container_image <BEAM_CONTAINER_DESTINATION>/go:<BEAM_SOURCE_VERSION>

The Beam Job Service port must be opened to submit beam jobs from machines outside the cluster (see Using Firewall Rules for instructions)).