Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PipelineOptions for constructing BigQueryWrapper when estimating BigQuery table size (#26622) #26662

Merged
merged 3 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ def display_data(self):
}

def estimate_size(self):
bq = bigquery_tools.BigQueryWrapper()
bq = bigquery_tools.BigQueryWrapper.from_pipeline_options(self.options)
if self.table_reference is not None:
table_ref = self.table_reference
if (isinstance(self.table_reference, vp.ValueProvider) and
Expand Down
22 changes: 15 additions & 7 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,7 @@ class BigQueryWrapper(object):
HISTOGRAM_METRIC_LOGGER = MetricLogger()

def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
self.client = client or bigquery.BigqueryV2(
http=get_new_http(),
credentials=auth.get_service_credentials(PipelineOptions()),
response_encoding='utf8',
additional_http_headers={
"user-agent": "apache-beam-%s" % apache_beam.__version__
})
self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions())
self.gcp_bq_client = client or gcp_bigquery.Client(
client_info=ClientInfo(
user_agent="apache-beam-%s" % apache_beam.__version__))
Expand Down Expand Up @@ -1350,6 +1344,20 @@ def convert_row_to_dict(self, row, schema):
result[field.name] = self._convert_cell_value_to_dict(value, field)
return result

@staticmethod
def from_pipeline_options(pipeline_options: PipelineOptions):
  """Build a BigQueryWrapper whose client is derived from pipeline options.

  Args:
    pipeline_options: options handed to ``_bigquery_client`` to create the
      client that the returned wrapper is constructed with.

  Returns:
    A ``BigQueryWrapper`` constructed with that client.
  """
  options_client = BigQueryWrapper._bigquery_client(pipeline_options)
  return BigQueryWrapper(client=options_client)

@staticmethod
def _bigquery_client(pipeline_options: PipelineOptions):
  """Create a ``bigquery.BigqueryV2`` client from the given pipeline options.

  Args:
    pipeline_options: options passed to ``auth.get_service_credentials`` to
      obtain the credentials the client is created with.

  Returns:
    A ``bigquery.BigqueryV2`` instance using a fresh HTTP object from
    ``get_new_http()``, 'utf8' response encoding and a ``user-agent`` header
    carrying the ``apache_beam`` version.
  """
  # Evaluate the arguments in the same order as the call's keyword list.
  http_transport = get_new_http()
  service_credentials = auth.get_service_credentials(pipeline_options)
  beam_headers = {"user-agent": "apache-beam-%s" % apache_beam.__version__}
  return bigquery.BigqueryV2(
      http=http_transport,
      credentials=service_credentials,
      response_encoding='utf8',
      additional_http_headers=beam_headers)


class RowAsDictJsonCoder(coders.Coder):
"""A coder for a table row (represented as a dict) to/from a JSON string.
Expand Down