This page shows you how to create and deploy an event receiver service. The target service receives HTTP requests containing the event in the CloudEvents format.
Event providers (sources) can provide the following event types:
Event receiver response
Your receiver service should send an HTTP 2xx
response to signal a successful event receipt to the router. The router treats
all other HTTP responses as delivery failures and will resend the event.
Open source repository
The structure of the HTTP body for all events is available on the CloudEvents GitHub repository.
The repository contains the following to help you understand and use CloudEvents data in your programming language:
- Google Protocol Buffers for CloudEvents data payloads
- Generated JSON schemas
- A public JSON schema catalog
Links to client libraries are also included.
Use a CloudEvents SDK library
You can develop event receiver services using the CloudEvents SDK library, which is available for the following languages:
These libraries are open source and make it easier to transform your HTTP request into a language-idiomatic CloudEvents object.
Sample receiver source code
Cloud Audit Logs
The sample code shows you how to read Cloud Storage events using Cloud Audit Logs in a service deployed to Cloud Run.
Python
@app.route("/", methods=["POST"])
def index():
    """Handle a CloudEvent delivered to the service root over HTTP POST.

    The request headers and body are parsed into a CloudEvent, the event's
    ``subject`` attribute is read, and a notification naming it is printed
    and returned with HTTP 200.
    """
    # Build the CloudEvent from the raw request headers and body.
    cloud_event = from_http(request.headers, request.data)

    # The subject names the Cloud Storage resource, for example:
    # "storage.googleapis.com/projects/_/buckets/my-bucket"
    message = f"Detected change in Cloud Storage bucket: {cloud_event.get('subject')}"
    print(message)
    return (message, 200)
Java
import io.cloudevents.CloudEvent;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.spring.http.CloudEventHttpUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/** REST controller that receives CloudEvents posted to the service root. */
@RestController
public class EventController {

  /**
   * Converts the incoming HTTP request into a {@link CloudEvent} and reports the event subject.
   *
   * @param body the raw request body, used as the CloudEvent data
   * @param headers the request headers, from which the CloudEvent attributes are read
   * @return HTTP 200 with a message naming the event subject, or HTTP 400 carrying the parser's
   *     message when the request cannot be turned into a CloudEvent
   */
  @RequestMapping(value = "/", method = RequestMethod.POST, consumes = "application/json")
  public ResponseEntity<String> receiveMessage(
      @RequestBody String body, @RequestHeader HttpHeaders headers) {
    CloudEvent event;
    try {
      event =
          CloudEventHttpUtils.fromHttp(headers)
              .withData(
                  headers.getContentType().toString(),
                  // Encode explicitly as UTF-8: the no-argument getBytes() uses the JVM's
                  // platform default charset, which can corrupt non-ASCII JSON payloads.
                  body.getBytes(java.nio.charset.StandardCharsets.UTF_8))
              .build();
    } catch (CloudEventRWException e) {
      // The request could not be read as a CloudEvent; report it as a client error.
      return new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
    }

    String ceSubject = event.getSubject();
    String msg = "Detected change in Cloud Storage bucket: " + ceSubject;
    System.out.println(msg);
    return new ResponseEntity<>(msg, HttpStatus.OK);
  }
}
Node.js
const express = require('express');

const app = express();
app.use(express.json());

// Receives an event on POST / and reports the value of its ce-subject header.
// Responds 400 when the header is absent or empty, otherwise 200 with the
// same message that is written to the log.
app.post('/', (req, res) => {
  const subject = req.header('ce-subject');

  if (subject) {
    const message = `Detected change in Cloud Storage bucket: ${subject}`;
    console.log(message);
    return res.status(200).send(message);
  }

  return res
    .status(400)
    .send('Bad Request: missing required header: ce-subject');
});

module.exports = app;
Go
// Processes CloudEvents containing Cloud Audit Logs for Cloud Storage
package main
import (
"fmt"
"log"
"net/http"
"os"
cloudevent "github.com/cloudevents/sdk-go/v2"
)
// HelloEventsStorage handles an HTTP request that is expected to carry a
// CloudEvent. Non-POST requests are rejected with 405, requests that cannot be
// parsed into a CloudEvent are rejected with 400, and otherwise the event's
// subject is written back to the caller.
func HelloEventsStorage(w http.ResponseWriter, r *http.Request) {
	// Only POST requests are accepted; refuse everything else up front.
	if r.Method != http.MethodPost {
		http.Error(w, "Expected HTTP POST request with CloudEvent payload", http.StatusMethodNotAllowed)
		return
	}

	ce, parseErr := cloudevent.NewEventFromHTTPRequest(r)
	if parseErr != nil {
		// Keep the detailed parse error in the log; the client gets a generic message.
		log.Printf("cloudevent.NewEventFromHTTPRequest: %v", parseErr)
		http.Error(w, "Failed to create CloudEvent from request.", http.StatusBadRequest)
		return
	}

	fmt.Fprintf(w, "Detected change in Cloud Storage bucket: %s\n", ce.Subject())
}
C#
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
/// <summary>
/// ASP.NET Core startup class that maps a single POST endpoint at "/" which
/// reads the <c>ce-subject</c> header of the incoming request.
/// </summary>
public class Startup
{
    /// <summary>Registers application services; this sample needs none.</summary>
    public void ConfigureServices(IServiceCollection services)
    {
    }

