Skip to content

Commit

Permalink
Adding handling of Nulled lists to beam_row_from_dict (#33830)
Browse files Browse the repository at this point in the history
* Adding handling of Nulled lists to beam_row_from_dict

* Update sdks/python/apache_beam/io/gcp/bigquery_tools.py

Co-authored-by: Ahmed Abualsaud <[email protected]>

---------

Co-authored-by: Ahmed Abualsaud <[email protected]>
  • Loading branch information
TobiasBredow and ahmedabu98 authored Feb 3, 2025
1 parent df13ffe commit c7b2695
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -1606,11 +1606,13 @@ def beam_row_from_dict(row: dict, schema):
# This requires that each row has all the fields in the schema.
# However, it's possible that some nullable fields don't appear in the row.
# For this case, we create the field with a `None` value
if name not in row and mode == "NULLABLE":
# None is also set when a repeated field is missing as BigQuery
# converts Null Repeated fields to empty lists
if name not in row and mode != "REQUIRED":
row[name] = None

value = row[name]
if type in ["RECORD", "STRUCT"]:
if type in ["RECORD", "STRUCT"] and value:
# if this is a list of records, we create a list of Beam Rows
if mode == "REPEATED":
list_of_beam_rows = []
Expand Down
11 changes: 10 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,15 @@ def test_dict_to_beam_row_all_types_repeated(self):
self.assertEqual(expected_beam_row, beam_row_from_dict(dict_row, schema))

def test_dict_to_beam_row_all_types_nullable(self):
schema = {"fields": self.get_schema_fields_with_mode("nullable")}
schema_fields_with_nested = [{
"name": "nested_record",
"type": "record",
"mode": "repeated",
"fields": self.get_schema_fields_with_mode("nullable")
}]
schema_fields_with_nested.extend(
self.get_schema_fields_with_mode("nullable"))
schema = {"fields": schema_fields_with_nested}
dict_row = {k: None for k in self.DICT_ROW}

# input dict row with missing nullable fields should still yield a full
Expand All @@ -876,6 +884,7 @@ def test_dict_to_beam_row_all_types_nullable(self):
del dict_row['bool']

expected_beam_row = beam.Row(
nested_record=None,
str=None,
bool=None,
bytes=None,
Expand Down

0 comments on commit c7b2695

Please sign in to comment.