Kafka

This initialization action installs Kafka on a Google Cloud Dataproc cluster. Kafka depends on Zookeeper, which can be installed either with the Zookeeper component or by creating a "High Availability" cluster with the --num-masters 3 parameter.

By default, Kafka brokers run only on the cluster's worker nodes, while the Kafka libraries and command-line tools are installed on the master node(s) under /usr/lib/kafka. You can also run Kafka broker(s) on the master node(s) by specifying --metadata "run-on-master=true".
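If you take the Zookeeper component route instead of a high-availability cluster, cluster creation might look like the following sketch (it assumes an image version that supports the ZOOKEEPER optional component):

REGION=<region>
CLUSTER_NAME=<cluster_name>
gcloud dataproc clusters create ${CLUSTER_NAME} \
    --region ${REGION} \
    --optional-components ZOOKEEPER \
    --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/kafka/kafka.sh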

Using this initialization action

⚠️ NOTICE: See best practices for using initialization actions in production.

You can use this initialization action to create a new Dataproc cluster with Kafka installed:

  1. Use the gcloud command to create a new cluster with this initialization action.

    REGION=<region>
    CLUSTER_NAME=<cluster_name>
    gcloud dataproc clusters create ${CLUSTER_NAME} \
        --region ${REGION} \
        --num-masters 3 \
        --metadata "run-on-master=true" \
        --metadata "install-kafka-python=true" \
        --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/kafka/kafka.sh
  2. You can test your Kafka setup by SSHing into one of your nodes, then creating a simple topic and publishing to it with Kafka's command-line tools (a kafka-python sketch follows these steps):

    gcloud compute ssh <CLUSTER_NAME>-m-0
    
    # Create a test topic, just talking to the local master's zookeeper server.
    /usr/lib/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create \
        --replication-factor 1 --partitions 1 --topic test
    /usr/lib/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
    
    # Use worker 0 as broker to publish 100 messages over 100 seconds
    # asynchronously.
    CLUSTER_NAME=$(/usr/share/google/get_metadata_value attributes/dataproc-cluster-name)
    for i in {1..100}; do echo "message${i}"; sleep 1; done |
        /usr/lib/kafka/bin/kafka-console-producer.sh \
            --broker-list ${CLUSTER_NAME}-w-0:9092 --topic test &
    
    # Use worker 1 as broker to consume those 100 messages as they come.
    # This can also be run on any other master or worker node of the cluster.
    /usr/lib/kafka/bin/kafka-console-consumer.sh \
        --bootstrap-server ${CLUSTER_NAME}-w-1:9092 \
        --topic test --from-beginning
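
Since the cluster above was created with install-kafka-python=true, the kafka-python client is also available. Below is a minimal sketch of the same publish/consume round trip using that client, run from any node of the cluster (the unquoted heredoc lets the shell expand ${CLUSTER_NAME}):

CLUSTER_NAME=$(/usr/share/google/get_metadata_value attributes/dataproc-cluster-name)
python3 - <<EOF
from kafka import KafkaProducer, KafkaConsumer

broker = '${CLUSTER_NAME}-w-0:9092'

# Publish one message to the test topic.
producer = KafkaProducer(bootstrap_servers=broker)
producer.send('test', b'hello from kafka-python')
producer.flush()

# Read everything on the topic, giving up after 10 seconds of silence.
consumer = KafkaConsumer('test', bootstrap_servers=broker,
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000)
for message in consumer:
    print(message.value)
EOF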

You can find more information about using initialization actions with Dataproc in the Dataproc documentation.

Installing CMAK (Kafka Manager) with Kafka

If you would like to use CMAK (Kafka Manager) to manage your Kafka cluster through a web UI, use the gcloud command to create a new Kafka cluster with the Kafka Manager initialization action. The following command will create a new high availability Kafka cluster with Kafka Manager running on the first master node. The default HTTP port for Kafka Manager is 9000. Follow the instructions at Dataproc cluster web interfaces to access the web UI.

REGION=<region>
CLUSTER_NAME=<cluster_name>
gcloud dataproc clusters create ${CLUSTER_NAME} \
    --region ${REGION} \
    --num-masters 3 \
    --metadata "run-on-master=true" \
    --metadata "kafka-enable-jmx=true" \
    --metadata "install-kafka-python=true" \
    --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/kafka/kafka.sh,gs://goog-dataproc-initialization-actions-${REGION}/kafka/kafka-manager.sh
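
To reach the Kafka Manager UI, one common approach from the Dataproc web-interfaces instructions is a SOCKS proxy over SSH (a sketch; the zone placeholder and proxy port 1080 are examples, not values set by this init action):

# Open a SOCKS proxy to the first master node, then configure a browser
# to use localhost:1080 and visit http://<CLUSTER_NAME>-m-0:9000.
gcloud compute ssh ${CLUSTER_NAME}-m-0 --zone=<zone> -- -D 1080 -N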

Installing Cruise Control with Kafka

If you would like to use Cruise Control to automate common Kafka operations, e.g., automatically fixing under-replicated partitions caused by broker failures, use the gcloud command to create a new Kafka cluster with the Cruise Control initialization action. The following command will create a new high availability Kafka cluster with Cruise Control running on the first master node. The default HTTP port for Cruise Control is 9090. Follow the instructions at Dataproc cluster web interfaces to access the web UI.

REGION=<region>
CLUSTER_NAME=<cluster_name>
gcloud dataproc clusters create ${CLUSTER_NAME} \
    --region ${REGION} \
    --num-masters 3 \
    --metadata "run-on-master=true" \
    --metadata "install-kafka-python=true" \
    --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/kafka/kafka.sh,gs://goog-dataproc-initialization-actions-${REGION}/kafka/cruise-control.sh
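
Once the cluster is up, you can sanity-check Cruise Control from the first master node through its REST API (a sketch; it assumes the default port 9090 mentioned above):

# Query Cruise Control's state endpoint, which reports the status of its
# monitor, executor, and analyzer components.
curl 'http://localhost:9090/kafkacruisecontrol/state'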

Installing Prometheus with Kafka

If you would like to use Prometheus to monitor your Kafka cluster, use the gcloud command to create a new Kafka cluster with the Prometheus initialization action. The following command will create a new high availability Kafka cluster with a Prometheus server listening on port 9096 of each node. Follow the instructions at Dataproc cluster web interfaces to access the web UI.

REGION=<region>
CLUSTER_NAME=<cluster_name>
gcloud dataproc clusters create ${CLUSTER_NAME} \
    --region ${REGION} \
    --num-masters 3 \
    --metadata "run-on-master=true" \
    --metadata "prometheus-http-port=9096" \
    --metadata "install-kafka-python=true" \
    --initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/kafka/kafka.sh,gs://goog-dataproc-initialization-actions-${REGION}/prometheus/prometheus.sh
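
After the cluster starts, you can verify that Prometheus is scraping metrics by querying its HTTP API on any node (a sketch; the built-in up metric is always present, while Kafka-specific metric names depend on the JMX exporter configuration):

# Ask Prometheus which scrape targets are currently up.
curl 'http://localhost:9096/api/v1/query?query=up'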

Important notes

  • This script installs Kafka on all worker nodes by default (but not on the master(s))
  • This script assumes you have Zookeeper installed, either through the zookeeper.sh init action or by creating a high-availability cluster with --num-masters 3
  • The delete.topic.enable property is set to true by default, so topics can be deleted (see the sketch below)
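
For example, the test topic created earlier can be removed with the same command-line tools, talking to the local Zookeeper as in the examples above:

# Deleting a topic works because delete.topic.enable=true.
/usr/lib/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test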