    /// <summary>
    /// Configures the request pipeline: the developer exception page in
    /// development, routing, and the POST "/" endpoint.
    /// </summary>
    /// <param name="app">Application builder used to assemble the pipeline.</param>
    /// <param name="env">Hosting environment, used to detect development mode.</param>
    /// <param name="logger">Logger used for startup and per-request messages.</param>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                logger.LogInformation("Handling HTTP POST");

                var ceSubject = context.Request.Headers["ce-subject"];
                logger.LogInformation($"ce-subject: {ceSubject}");

                // Without a subject there is nothing to report; answer 400.
                if (string.IsNullOrEmpty(ceSubject))
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad Request: expected header Ce-Subject");
                    return;
                }

                // The value reported is the event subject, not its type, so the
                // message uses the same wording as the other language samples.
                await context.Response.WriteAsync($"Detected change in Cloud Storage bucket: {ceSubject}");
            });
        });
    }
}
Pub/Sub
The sample code shows you how to read Pub/Sub events in a service deployed to Cloud Run.
Python
@app.route("/", methods=["POST"])
def index():
    """Handle a Pub/Sub event posted to the service root.

    Returns HTTP 400 when the JSON body is missing/empty or has no
    ``message`` key. Otherwise greets the base64-decoded ``data`` value
    (or "World" when the message has no ``data``) together with the
    ``ce-id`` request header, and returns HTTP 200.
    """
    envelope = request.get_json()

    # Decide which validation failure applies, if any.
    if not envelope:
        error = "no Pub/Sub message received"
    elif not isinstance(envelope, dict) or "message" not in envelope:
        error = "invalid Pub/Sub message format"
    else:
        error = None

    if error is not None:
        print(f"error: {error}")
        return f"Bad Request: {error}", 400

    message = envelope["message"]
    if isinstance(message, dict) and "data" in message:
        # The payload is base64-encoded text.
        name = base64.b64decode(message["data"]).decode("utf-8").strip()
    else:
        name = "World"

    greeting = f"Hello, {name}! ID: {request.headers.get('ce-id')}"
    print(greeting)
    return (greeting, 200)
Java
import com.example.cloudrun.eventpojos.PubSubBody;
import java.util.Base64;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/** REST controller that receives Pub/Sub events posted to the service root. */
@RestController
public class EventController {

  /**
   * Greets the sender using the base64-decoded data of the received Pub/Sub message.
   *
   * @param body the request body deserialized into a {@link PubSubBody}
   * @param headers the request headers; the {@code ce-id} header is echoed in the response
   * @return HTTP 200 with a greeting, or HTTP 400 when the message is missing, has no data, or
   *     carries data that is not valid base64
   */
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(
      @RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
    // Get PubSub message from request body.
    PubSubBody.PubSubMessage message = body.getMessage();
    if (message == null) {
      return badRequest("No Pub/Sub message received.");
    }

    String data = message.getData();
    if (data == null || data.isEmpty()) {
      return badRequest("Invalid Pub/Sub message format.");
    }

    // Empty data has already been rejected above, so no default name is needed here.
    String name;
    try {
      // Decode explicitly as UTF-8 instead of relying on the platform default charset.
      name =
          new String(Base64.getDecoder().decode(data), java.nio.charset.StandardCharsets.UTF_8);
    } catch (IllegalArgumentException e) {
      // Base64.Decoder.decode rejects malformed input; that is a client error, not a server one.
      return badRequest("Invalid Pub/Sub message format.");
    }

    String ceId = findHeader(headers, "ce-id");
    String msg = String.format("Hello, %s! ID: %s", name, ceId);
    System.out.println(msg);
    return new ResponseEntity<>(msg, HttpStatus.OK);
  }

  /** Logs {@code msg} and wraps it in an HTTP 400 response. */
  private static ResponseEntity<String> badRequest(String msg) {
    System.out.println(msg);
    return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
  }

  /**
   * Looks up a header ignoring case, because HTTP field names are case-insensitive while a plain
   * {@code Map<String, String>} lookup is not.
   *
   * @return the header value, or an empty string when the header is absent
   */
  private static String findHeader(Map<String, String> headers, String name) {
    for (Map.Entry<String, String> entry : headers.entrySet()) {
      if (name.equalsIgnoreCase(entry.getKey())) {
        return entry.getValue();
      }
    }
    return "";
  }
}
Node.js
const express = require('express');
const {
  toMessagePublishedData,
} = require('@google/events/cloud/pubsub/v1/MessagePublishedData');

const app = express();
app.use(express.json());

// Sends a 400 response describing the problem and logs the same text.
const rejectRequest = (res, reason) => {
  const text = `Bad Request: ${reason}`;
  res.status(400).send(text);
  console.log(text);
};

