Load Streaming Data to BigQuery via Pub/Sub
Pub/Sub Overview:
Solution Diagram:
To use Pub/Sub, you create a topic that publishers send messages to, and a subscription through which consumers receive the messages published to that topic.
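The topic/subscription relationship can be pictured with a toy, in-memory model. This is a conceptual sketch only: the `Topic` and `Subscription` classes are hypothetical and are not part of the `google-cloud-pubsub` library, and real Pub/Sub adds ack deadlines, timed redelivery and (by default) no ordering guarantee, all of which are ignored here. It shows the two properties the rest of the tutorial relies on: every subscription attached to a topic gets its own copy of each published message, and a message stays in a subscription until the consumer acknowledges it.

```python
from collections import deque


class Topic:
    """Toy topic (hypothetical): remembers which subscriptions are attached to it."""

    def __init__(self, name):
        self.name = name
        self.subscriptions = []

    def publish(self, data):
        # Fan-out: every attached subscription gets its own copy of the message
        for subscription in self.subscriptions:
            subscription.backlog.append(data)


class Subscription:
    """Toy subscription (hypothetical): keeps messages until they are acknowledged."""

    def __init__(self, name, topic):
        self.name = name
        self.backlog = deque()
        topic.subscriptions.append(self)  # attach to the topic

    def pull(self):
        # Return the oldest unacknowledged message without removing it, or None
        return self.backlog[0] if self.backlog else None

    def ack(self):
        # Acknowledging removes the oldest message so it is not delivered again
        self.backlog.popleft()


topic = Topic("demoTopic")
default_sub = Subscription("demoTopic-sub", topic)
other_sub = Subscription("other-sub", topic)  # hypothetical second subscriber

topic.publish(b'{"column_1": 123}')
print(default_sub.pull(), other_sub.pull())  # both subscriptions received the message
default_sub.ack()
print(default_sub.pull(), other_sub.pull())  # only other_sub still holds it
```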
- Go to Pub/Sub from the search bar or the Navigation menu.
- This takes you to the Pub/Sub page.
- Click + Create Topic.
- Give the topic an ID; we will call it demoTopic. Check Add a default subscription so that a subscription is created for the topic automatically.
- Click Create Topic.
- The page for demoTopic now lists its default subscription, demoTopic-sub.
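The Pub/Sub client libraries address the topic and subscription created above by fully qualified resource paths rather than by their short IDs. A minimal sketch of the two path formats, assuming a hypothetical project ID `my-project` (substitute your own); in `google-cloud-pubsub`, `PublisherClient.topic_path()` and `SubscriberClient.subscription_path()` build strings of the same form.

```python
def topic_path(project_id: str, topic_id: str) -> str:
    # Same format as PublisherClient.topic_path() in google-cloud-pubsub
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_path(project_id: str, subscription_id: str) -> str:
    # Same format as SubscriberClient.subscription_path() in google-cloud-pubsub
    return f"projects/{project_id}/subscriptions/{subscription_id}"


project_id = "my-project"  # hypothetical project ID
print(topic_path(project_id, "demoTopic"))
print(subscription_path(project_id, "demoTopic-sub"))
```

Topic and subscription IDs are case-sensitive, so the names used in code must match the ones entered in the console exactly.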
"""This script grabs data from a PubSub topic, and stores them in BiqQuery
using the BigQuery Streaming API.
"""
from google.cloud import bigquery
from google.cloud import pubsub_v1
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
#set credentials
credentials = bigquery.Client.from_service_account_json(r'C:\Users\Sherry\Downloads\pelagic-earth-353304-5e9ab57c7481.json')
# Set the name of the dataset and table
dataset_name = 'pubsub_dataset'
table_name = 'pubsub_demo'
# Set the name of the Cloud Pub/Sub topic and subscription
topic_name = 'demoTopic'
subscription_name = 'demoTopic-sub'
# Create a BigQuery client
bigquery_client = build('bigquery', 'v2', credentials=credentials)
# Create a Cloud Pub/Sub client
pubsub_client = pubsub_v1.SubscriberClient()
def insert_rows(event, context):
# Get the message data from the event
data = event['data']
message = data.decode('utf-8')
# Convert the message data to a list of rows
rows = [ { 'json': { 'column_1': 123, 'column_2': 'abc', 'column_3': 4.56 } } ]
# Insert the rows into BigQuery
bigquery_client.tabledata().insertAll( projectId=credentials.project_id, datasetId=dataset_name, tableId=table_name, body={'rows': rows} ).execute()
# Set up a Cloud Pub/Sub subscription
subscription_path = pubsub_client.subscription_path( credentials.project_id, subscription_name )
def callback(message): insert_rows(message, None)
message.ack()
future = pubsub_client.subscribe(subscription_path, callback)
print('Listening for messages on {}'.format(subscription_path))
# Wait for messages to arrive while True: time.sleep(60)
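The subscriber only works if publishers send payloads it can turn into rows of pubsub_dataset.pubsub_demo. A minimal sketch of both sides, assuming each message carries one row as a UTF-8 JSON object with the three columns from the sample row in the script (`column_1` = 123, `column_2` = 'abc', `column_3` = 4.56). The column types in `EXPECTED_COLUMNS` and the helper names `encode_row` and `decode_row` are hypothetical; adjust them to your actual table schema.

```python
import json

# Assumed column types for pubsub_dataset.pubsub_demo (hypothetical schema)
EXPECTED_COLUMNS = {"column_1": int, "column_2": str, "column_3": float}


def encode_row(row: dict) -> bytes:
    """Publisher side: serialize one row as UTF-8 JSON bytes for a Pub/Sub message."""
    return json.dumps(row).encode("utf-8")


def decode_row(data: bytes) -> dict:
    """Subscriber side: parse message bytes into a row and check it against EXPECTED_COLUMNS."""
    row = json.loads(data.decode("utf-8"))  # raises ValueError on bad UTF-8 or bad JSON
    if not isinstance(row, dict):
        raise ValueError("payload must be a JSON object")
    missing = sorted(EXPECTED_COLUMNS.keys() - row.keys())
    unknown = sorted(row.keys() - EXPECTED_COLUMNS.keys())
    if missing or unknown:
        raise ValueError(f"missing columns {missing}, unknown columns {unknown}")
    for name, expected_type in EXPECTED_COLUMNS.items():
        value = row[name]
        if isinstance(value, bool):
            # bool is a subclass of int in Python, so reject it explicitly
            raise ValueError(f"{name} must not be a boolean")
        if expected_type is float and isinstance(value, int):
            row[name] = float(value)  # a JSON integer is acceptable for a float column
        elif not isinstance(value, expected_type):
            raise ValueError(f"{name} should be of type {expected_type.__name__}")
    return row


payload = encode_row({"column_1": 123, "column_2": "abc", "column_3": 4.56})
print(payload)  # b'{"column_1": 123, "column_2": "abc", "column_3": 4.56}'
print(decode_row(payload))  # {'column_1': 123, 'column_2': 'abc', 'column_3': 4.56}
```

To send test data, `encode_row(row)` could be passed as the `data` argument of `PublisherClient.publish()`; inside the subscriber callback, `decode_row(message.data)` can produce the row handed to BigQuery, so malformed messages are rejected before the insert rather than failing inside it.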