Receiving and Storing Sensor Data with Google Cloud Dataflow and Cloud Storage

This blog continues the previous posts of this series:

In the previous blog we connected our ESP to Google Cloud IoT Core; now it’s time to actually do something with the data our sensor provides.

In this first step, we will use Google Cloud Dataflow to fetch the sensor data from Google Cloud PubSub, transform it, and store it as individual objects in Google Cloud Storage.

The following overview shows this visually:

Create Bucket on Google Cloud Storage

Before being able to store anything in Google Cloud Storage, we need to create a bucket.

On your Google Cloud Console, navigate to Google Cloud Storage. If you have not yet created a bucket in your current project, a wizard will automatically open.

On the next screen you will be asked to provide a few configuration details for your bucket. How you choose them is up to you; I would recommend going cheap and selecting the same location as for IoT Core.
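If you prefer the command line over the wizard, a bucket can also be created with gsutil. This is a sketch, assuming the Cloud SDK is installed and authenticated; the bucket name is a placeholder you need to replace:

```shell
# Create a regional bucket in europe-west1 (replace <Bucket Name> with your own).
gsutil mb -l europe-west1 gs://<Bucket Name>/
```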

Build Google Cloud Dataflow Pipeline

Finally, we need to create a pipeline that reads the sensor data from our MQTT topic in Google Cloud PubSub and stores the message content as an object in Google Cloud Storage.

Before we can start building our pipeline, we need to enable the Dataflow API in our project. To do so, select “APIs & Services” from the Google Cloud Console menu and select “Enable APIs and Services”.

In the search field, enter “dataflow” and click on the result shown.

Now you can enable the API.
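Alternatively, the Dataflow API can be enabled from the command line. This is a sketch, assuming the Cloud SDK is installed and your project is already configured:

```shell
# Enable the Dataflow API for the currently configured project.
gcloud services enable dataflow.googleapis.com
```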

To get you started building the pipeline, I have provided a small sample that creates exactly that for you. You can find it in my GitHub repo. The file we want to look at is Google_Cloud_IoT_Demo/dataflow/first-dataflow/src/main/java/com/example/PubSubReader.java. The relevant segment is:

public static void main(String[] args) {
    // Parse and validate the pipeline options passed on the command line.
    PubSubOptions options = PipelineOptionsFactory.fromArgs(args)
                                                  .withValidation()
                                                  .as(PubSubOptions.class);
    Pipeline p = Pipeline.create(options);

    // Read messages from PubSub, segment them into fixed 1-minute windows,
    // and write each window as a single object to Cloud Storage.
    p.apply(PubsubIO.readStrings().fromTopic(options.getPubSubTopic()))
     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
     .apply(TextIO.write().to(options.getOutput()).withWindowedWrites().withNumShards(1));

    p.run();
}

More specifically, we should currently focus on

p.apply(PubsubIO.readStrings().fromTopic(options.getPubSubTopic()))
  .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO.write().to(options.getOutput()).withWindowedWrites().withNumShards(1));

The first line of code opens our PubSub MQTT topic as the input source of our pipeline. In the second line we apply windowing, which segments the incoming data stream. This is needed because we are reading from a stream-oriented input source and writing to a batch-oriented output sink. The last line writes the received, and currently untouched, data to Cloud Storage as an object, combining one minute of collected data into a single object.
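To get a feeling for what the fixed 1-minute windowing does, here is a small plain-Java sketch (no Beam dependencies, names and sample data are made up for illustration): each message is assigned to the window containing its timestamp, and each window then corresponds to one output object.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FixedWindowSketch {
    // Returns the start of the 1-minute window containing the given epoch millis.
    static long windowStart(long epochMillis) {
        long windowSizeMillis = 60_000L;
        return epochMillis - (epochMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        // Simulated sensor messages with their publish timestamps (epoch millis).
        Map<Long, String> messages = new LinkedHashMap<>();
        messages.put(1_000L, "temp=21.5");
        messages.put(59_000L, "temp=21.7"); // still within the first minute
        messages.put(61_000L, "temp=21.9"); // falls into the second window

        // Group messages by window start, as the Window transform would.
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Map.Entry<Long, String> e : messages.entrySet()) {
            windows.computeIfAbsent(windowStart(e.getKey()), k -> new ArrayList<>())
                   .add(e.getValue());
        }

        // Each window becomes one object in Cloud Storage (withNumShards(1)).
        windows.forEach((start, msgs) ->
            System.out.println("window @" + Instant.ofEpochMilli(start) + " -> " + msgs));
    }
}
```

This is only a mental model: in the real pipeline Beam assigns the windows and TextIO writes one shard per window.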

To compile and run this code, use the following command:

$ mvn compile exec:java -Dexec.mainClass=com.example.PubSubReader \
                        -Dexec.args="--project=<Project Name> \
                        --stagingLocation=gs://<Bucket Name>/staging/ \
                        --tempLocation=gs://<Bucket Name>/temp \
                        --output=gs://<Bucket Name>/output/ \
                        --pubSubTopic=<Name of created topic> \
                        --runner=DataflowRunner \
                        --region=europe-west1" \
                        -Pdataflow-runner

You will obviously have to replace the placeholders to conform with your naming; in my case this is:

$ mvn compile exec:java -Dexec.mainClass=com.example.PubSubReader \
                        -Dexec.args="--project=psteiner-iot-demo-test-10 \
                        --stagingLocation=gs://psteiner_iot_bucket/staging/ \
                        --tempLocation=gs://psteiner_iot_bucket/temp \
                        --output=gs://psteiner_iot_bucket/output/ \
                        --pubSubTopic=projects/psteiner-iot-demo-test-10/topics/sensorData  \
                        --runner=DataflowRunner \
                        --region=europe-west1" \
                        -Pdataflow-runner

Wait for the build to complete with

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

Verify Data arrived

There are at least two ways to check whether your data is received and processed. At this stage we will only go via Google Cloud Storage and verify that objects are stored in the bucket we created.

Start by opening the Cloud Storage Console and select the Bucket you created earlier.

Now select the output subfolder

and one of the stored objects to see its content.
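The same check can be done from the command line with gsutil. This is a sketch; the bucket name and the object name are placeholders you need to fill in from the listing:

```shell
# List the objects the pipeline has written so far …
gsutil ls gs://<Bucket Name>/output/

# … and print the content of one of them.
gsutil cat gs://<Bucket Name>/output/<Object Name>
```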

This concludes this blog …