// Receives a Pub/Sub event on POST / and answers with a greeting built from
// the base64-decoded message data (or "World" when there is none) and the
// ce-id request header.
app.post('/', (req, res) => {
  if (!req.body) {
    rejectRequest(res, 'no Pub/Sub message received');
    return;
  }
  if (!req.body.message) {
    rejectRequest(res, 'invalid Pub/Sub message format');
    return;
  }

  // Pass the body through toMessagePublishedData for IDE autocompletion.
  const payload = toMessagePublishedData(req.body);
  const encoded = payload.message && payload.message.data;

  let name = 'World';
  if (encoded) {
    name = Buffer.from(encoded, 'base64').toString().trim();
  }

  const greeting = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
  console.log(greeting);
  res.send(greeting);
});

module.exports = app;
Go
// Sample pubsub is a Cloud Run service which handles Pub/Sub messages.
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
)
// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
//
// The struct tags map the JSON "message" object (with its "data" and "id"
// members) and the top-level "subscription" member onto the fields below.
type PubSubMessage struct {
	Message struct {
		// Data is the message payload. Because the field is a []byte,
		// encoding/json expects a base64-encoded JSON string and decodes it.
		Data []byte `json:"data,omitempty"`
		// ID is read from the "id" member of the message object.
		ID string `json:"id"`
	} `json:"message"`
	// Subscription is read from the top-level "subscription" member.
	Subscription string `json:"subscription"`
}
// HelloEventsPubSub receives and processes a Pub/Sub push message.
//
// The request body is decoded as JSON into a PubSubMessage. A body that
// cannot be decoded is answered with 400. Otherwise the handler writes a
// greeting built from the message data ("World" when the data is empty) and
// the Ce-Id request header, and logs the same line.
func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
	var e PubSubMessage
	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
		// Log the decode error itself; the status code alone says nothing
		// about why the body was rejected.
		log.Printf("Bad HTTP Request: %v", err)
		http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
		return
	}

	name := string(e.Message.Data)
	if name == "" {
		name = "World"
	}

	// Header.Get already returns a string, so no conversion is needed.
	s := fmt.Sprintf("Hello, %s! ID: %s", name, r.Header.Get("Ce-Id"))

	// Use log.Print rather than log.Printf: s contains request-supplied text
	// and must not be interpreted as a format string.
	log.Print(s)
	fmt.Fprintln(w, s)
}
C#
using CloudNative.CloudEvents;
using CloudNative.CloudEvents.AspNetCore;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
/// <summary>
/// ASP.NET Core startup class that maps a single POST endpoint at "/" which
/// parses the request into a CloudEvent carrying Pub/Sub message data.
/// </summary>
public class Startup
{
    /// <summary>Registers application services; this sample needs none.</summary>
    public void ConfigureServices(IServiceCollection services)
    {
    }

    /// <summary>
    /// Configures the request pipeline: the developer exception page in
    /// development, routing, and the POST "/" endpoint.
    /// </summary>
    /// <param name="app">Application builder used to assemble the pipeline.</param>
    /// <param name="env">Hosting environment, used to detect development mode.</param>
    /// <param name="logger">Logger used for startup and per-request messages.</param>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                var formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(MessagePublishedData));
                var cloudEvent = await context.Request.ToCloudEventAsync(formatter);
                logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));

                // Data may be absent or of another type; treat both as a bad
                // request instead of failing with an exception on dereference.
                var messagePublishedData = cloudEvent.Data as MessagePublishedData;
                var pubSubMessage = messagePublishedData?.Message;
                if (pubSubMessage == null)
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
                    return;
                }

                var data = pubSubMessage.Data;
                logger.LogInformation($"Data: {data.ToBase64()}");

                // Fall back to "World" for an empty payload, matching the
                // other language samples for this endpoint.
                var name = data.ToStringUtf8();
                if (string.IsNullOrEmpty(name))
                {
                    name = "World";
                }
                logger.LogInformation($"Extracted name: {name}");

                var id = context.Request.Headers["ce-id"];
                await context.Response.WriteAsync($"Hello, {name}! ID: {id}");
            });
        });
    }

    /// <summary>
    /// Formats the attributes and data of a CloudEvent as a multi-line string for logging.
    /// </summary>
    /// <param name="cloudEvent">The event to describe.</param>
    /// <returns>One "Name: value" line per attribute, ending with the data.</returns>
    private string GetEventLog(CloudEvent cloudEvent)
    {
        return $"ID: {cloudEvent.Id}\n"
            + $"Source: {cloudEvent.Source}\n"
            + $"Type: {cloudEvent.Type}\n"
            + $"Subject: {cloudEvent.Subject}\n"
            + $"DataSchema: {cloudEvent.DataSchema}\n"
            + $"DataContentType: {cloudEvent.DataContentType}\n"
            + $"Time: {cloudEvent.Time?.UtcDateTime:yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
            + $"SpecVersion: {cloudEvent.SpecVersion}\n"
            + $"Data: {cloudEvent.Data}";
    }
}