
Simple integration pipeline from JSON APIs to BigQuery

timandrews1

It's no secret that I prefer simple, cheap, and built-in solutions over costly, complicated, and heavy approaches. I've got a few reasons for this: heavier solutions take effort to R&D, gain company approval, gain budgeting approval, and often require lots of training. This investigative work is all time and money that could be spent building more business projects. Simply put, after establishing a sound and intuitive architecture, I prefer to spend more time working on business projects as opposed to getting ready to be able to work on business projects.


BigQuery is one of the ultimate examples of being able to save setup time - it doesn't require as much configuration as Snowflake or Azure Synapse, and the free TB per month of processing power really allows teams to go all out when prototyping solutions. It's a data warehouse with a very low barrier to entry. However, I'm not a 100% code-it-from-scratch-and-save-it-as-a-template person - I prefer dbt over hand-crafted stored procedures for data transformations. Still, dbt is much simpler to use than traditional ETL tools.


Today's example focuses on integrating third-party JSON APIs into BigQuery. If this approach fits your needs, it's possible to develop a robust integration solution with a little bit of Python, Cloud Functions, and Cloud Scheduler. Granted, this will not necessarily work for creating and managing thousands of discrete ELT routines with complex dependencies, but for newer companies with fewer data sources, or for situations where third-party JSON API data is more of a data augmentation, this solution is definitely workable and extensible. The best part is that it doesn't require a separate third-party product with its own learning curve, and the code to implement it is very concise.


Integrating top news from newsapi.org

The requirements for today's example are as follows:

  • To facilitate future analysis and machine learning, download and store the top US news stories and integrate into Google BigQuery every three hours.

The solution is streamlined and consists of the following:

  • Obtain an API key from newsapi.org

  • Create a Google Cloud Function in Python which downloads the top stories, saves them to a JSON file, and uploads it to Google BigQuery

  • Merge the uploaded/staged data into a permanent table along with a load timestamp

  • Schedule the Cloud Function to run every three hours

Cloud function setup

Our cloud function will use a simple setup (an equivalent gcloud deployment command is sketched after the list):

  • Environment - 1st gen

  • Function name - download-top-news

  • Region - your choice

  • Trigger type - HTTP

  • Requires authentication - True

  • Require HTTPS - True

  • Timeout - Max (540 seconds)
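
If you'd rather deploy from the command line than the console, a gcloud command along these lines should produce an equivalent function (the region is just an example, and flag spellings can vary slightly between gcloud versions, so treat this as a sketch rather than a copy-paste recipe):

gcloud functions deploy download-top-news \
    --region=us-central1 \
    --runtime=python310 \
    --entry-point=download_news \
    --trigger-http \
    --no-allow-unauthenticated \
    --timeout=540s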

The source code will use Python 3.10. Be sure that the required package dependencies are added to the requirements.txt file, and that the entry point is named "download_news."
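
For reference, the requirements.txt for this function only needs the two third-party packages used in the code below (the BigQuery client library and the wget download helper); version pins are optional:

wget
google-cloud-bigquery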


The code is straightforward - we access the newsapi.org API and save the results to a temporary file. The temporary file is then uploaded to a staging table in BigQuery. If a permanent version of the staging table does not exist, it is created. The data is then loaded into the permanent table along with the timestamp. This function could easily be genericized to be callable with arguments for different APIs and targets. Additionally, better exception handling and alerting could be added. However, for brevity, the example is as follows:

import wget
import os
from google.cloud import bigquery

def download_news(request):
    """
    Script which queries for the top news stories from newsapi.org and loads the articles into BigQuery.
    This script is meant to be run as a Cloud Functions job.
    """

    #Download the latest headlines from newsapi.org
    url = "https://newsapi.org/v2/top-headlines?country=us&apiKey=<<your API key>>"
    temp_file = "/tmp/topnews.json"

    #Remove any leftover file from a previous (warm) invocation so wget overwrites it
    #instead of saving to a numbered copy
    if os.path.exists(temp_file):
        os.remove(temp_file)

    wget.download(url, out=temp_file)

    #Create BigQuery client
    client = bigquery.Client()

    #Create BigQuery table information
    dataset_id = "misc"
    table_id = 'stage_topnews'
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.autodetect = True
    job_config.write_disposition = "WRITE_TRUNCATE"

    #Load the file into BigQuery
    print("Loading file to BigQuery...")
    with open(temp_file, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_ref, job_config=job_config)

    #Wait for the load to complete
    job.result()  

    #Create a permanent BigQuery table to store the JSON results, if it does not exist
    query = """
    CREATE TABLE IF NOT EXISTS misc.json_topnews
    AS 
    SELECT *, CURRENT_TIMESTAMP() AS time_loaded 
    FROM misc.stage_topnews
    LIMIT 0;
    """

    query_job = client.query(query)
    query_job.result()

    #Append the staging data (plus a load timestamp) into the permanent table
    query = """
    INSERT INTO misc.json_topnews
    SELECT *, CURRENT_TIMESTAMP()
    FROM misc.stage_topnews;
    """

    query_job = client.query(query)
    query_job.result()

    return "Finished loading!"
Cloud scheduler calling

Cloud Scheduler is a simple way to call our integration routine every three hours. The main fields to configure are as follows (a sample gcloud command is sketched after the list):

  • Type - HTTP

  • URL - the URL of your cloud function

  • HTTP method - POST

  • Service account - one with permission to invoke the function (Cloud Functions Invoker), since authentication is required

  • Cron schedule - 0 */3 * * * to run every three hours
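
As with the function, the scheduler job can also be created from the command line. A rough sketch (the job name, URL, and service account below are placeholders) looks like this:

gcloud scheduler jobs create http download-top-news-job \
    --schedule="0 */3 * * *" \
    --uri="https://<region>-<project-id>.cloudfunctions.net/download-top-news" \
    --http-method=POST \
    --oidc-service-account-email="<scheduler-sa>@<project-id>.iam.gserviceaccount.com"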


Final result

We are now ready to use our final table for further downstream transformations, or analysis on its own. As an example, we can view all of the headlines that we have captured to date with the following SQL query:

SELECT B.title FROM misc.json_topnews A, UNNEST(articles) B
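
Because the permanent table carries a load timestamp, it's just as easy to look at only the most recent snapshot. Assuming the autodetected schema matches the query above (an articles repeated field with a title column, plus the time_loaded column added by the function), something like this works:

SELECT A.time_loaded, B.title
FROM misc.json_topnews A, UNNEST(articles) B
WHERE A.time_loaded = (SELECT MAX(time_loaded) FROM misc.json_topnews)
ORDER BY B.title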


