Load Streaming Data to BigQuery via Pub/Sub


    Pub/Sub Overview:

   Solution Diagram:

To use Pub/Sub, you create a topic to hold data and a subscription to access the data published to that topic.
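
The topic and subscription can also be created from code instead of through the console. The following is a minimal sketch using the google-cloud-pubsub client library. The project ID my-project is hypothetical, the helper names are illustrative, and the "-sub" suffix is an assumption that mirrors the default subscription name used in this walkthrough.

```python
def topic_path(project_id, topic_id):
    """Build the fully qualified topic name that Pub/Sub expects."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_path(project_id, subscription_id):
    """Build the fully qualified subscription name that Pub/Sub expects."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def create_topic_with_subscription(project_id, topic_id):
    """Create a topic plus a '<topic>-sub' subscription attached to it."""
    # Imported here so the path helpers above work without the client library.
    from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()

    topic = topic_path(project_id, topic_id)
    subscription = subscription_path(project_id, topic_id + "-sub")

    # The topic must exist before a subscription can be attached to it.
    publisher.create_topic(request={"name": topic})
    subscriber.create_subscription(request={"name": subscription, "topic": topic})
    return topic, subscription


# Example call (hypothetical project ID, needs valid credentials):
# create_topic_with_subscription("my-project", "demoTopic")
```

Both clients pick up Application Default Credentials here; passing a service account key explicitly is an alternative if no default credentials are configured.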

To create them in the Google Cloud console:

  1. Go to Pub/Sub from the search bar or the Navigation menu.
  2. This takes you to the Pub/Sub page.
  3. Click +Create Topic.
  4. Give the topic a name; we will call it demoTopic. Check "Add a default subscription" to automatically create a subscription for the topic.
  5. Click Create Topic.
  6. Inside demoTopic, you can now see the default subscription, demoTopic-sub.

The following script listens on the demoTopic-sub subscription and streams each message into BigQuery:

"""This script grabs data from a Pub/Sub subscription and stores it in BigQuery
using the BigQuery streaming insert API.
"""

from google.cloud import bigquery
from google.cloud import pubsub_v1

# Path to the service account key file
key_path = r'C:\Users\Sherry\Downloads\pelagic-earth-353304-5e9ab57c7481.json'

# Set the name of the dataset and table
dataset_name = 'pubsub_dataset'
table_name = 'pubsub_demo'

# Set the name of the Cloud Pub/Sub topic and subscription
topic_name = 'demoTopic'
subscription_name = 'demoTopic-sub'

# Create a BigQuery client; the project ID is read from the key file
bigquery_client = bigquery.Client.from_service_account_json(key_path)
table_id = '{}.{}.{}'.format(bigquery_client.project, dataset_name, table_name)

# Create a Cloud Pub/Sub subscriber client
pubsub_client = pubsub_v1.SubscriberClient.from_service_account_file(key_path)
subscription_path = pubsub_client.subscription_path(bigquery_client.project, subscription_name)


def callback(message):
    # Get the message data and decode it
    payload = message.data.decode('utf-8')
    print('Received message: {}'.format(payload))

    # Convert the message data to a list of rows.
    # Placeholder row: replace it with values parsed from payload.
    rows = [{'column_1': 123, 'column_2': 'abc', 'column_3': 4.56}]

    # Insert the rows into BigQuery; an empty error list means success
    errors = bigquery_client.insert_rows_json(table_id, rows)
    if errors:
        print('BigQuery insert errors: {}'.format(errors))
        message.nack()  # let Pub/Sub redeliver the message
    else:
        message.ack()


# Start the streaming pull on the subscription
future = pubsub_client.subscribe(subscription_path, callback=callback)
print('Listening for messages on {}'.format(subscription_path))

# Block the main thread while messages arrive; stop with Ctrl+C
try:
    future.result()
except KeyboardInterrupt:
    future.cancel()
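
To exercise the subscriber, something has to publish to demoTopic, and each payload has to be turned into rows. The sketch below shows one possible way to do both. It assumes every message is a JSON object (or a JSON array of objects) whose keys match the table's columns, as in the sample row column_1, column_2, column_3. That payload format, the project ID my-project, and the helper names encode_row, message_to_rows, and publish_test_message are illustrative assumptions, not part of the script above.

```python
import json


def encode_row(row):
    """Serialize one row dict into the bytes payload that Pub/Sub carries."""
    return json.dumps(row).encode("utf-8")


def message_to_rows(data):
    """Decode a Pub/Sub payload (bytes) into a list of row dicts."""
    parsed = json.loads(data.decode("utf-8"))
    # Accept either a single JSON object or a JSON array of objects.
    return parsed if isinstance(parsed, list) else [parsed]


def publish_test_message(project_id, topic_id, row):
    """Publish one row to the topic and return the message ID from the server."""
    # Imported here so the helpers above work without the client library.
    from google.cloud import pubsub_v1  # pip install google-cloud-pubsub

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    # publish() returns a future; result() blocks until the message is sent.
    future = publisher.publish(topic_path, encode_row(row))
    return future.result()


if __name__ == "__main__":
    sample = {"column_1": 123, "column_2": "abc", "column_3": 4.56}
    # Round trip without touching the network.
    print(message_to_rows(encode_row(sample)))
    # Hypothetical project ID; uncomment once credentials are configured.
    # publish_test_message("my-project", "demoTopic", sample)
```

Inside a subscriber callback, message_to_rows(message.data) could stand in for a hard-coded row, provided the publisher really sends JSON in this shape.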
