It's no secret that I prefer simple, cheap, and built-in solutions over costly, complicated, and heavy approaches. I've got a few reasons for this: heavier solutions take R&D effort, company approval, budgeting approval, and often lots of training. That investigative work is all time and money that could be spent building more business projects. Simply put, after establishing a sound and intuitive architecture, I prefer to spend more time working on business projects as opposed to getting ready to work on business projects.
BigQuery is one of the ultimate examples of saving setup time - it doesn't require as much configuration as Snowflake or Azure Synapse, and the free terabyte of query processing per month really allows teams to go all out when prototyping solutions. It's a data warehouse with a very low barrier to entry. However, I'm not a 100% code-it-from-scratch-and-save-it-as-a-template person - I prefer dbt over hand-crafted stored procedures for data transformations. Still, dbt is much simpler to use than traditional ETL tools.
Today's example focuses on integrating third-party JSON APIs into BigQuery. If it fits your situation, it's possible to build a robust integration with a little bit of Python, Cloud Functions, and Cloud Scheduler. Granted, this will not necessarily work for creating and managing thousands of discrete ELT routines with complex dependencies, but for newer companies with fewer data sources, or for situations where third-party JSON API data is more of a data augmentation, this solution is definitely workable and extendable. The best part is that it doesn't require a separate third-party product with its own learning curve, and the code to implement it is very concise.
Integrating top news from newsapi.org
The requirements for today's example are as follows:
To facilitate future analysis and machine learning, download the top US news stories and integrate them into Google BigQuery every three hours.
The solution is streamlined and consists of the following:
Obtain an API key from newsapi.org
Create a Google Cloud Function in Python which downloads the top stories, saves them to a JSON file, and uploads the file to Google BigQuery
Merge the uploaded/staged data into a permanent table along with a load timestamp
Schedule the Cloud Function to run every three hours
Cloud function setup
Our cloud function will use a simple setup (a sample gcloud deployment command follows the list):
Environment - 1st gen
Function name - download-top-news
Region - your choice
Trigger type - HTTP
Require authentication - True
Require HTTPS - True
Timeout - Max (540 seconds)
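If you'd rather script the deployment than click through the console, the same settings could be applied with something along these lines - treat it as a sketch, since the region and source path are placeholders and flag behavior varies between gcloud versions (it assumes the Python 3.10 runtime and the download_news entry point described below):

gcloud functions deploy download-top-news \
    --region=us-central1 \
    --runtime=python310 \
    --entry-point=download_news \
    --trigger-http \
    --no-allow-unauthenticated \
    --timeout=540s \
    --source=.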
The source code will use Python 3.10. Be sure that the required package dependencies are added to the requirements.txt file, and that the entry point is named "download_news."
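Since the function imports wget and the BigQuery client library, a minimal requirements.txt might look something like this (version pins are left to your preference):

wget
google-cloud-bigquery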
The code is straightforward - we access the newsapi.org API and save the results to a temporary file. The temporary file is then uploaded to a staging table in BigQuery. If a permanent version of the staging table does not exist, it is created. The data is then loaded into the permanent table along with a load timestamp. This function could easily be genericized to accept arguments for different APIs and targets, and better exception handling and alerting could be added. However, for brevity, the example is as below:
import wget
from google.cloud import bigquery


def download_news(request):
    """
    Queries for the top news stories from newsapi.org and loads the articles into BigQuery.
    This script is meant to be run as a Cloud Functions job.
    """
    # Download the latest headlines from newsapi.org
    url = "https://newsapi.org/v2/top-headlines?country=us&apiKey=<<your API key>>"
    temp_file = "/tmp/topnews.json"
    wget.download(url, out=temp_file)

    # Create BigQuery client
    client = bigquery.Client()

    # Create BigQuery table information
    dataset_id = "misc"
    table_id = "stage_topnews"
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    job_config.autodetect = True
    job_config.write_disposition = "WRITE_TRUNCATE"

    # Load the file into the BigQuery staging table
    with open(temp_file, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
        print("Loading file to BigQuery...")

    # Wait for the load to complete
    job.result()

    # Create a permanent BigQuery table to store the JSON results, if it does not exist
    query = """
        CREATE TABLE IF NOT EXISTS misc.json_topnews
        AS
        SELECT *, CURRENT_TIMESTAMP() AS time_loaded
        FROM misc.stage_topnews
        LIMIT 0;
    """
    query_job = client.query(query)
    query_job.result()

    # Merge the staging data into the permanent table
    query = """
        INSERT INTO misc.json_topnews
        SELECT *, CURRENT_TIMESTAMP()
        FROM misc.stage_topnews;
    """
    query_job = client.query(query)
    query_job.result()

    return "Finished loading!"
Scheduling with Cloud Scheduler
Cloud Scheduler is a simple way to call our integration routine every three hours. The main fields to configure in the Cloud Scheduler job are as follows (a sample gcloud command follows the list):
Type - HTTP
URL - the URL of your cloud function
HTTP method - POST
Service account - a service account with permission to invoke the cloud function (since it requires authentication)
Cron schedule - a cron expression such as 0 */3 * * * to run every three hours
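As a rough command-line equivalent (the job name, location, URL, and service account below are placeholders - substitute your own, and verify the flags against your gcloud version):

gcloud scheduler jobs create http download-top-news-job \
    --location=us-central1 \
    --schedule="0 */3 * * *" \
    --uri="https://<region>-<project>.cloudfunctions.net/download-top-news" \
    --http-method=POST \
    --oidc-service-account-email=<scheduler-service-account>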
Final result
We are now ready to use our final table for further downstream transformations, or for analysis on its own. As an example, we can view all of the headlines that we have captured to date with the following SQL query:
SELECT B.title FROM misc.json_topnews A, UNNEST(articles) B
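Because every load also carries its time_loaded timestamp, restricting the results to the most recent batch is just as easy, for example:

SELECT B.title
FROM misc.json_topnews A, UNNEST(articles) B
WHERE A.time_loaded = (SELECT MAX(time_loaded) FROM misc.json_topnews)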