This document describes how to integrate Apache Kafka and Pub/Sub by using the Pub/Sub Group Kafka Connector.
About the Pub/Sub Group Kafka Connector
Apache Kafka is an open source platform for streaming events. It is commonly used in distributed architectures to enable communication between loosely coupled components. Pub/Sub is a managed service for sending and receiving messages asynchronously. As with Kafka, you can use Pub/Sub to communicate between components in your cloud architecture.
The Pub/Sub Group Kafka Connector lets you integrate these two systems. The following connectors are packaged in the connector JAR:
- The sink connector reads records from one or more Kafka topics and publishes them to Pub/Sub.
- The source connector reads messages from a Pub/Sub topic and publishes them to Kafka.
Here are some scenarios in which you might use the Pub/Sub Group Kafka Connector:
- You are migrating a Kafka-based architecture to Google Cloud.
- You have a frontend system that stores events in Kafka outside of Google Cloud, but you also use Google Cloud to run some of your backend services, which need to receive the Kafka events.
- You collect logs from an on-premises Kafka solution and send them to Google Cloud for data analytics.
- You have a frontend system that uses Google Cloud, but you also store data on-premises using Kafka.
The connector requires Kafka Connect, which is a framework for streaming data between Kafka and other systems. To use the connector, you must run Kafka Connect alongside your Kafka cluster.
This document assumes that you are familiar with both Kafka and Pub/Sub. Before you read this document, it's a good idea to complete one of the Pub/Sub quickstarts.
The Pub/Sub Group Kafka Connector does not support any integration between Google Cloud IAM and Kafka Connect ACLs.
Get started with the connector
This section walks you through the following tasks:
- Configure the Pub/Sub Group Kafka Connector.
- Send events from Kafka to Pub/Sub.
- Send messages from Pub/Sub to Kafka.
Prerequisites
Install Kafka
Follow the Apache Kafka quickstart to install a single-node Kafka on your local machine. Complete these steps in the quickstart:
- Download the latest Kafka release and extract it.
- Start the Kafka environment.
- Create a Kafka topic.
Authenticate
The Pub/Sub Group Kafka Connector must authenticate with Pub/Sub in order to send and receive Pub/Sub messages. To set up authentication, perform the following steps:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with an ID for the Google Cloud project that you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project ID.
- Create local authentication credentials for your user account:
  gcloud auth application-default login
- Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin
  gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
  - Replace PROJECT_ID with your project ID.
  - Replace USER_IDENTIFIER with the identifier for your user account. For example, myemail@example.com.
  - Replace ROLE with each individual role.
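Because the role-binding command runs once per role, you might script it. The following Python sketch builds one gcloud argument list per role and runs them only when the file is invoked with a project ID and a user identifier. The script name grant_roles.py, the ROLES list, and the build_role_binding_commands helper are hypothetical; the sketch accepts the identifier with or without the user: prefix.

```python
import shlex
import subprocess
import sys

# Roles from the step above (hypothetical list; extend it if your setup needs more roles).
ROLES = ["roles/pubsub.admin"]


def build_role_binding_commands(project_id: str, user_identifier: str, roles: list[str]) -> list[list[str]]:
    """Return one `gcloud projects add-iam-policy-binding` argument list per role."""
    # Accept either "myemail@example.com" or "user:myemail@example.com".
    member = user_identifier if user_identifier.startswith("user:") else f"user:{user_identifier}"
    return [
        ["gcloud", "projects", "add-iam-policy-binding", project_id,
         f"--member={member}", f"--role={role}"]
        for role in roles
    ]


if __name__ == "__main__" and len(sys.argv) == 3:
    # Hypothetical usage: python grant_roles.py PROJECT_ID myemail@example.com
    for command in build_role_binding_commands(sys.argv[1], sys.argv[2], ROLES):
        print("Running:", shlex.join(command))
        subprocess.run(command, check=True)
```

Passing an argument list to subprocess.run avoids shell quoting. The quotes around --member in the shell command are stripped by the shell anyway.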
Download the connector JAR
Download the connector JAR file to your local machine. For more information, see Acquire the connector in the GitHub readme.
Copy the connector configuration files
Clone or download the GitHub repository for the connector.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
cd java-pubsub-group-kafka-connector
Copy the contents of the config directory into the config subdirectory of your Kafka installation.
cp config/* [path to Kafka installation]/config/
These files contain configuration settings for the connector.
Update your Kafka Connect configuration
- Navigate to the directory that contains the Kafka Connect binary that you downloaded.
- In the Kafka Connect binary directory, open the file named config/connect-standalone.properties in a text editor.
- If the plugin.path property is commented out, uncomment it.
- Update the plugin.path property to include the path to the connector JAR. Example:
  plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
- Set the offset.storage.file.filename property to a local file name. In standalone mode, Kafka uses this file to store offset data. Example:
  offset.storage.file.filename=/tmp/connect.offsets
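If you prefer to script these edits rather than use a text editor, the following Python sketch sets both properties. It assumes simple one-line key=value entries and does not handle the full Java properties syntax (continuation lines, ":" separators). Instead of uncommenting a line, it replaces an active entry if one exists and otherwise appends the key. Commented lines are ignored, so this has the same effect. The set_property and update_connect_config helpers are hypothetical names.

```python
import re
from pathlib import Path


def set_property(text: str, key: str, value: str) -> str:
    """Set key=value in properties text: replace the first active entry, else append one."""
    # Match an active (not commented-out) line for exactly this key.
    pattern = re.compile(rf"^[ \t]*{re.escape(key)}[ \t]*=.*$", re.MULTILINE)
    new_line = f"{key}={value}"
    if pattern.search(text):
        # A lambda stops re.sub from treating backslashes in value (e.g. Windows paths) as escapes.
        return pattern.sub(lambda _match: new_line, text, count=1)
    # No active entry (absent or only commented out): append it on its own line.
    if text and not text.endswith("\n"):
        text += "\n"
    return text + new_line + "\n"


def update_connect_config(config_path: str, plugin_path: str, offsets_file: str) -> None:
    """Apply both edits from the steps above to connect-standalone.properties."""
    path = Path(config_path)
    text = path.read_text()
    text = set_property(text, "plugin.path", plugin_path)
    text = set_property(text, "offset.storage.file.filename", offsets_file)
    path.write_text(text)
```

For example, update_connect_config("config/connect-standalone.properties", "/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar", "/tmp/connect.offsets") applies the two example values shown above.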
Forward events from Kafka to Pub/Sub
This section describes how to start the sink connector, publish events to Kafka, and then read the forwarded messages from Pub/Sub.
Use the Google Cloud CLI to create a Pub/Sub topic with a subscription.
gcloud pubsub topics create PUBSUB_TOPIC
gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Replace the following:
- PUBSUB_TOPIC: The name of a Pub/Sub topic to receive the messages from Kafka.
- PUBSUB_SUBSCRIPTION: The name of a Pub/Sub subscription for the topic.
In your Kafka installation, open the file config/cps-sink-connector.properties in a text editor. Add values for the following properties, which are marked "TODO" in the comments:
topics=KAFKA_TOPICS
cps.project=PROJECT_ID
cps.topic=PUBSUB_TOPIC
Replace the following:
- KAFKA_TOPICS: A comma-separated list of Kafka topics to read from.
- PROJECT_ID: The Google Cloud project that contains your Pub/Sub topic.
- PUBSUB_TOPIC: The Pub/Sub topic to receive the messages from Kafka.
From the Kafka directory, run the following command:
bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/cps-sink-connector.properties
Follow the steps in the Apache Kafka quickstart to write some events to your Kafka topic.
Use the gcloud CLI to read the events from Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Forward messages from Pub/Sub to Kafka
This section describes how to start the source connector, publish messages to Pub/Sub, and read the forwarded messages from Kafka.
Use the gcloud CLI to create a Pub/Sub topic with a subscription.
gcloud pubsub topics create PUBSUB_TOPIC
gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Replace the following:
- PUBSUB_TOPIC: The name of a Pub/Sub topic.
- PUBSUB_SUBSCRIPTION: The name of a Pub/Sub subscription.
In your Kafka installation, open the file named config/cps-source-connector.properties in a text editor. Add values for the following properties, which are marked "TODO" in the comments:
kafka.topic=KAFKA_TOPIC
cps.project=PROJECT_ID
cps.subscription=PUBSUB_SUBSCRIPTION
Replace the following:
- KAFKA_TOPIC: The Kafka topic to receive the Pub/Sub messages.
- PROJECT_ID: The Google Cloud project that contains your Pub/Sub topic.
- PUBSUB_SUBSCRIPTION: The Pub/Sub subscription to read messages from.
From the Kafka directory, run the following command:
bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/cps-source-connector.properties
Use the gcloud CLI to publish a message to Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Read the message from Kafka. Follow the steps in the Apache Kafka quickstart to read the messages from the Kafka topic.
Message conversion
A Kafka record contains a key and a value, which are variable-length byte arrays. Optionally, a Kafka record can also have headers, which are key-value pairs. A Pub/Sub message has two main parts: the message body and zero or more key-value attributes.
Kafka Connect uses converters to serialize keys and values to and from Kafka. To control the serialization, set the following properties in the connector configuration files:
- key.converter: The converter used to serialize record keys.
- value.converter: The converter used to serialize record values.
The body of a Pub/Sub message is a ByteString object, so the most efficient conversion is to copy the payload directly. For that reason, we recommend using a converter that produces primitive data types (integer, float, string, or bytes schema) where possible, to avoid deserializing and reserializing the same message body.
Conversion from Kafka to Pub/Sub
The sink connector converts Kafka records to Pub/Sub messages as follows:
- The Kafka record key is stored as an attribute named "key" in the Pub/Sub message.
- By default, the connector drops any headers in the Kafka record. However, if you set the headers.publish configuration option to true, the connector writes the headers as Pub/Sub attributes. The connector skips any headers that exceed the Pub/Sub limits on message attributes.
- For integer, float, string, and bytes schemas, the connector passes the bytes of the Kafka record value directly into the Pub/Sub message body.
- For struct schemas, the connector writes each field as an attribute of the Pub/Sub message. For example, if the field is { "id"=123 }, the resulting Pub/Sub message has an attribute "id"="123". The field value is always converted to a string. Map and struct types are not supported as field types within a struct.
- For map schemas, the connector writes each key-value pair as an attribute of the Pub/Sub message. For example, if the map is {"alice"=1,"bob"=2}, the resulting Pub/Sub message has two attributes, "alice"="1" and "bob"="2". The keys and values are converted to strings.
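To make the sink rules concrete, here is a minimal Python model of them. It is a sketch, not the connector's Java implementation. The PubsubMessage class and kafka_to_pubsub function are hypothetical, and Python dicts stand in for struct and map values. Only string and bytes primitives are modeled, with strings assumed to be UTF-8. Field values are stringified with Python's str(), which may differ from the connector's Java formatting for some types, such as booleans.

```python
from dataclasses import dataclass, field
from typing import Optional, Union


@dataclass
class PubsubMessage:
    """Hypothetical stand-in for a Pub/Sub message: a body plus string attributes."""
    data: bytes = b""
    attributes: dict[str, str] = field(default_factory=dict)


def kafka_to_pubsub(
    key: Optional[str],
    value: Union[str, bytes, dict],
    headers: dict[str, str],
    publish_headers: bool = False,
) -> PubsubMessage:
    """Model of the documented sink conversion rules (not the connector's code)."""
    msg = PubsubMessage()
    if key is not None:
        # The Kafka record key is stored as the "key" attribute.
        msg.attributes["key"] = key
    if publish_headers:
        # Mirrors headers.publish=true. The real connector also skips headers
        # that exceed Pub/Sub attribute limits; that check is omitted here.
        msg.attributes.update(headers)
    if isinstance(value, bytes):
        # Bytes schema: the payload is copied into the body as-is.
        msg.data = value
    elif isinstance(value, str):
        # String schema: assumed to be UTF-8 encoded.
        msg.data = value.encode("utf-8")
    elif isinstance(value, dict):
        # Struct or map schema: each field or entry becomes a string attribute,
        # and the body stays empty (messageBodyName is not modeled here).
        for k, v in value.items():
            if isinstance(v, dict):
                raise ValueError(f"nested struct/map field {k!r} is not supported")
            msg.attributes[str(k)] = str(v)
    else:
        # Integer and float encodings are not described here, so they are not modeled.
        raise TypeError(f"unsupported value type: {type(value).__name__}")
    return msg
```

For example, kafka_to_pubsub("order-7", {"id": 123}, {}) returns a message with an empty body and the attributes {"key": "order-7", "id": "123"}.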
Struct and map schemas have some additional behaviors:
- Optionally, you can specify a particular struct field or map key to be the message body by setting the messageBodyName configuration property. The value of the field or key is stored as a ByteString in the message body. If you don't set messageBodyName, then the message body is empty for struct and map schemas.
- For array values, the connector supports only primitive array types. The sequence of values in the array is concatenated into a single ByteString object.
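The body-selection and array rules can be sketched the same way. This Python model is hypothetical (struct_or_map_body, body_bytes, and primitive_bytes are illustrative names). Only string and bytes elements are modeled, with strings assumed to be UTF-8. Returning an empty body when messageBodyName names a field that is absent is also an assumption, because that case is not described above.

```python
from typing import Optional, Sequence, Union

Primitive = Union[str, bytes]


def primitive_bytes(value: Primitive) -> bytes:
    """Encode one primitive value; strings are assumed to be UTF-8."""
    if isinstance(value, bytes):
        return value
    if isinstance(value, str):
        return value.encode("utf-8")
    # Nested arrays and non-modeled types are rejected (only primitive arrays are supported).
    raise TypeError(f"only str/bytes are modeled, got {type(value).__name__}")


def body_bytes(value: Union[Primitive, Sequence[Primitive]]) -> bytes:
    """Encode a body value; a primitive array is concatenated into one byte string."""
    if isinstance(value, (list, tuple)):
        return b"".join(primitive_bytes(v) for v in value)
    return primitive_bytes(value)


def struct_or_map_body(value: dict, message_body_name: Optional[str]) -> bytes:
    """Body for a struct/map value, driven by a messageBodyName-like setting."""
    if message_body_name is None:
        # messageBodyName unset: the body is empty for struct and map schemas.
        return b""
    if message_body_name not in value:
        # Assumption: a missing field or key yields an empty body.
        return b""
    return body_bytes(value[message_body_name])
```

For example, struct_or_map_body({"parts": ["ab", "cd"]}, "parts") returns b"abcd".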
Conversion from Pub/Sub to Kafka
The source connector converts Pub/Sub messages to Kafka records as follows:
- Kafka record key: By default, the key is set to null. Optionally, you can specify a Pub/Sub message attribute to use as the key by setting the kafka.key.attribute configuration option. In that case, the connector looks for an attribute with that name and sets the record key to the attribute value. If the specified attribute is not present, the record key is set to null.
- Kafka record value: The connector writes the record value as follows:
  - If the Pub/Sub message has no custom attributes, the connector writes the Pub/Sub message body directly to the Kafka record value as a byte[] type, using the converter specified by value.converter.
  - If the Pub/Sub message has custom attributes and kafka.record.headers is false, the connector writes a struct to the record value. The struct contains one field for each attribute, and a field named "message" whose value is the Pub/Sub message body (stored as bytes):
    { "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", ... }
    In this case, you must use a value.converter that is compatible with struct schemas, such as org.apache.kafka.connect.json.JsonConverter.
  - If the Pub/Sub message has custom attributes and kafka.record.headers is true, the connector writes the attributes as Kafka record headers. It writes the Pub/Sub message body directly to the Kafka record value as a byte[] type, using the converter specified by value.converter.
- Kafka record headers: By default, the headers are empty, unless you set kafka.record.headers to true.
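The following Python sketch models these source rules for a single message. It is illustrative only: the KafkaRecord class and pubsub_to_kafka function are hypothetical, a dict stands in for the Kafka Connect struct, and None stands in for null. All attributes are treated as custom attributes. The behavior when an attribute is itself named "message" is not described above; here the body wins, which is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class KafkaRecord:
    """Hypothetical stand-in for a Kafka Connect source record."""
    key: Optional[str]
    value: Any
    headers: dict[str, str] = field(default_factory=dict)


def pubsub_to_kafka(
    data: bytes,
    attributes: dict[str, str],
    key_attribute: Optional[str] = None,
    record_headers: bool = False,
) -> KafkaRecord:
    """Model of the documented source conversion rules (not the connector's code)."""
    # kafka.key.attribute: use that attribute's value as the key; None (null)
    # when the option is unset or the attribute is missing.
    key = attributes.get(key_attribute) if key_attribute is not None else None
    if not attributes:
        # No custom attributes: the body becomes the record value as raw bytes.
        return KafkaRecord(key=key, value=data)
    if record_headers:
        # kafka.record.headers=true: attributes become headers; body stays raw bytes.
        return KafkaRecord(key=key, value=data, headers=dict(attributes))
    # kafka.record.headers=false: one field per attribute plus "message" for the body.
    # Assumption: if an attribute is named "message", the body overrides it.
    return KafkaRecord(key=key, value={**attributes, "message": data})
```

With kafka.record.headers set to false, a value shaped like this dict is why value.converter must handle struct schemas.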
Configuration options
In addition to the configuration options provided by the Kafka Connect API, the Pub/Sub Group Kafka Connector supports the sink and source configuration options described in Pub/Sub connector configs.
Getting support
If you need help, create a support ticket. For general questions and discussions, create an issue in the GitHub repository.
What's next
- Understand the differences between Kafka and Pub/Sub.
- Learn more about the Pub/Sub Group Kafka Connector.
- See the Pub/Sub Group Kafka Connector GitHub repository.