Conversation with Merlin [email protected] · Thu Dec 21 2023

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import os
import logging
from google.cloud import bigquery

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "dataplatform.json"

# Set up logging configuration

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CreateBigQueryTable(beam.DoFn):
    def process(self, element):
        project_id, dataset_id, table_id, schema = element
        client = bigquery.Client()

        # Construct a dataset reference
        dataset_ref = client.dataset(dataset_id, project=project_id)
        # Construct a table reference
        table_ref = dataset_ref.table(table_id)
        # Define the table configuration
        table_config = bigquery.Table(
            table_ref,
            schema=schema,
        )
        # Create the table if it doesn't exist
        try:
            client.get_table(table_ref)
            print(f"Table {table_ref.table_id} already exists.")
        except Exception:
            table = client.create_table(table_config)
            print(f"Table {table.table_id} created in dataset {dataset_id}.")

class WriteToBigQuery(beam.DoFn):
    def process(self, element, external_schema):
        # Extract the fields present in the current JSON data
        json_fields = set(element.keys())

        # Create a dynamic schema that includes only the fields that are present in the JSON data and in the external schema
        dynamic_schema = [field for field in external_schema if field.name in json_fields]

        row_data = {}
        # Iterate over each field in the dynamic schema
        for field in dynamic_schema:
            field_name = field.name
            # Add the field to the row_data
            row_data[field_name] = element[field_name]

        yield bigquery.Row(**row_data)

def run(argv=None):
    # Define custom options for the pipeline
    pipeline_options = PipelineOptions(argv)

    # Create a Pipeline with the specified options
    with beam.Pipeline(options=pipeline_options) as p:
        # Table information
        project_id = "data-platform-391617"
        dataset_id = "Customerdata"
        table_id = "aroh"

        # Define the external schema with the fields you want to extract
        external_schema = []

        # Step 1: Create the BigQuery table
        create_table_step = (
            p
            | "CreateTableInfo" >> beam.Create([(project_id, dataset_id, table_id, external_schema)])
            | "CreateBigQueryTable" >> beam.ParDo(CreateBigQueryTable())
        )

        # Step 2: Read data and write to BigQuery
        read_and_write_step = (
            p
            | "ReadFromSource" >> beam.io.ReadFromText("gs://unnati_rest/marketing/newmarketing_refinedata_10:12-10:14-0.json")
            | "ParseJSON" >> beam.Map(lambda line: eval(line))  # Assuming each line is a JSON object
            | "WriteToBigQuery" >> beam.ParDo(WriteToBigQuery(), external_schema)
        )

        # Wait for both steps to complete
        results = {'create_table': create_table_step, 'read_and_write': read_and_write_step}

if __name__ == '__main__':
    run()

.... can we create a dynamic schema in BigQuery? Since my one JSON file contains data from multiple datasets, there will be many rows with different fields, but I want one BigQuery table for these fields:

("SalesID", "STRING"), ("CustomerID", "STRING"), ("SalesDate", "STRING"), ("SalesTime", "STRING"), ("SalesLocation", "STRING"), ("WeekDay", "STRING"), ("AveragePrice", "STRING"), ("QuantitySold", "STRING"), ("AverageSpend", "STRING"), ("SalesGrowthRate", "STRING"), ("BudgetID", "STRING"), ("RestaurantID", "STRING"), ("ChainCode", "STRING"), ("ExpenseCategory", "STRING"), ("ExpenseAmount", "STRING"), ("RevenueID", "STRING"), ("RevenueTime", "STRING"), ("TaxAmount", "STRING"), ("Tips", "STRING"), ("TotalSales", "STRING"), ("Discounts", "STRING"), ("Expenses", "STRING"), ("TotalRevenue", "STRING")

... these are not all present in one JSON row, but they are all present in the JSON file.

In BigQuery, you can create a table with a flexible schema by using the NULLABLE mode for fields, which allows some fields to be omitted. BigQuery does not support truly dynamic schemas that can be altered on the fly per row to include an arbitrary set of fields from the data, but you can create a schema that encompasses all possible fields you might have across all of your varied JSON objects, making sure each of those fields is set to NULLABLE.

If a particular JSON object does not contain data for a nullable field, that field is simply null for that record in the BigQuery table. This way, a single table can support records with varying sets of fields.
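For illustration only (the table ID, field names, and values below are hypothetical, and the sketch assumes the table already exists with these columns defined as NULLABLE STRING fields), this is how two rows with different field sets would load into the same table via the BigQuery client:

from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical fully-qualified table ID; assumes SalesID, TotalSales, BudgetID and
# ExpenseAmount are all NULLABLE STRING columns on this table.
table_id = "data-platform-391617.Customerdata.unified_table"

rows = [
    {"SalesID": "S001", "TotalSales": "120.50"},     # no BudgetID/ExpenseAmount -> NULL
    {"BudgetID": "B042", "ExpenseAmount": "75.00"},  # no SalesID/TotalSales -> NULL
]

# insert_rows_json accepts plain dicts; keys that are absent from a row are simply
# stored as NULL in the corresponding NULLABLE columns.
errors = client.insert_rows_json(table_id, rows)
if errors:
    print(f"Errors while inserting rows: {errors}")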

Your external schema in the WriteToBigQuery function would include all possible fields that you expect to encounter in any of your JSON objects. Note that each field in the schema is a bigquery.SchemaField.

For instance, you would define your schema as follows:

from google.cloud import bigquery

# Define the schema with all possible fields from all JSON objects
external_schema = [
    bigquery.SchemaField("SalesID", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("CustomerID", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("SalesDate", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("SalesTime", "STRING", mode="NULLABLE"),
    # ... all remaining fields ...
    bigquery.SchemaField("TotalRevenue", "STRING", mode="NULLABLE"),
]

When you insert data, you'll construct the row with only the fields present in the incoming JSON object, leaving out any fields that are not present. The WriteToBigQuery process function handles the creation of these dynamic rows based on the incoming JSON data.

Just make sure that in the step that reads and inserts data, you handle the serialization and deserialization of JSON correctly, to avoid the potential security risks of the eval() function. Consider using json.loads() instead for parsing the JSON data from the input file:

import json

# ... Other code ...

class WriteToBigQuery(beam.DoFn):
    # ... Your existing process method ...

# ... Other code ...

read_and_write_step = (
    p
    | "ReadFromSource" >> beam.io.ReadFromText("gs://unnati_rest/marketing/newmarketing_refinedata_10:12-10:14-0.json")
    | "ParseJSON" >> beam.Map(json.loads)  # Safely parse each line as JSON
    | "WriteToBigQuery" >> beam.ParDo(WriteToBigQuery(), external_schema)
)

In summary, you can handle a BigQuery table that combines fields from multiple datasets by using NULLABLE fields within a unified table schema and dynamically omitting fields on a row-by-row basis as the data is inserted.
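For comparison, Beam's built-in beam.io.WriteToBigQuery sink already implements this behaviour: it accepts plain dicts, and any NULLABLE columns missing from a given row load as NULL. Here is a minimal sketch, reusing the project, dataset, and table names from your pipeline but with a placeholder input path:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Every field declared as "name:TYPE" in a schema string defaults to NULLABLE mode.
TABLE_SCHEMA = ",".join([
    "SalesID:STRING",
    "CustomerID:STRING",
    "SalesDate:STRING",
    "SalesTime:STRING",
    # ... add the remaining fields the same way ...
    "TotalRevenue:STRING",
])

with beam.Pipeline(options=PipelineOptions()) as p:
    (
        p
        | "ReadFromSource" >> beam.io.ReadFromText("gs://your-bucket/your-data.json")  # placeholder path
        | "ParseJSON" >> beam.Map(json.loads)  # each line is one JSON object
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            "data-platform-391617:Customerdata.aroh",
            schema=TABLE_SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )

With this sink the custom WriteToBigQuery DoFn is not strictly needed; rows only have to be dicts whose keys match columns in the schema.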

I have 10 tables of data in JSON format... each of them has 10 fields... I only want 5 field names from each table in BigQuery...

In such a case, you will first need to determine the common set of 5 field names that you want to extract from each of the 10 JSON tables. Once identified, you will define a fixed schema in BigQuery which only includes these fields. Each table may have additional fields, but your BigQuery table will only include the 5 fields you are interested in.

Here's a conceptual example of how you might achieve this in your code:

  1. Define the 5 common fields that you need from each table as your BigQuery schema. For example:
    # Define the external schema with the common fields you want to extract
    common_schema = [
        bigquery.SchemaField("Field1", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("Field2", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("Field3", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("Field4", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("Field5", "STRING", mode="NULLABLE"),
    ]
  2. In your pipeline where you parse the JSON and write to BigQuery, you would filter or map the JSON data to only include these 5 fields.

Here's how you might adapt the WriteToBigQuery class:

class WriteToBigQuery(beam.DoFn):
    def process(self, element, common_schema):
        # Keep only the values for fields that appear in the common schema
        row_data = {field.name: element.get(field.name) for field in common_schema}
        yield row_data  # plain dicts are what BigQuery row inserts expect

# ... Other code for setting up the pipeline ...

# Define common fields to extract
common_fields = [
    # ...
]

read_and_write_step = (
    p
    | "ReadFromSource" >> beam.io.ReadFromText("path_to_your_json_data")
    | "ParseJSON" >> beam.Map(json.loads)  # Safely parse each line as JSON
    | "FilterFields" >> beam.Map(lambda element: {field: element[field] for field in common_fields if field in element})
    | "WriteToBigQuery" >> beam.ParDo(WriteToBigQuery(), common_schema)
)

In the FilterFields step, a lambda function builds a dictionary that includes only the common fields you have defined. If a field is not present in a particular JSON object, it will not be included in the row written to BigQuery, and because the schema fields are defined as NULLABLE, missing fields in some rows won't cause an issue.
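To make the filtering concrete, here is a small standalone illustration of that dict comprehension (the field names and sample record are made up):

common_fields = ["Field1", "Field2", "Field3", "Field4", "Field5"]

# A sample record that only carries two of the five common fields.
element = {"Field1": "abc", "Field3": "xyz", "UnwantedField": "ignored"}

filtered = {field: element[field] for field in common_fields if field in element}
print(filtered)  # {'Field1': 'abc', 'Field3': 'xyz'} -- the absent fields load as NULL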

Using this approach will allow you to ingest only the 5 specific fields you want from each of the 10 different JSON tables into a single BigQuery table.

none of the fields are common

If none of the fields are common among the 10 JSON tables and you want to select 5 specific fields from each table to insert into BigQuery, you will have to handle each table individually. For each JSON table, you will need to define a separate process that extracts the desired fields and then writes them to BigQuery.

Given this scenario, we assume each JSON table has a completely disjoint set of fields. Here's a step-by-step approach to handling this with Apache Beam for BigQuery loading:

  1. Define Schemas: Define BigQuery table schemas for each table. Since there are no common fields, each schema will be distinct and only include the 5 fields you need from that table.

  2. Create Separate Pipeline Steps: Since each table has different fields, you may need separate pipeline steps to read, transform, and load each type of JSON record to its corresponding BigQuery table.

Here's an abstracted example to give you an idea of what the code might look like:

import json

import apache_beam as beam
from google.cloud import bigquery
from apache_beam.options.pipeline_options import PipelineOptions

# Initialize options (possibly from command line arguments)
pipeline_options = PipelineOptions()

# Initialize the Beam pipeline
p = beam.Pipeline(options=pipeline_options)

# Schemas for each table
# Assuming each table is named table1, table2, ..., table10
schemas = {
    'table1': 'Field1:STRING,Field2:STRING,...,Field5:STRING',
    # Add schemas for table2, table3, ..., table10
}

# Function to extract and transform data for a given table
def extract_fields(element, field_names):
    return {field: element.get(field) for field in field_names if field in element}

# Loop over each JSON file/table, read data, extract the fields, and write to BigQuery
for i in range(1, 11):
    table_name = 'table{}'.format(i)
    json_path = 'path_to_json_table_{}.json'.format(i)

    # The field_names would be replaced by the actual field names from each JSON table
    field_names = ['Field1', 'Field2', 'Field3', 'Field4', 'Field5']  # specify the fields for each table

    (
        p
        | 'Read{}'.format(table_name) >> beam.io.ReadFromText(json_path)
        | 'Parse{}'.format(table_name) >> beam.Map(json.loads)
        | 'ExtractFields{}'.format(table_name) >> beam.Map(extract_fields, field_names)
        | 'WriteToBQ{}'.format(table_name) >> beam.io.WriteToBigQuery(
            table_name,
            schema=schemas[table_name],
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
    )

# Run the pipeline
result = p.run()

# Wait until pipeline processing is complete
result.wait_until_finish()

In this code:

  • pipeline_options would be customized to your specific environment and needs.

  • You need to replace 'Field1', 'Field2', 'Field3', 'Field4', 'Field5' with the actual field names from each table in the field_names variable. Each table would have its own unique set of field names here.
  • The loop creates ten separate read-transform-write steps in the pipeline, one for each JSON table.

This approach requires you to manually map each table to its respective set of fields. If the fields are completely unique across tables and you need to maintain separate schemas, there isn't a way to avoid this. Each schema needs to be constructed, and each data load has to be treated according to the specific fields of the JSON table it corresponds to.
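If maintaining ten hand-written branches becomes unwieldy, one way to keep that manual mapping in a single place is to drive the loop from a configuration dictionary. This is only a sketch: the paths, field names, and schema strings are placeholders, and the project/dataset prefix is reused from earlier in the conversation.

import json

import apache_beam as beam

# Hypothetical per-table configuration: source path, the 5 fields to keep, and the schema.
TABLE_CONFIG = {
    'table1': {
        'path': 'gs://your-bucket/table1.json',
        'fields': ['OrderID', 'OrderDate', 'Amount', 'Currency', 'Region'],
        'schema': 'OrderID:STRING,OrderDate:STRING,Amount:STRING,Currency:STRING,Region:STRING',
    },
    # ... entries for table2 through table10 ...
}

def build_branches(p):
    """Add one read-filter-write branch to the pipeline for every configured table."""
    for table_name, cfg in TABLE_CONFIG.items():
        (
            p
            | 'Read_{}'.format(table_name) >> beam.io.ReadFromText(cfg['path'])
            | 'Parse_{}'.format(table_name) >> beam.Map(json.loads)
            # Bind the field list through a default argument so each branch keeps its own
            # fields (avoids the late-binding pitfall of lambdas defined inside a loop).
            | 'Filter_{}'.format(table_name) >> beam.Map(
                lambda element, fields=cfg['fields']: {f: element.get(f) for f in fields if f in element})
            | 'Write_{}'.format(table_name) >> beam.io.WriteToBigQuery(
                'data-platform-391617:Customerdata.{}'.format(table_name),  # assumed project/dataset
                schema=cfg['schema'],
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )

Calling build_branches(p) before p.run() produces the same ten branches as the explicit loop above, but all the per-table knowledge lives in TABLE_CONFIG.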