diff --git a/.travis.yml b/.travis.yml index 9f422c6..ba3cdc8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,7 @@ language: python +before_install: + - sudo apt-get update -q + - sudo apt-get install pypy -y install: - python setup.py develop - pip install tox @@ -7,7 +10,7 @@ notifications: email: false env: - TOXENV=py27 - - TOXENV=py33 - - TOXENV=py34 + - TOXENV=py35 + - TOXENV=py36 - TOXENV=nightly - TOXENV=pypy diff --git a/README.md b/README.md index 2053e05..009f125 100644 --- a/README.md +++ b/README.md @@ -135,7 +135,7 @@ job_id, _ = client.query(query) # Managing Tables -The BigQuery client provides facilities to manage dataset tables, including creating, deleting, and checking the existence of tables. +The BigQuery client provides facilities to manage dataset tables, including creating, deleting, checking the existence, and getting the metadata of tables. ```python # Create a new table. @@ -150,6 +150,10 @@ deleted = client.delete_table('dataset', 'my_table') # Check if a table exists. exists = client.check_table('dataset', 'my_table') + +# Get a table's full metadata. Includes numRows, numBytes, etc. +# See: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables +metadata = client.get_table('dataset', 'my_table') ``` There is also functionality for retrieving tables that are associated with a Google App Engine appspot, assuming table names are in the form of appid_YYYY_MM or YYYY_MM_appid. This allows tables between a date range to be selected and queried on. @@ -169,7 +173,7 @@ The client provides an API for inserting data into a BigQuery table. The last pa ```python # Insert data into table. 
rows = [ - {'one': 'ein', 'two': 'zwei'} + {'one': 'ein', 'two': 'zwei'}, {'id': 'NzAzYmRiY', 'one': 'uno', 'two': 'dos'}, {'id': 'NzAzYmRiY', 'one': 'ein', 'two': 'zwei'} # duplicate entry ] @@ -291,7 +295,7 @@ exists = client.check_dataset('mydataset') ```python from bigquery import schema_from_record -schema_from_record({"id":123, "posts": [{"id":123, "text": "tihs is a post"}], "username": "bob"}) +schema_from_record({"id":123, "posts": [{"id":123, "text": "this is a post"}], "username": "bob"}) ``` # Contributing diff --git a/bigquery/client.py b/bigquery/client.py index 17a3a89..bb4d50a 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -55,7 +55,8 @@ def get_client(project_id=None, credentials=None, service_url=None, service_account=None, private_key=None, private_key_file=None, json_key=None, json_key_file=None, - readonly=True, swallow_results=True): + readonly=True, swallow_results=True, + num_retries=0): """Return a singleton instance of BigQueryClient. Either AssertionCredentials or a service account and private key combination need to be provided in order to authenticate requests to BigQuery. @@ -94,6 +95,9 @@ def get_client(project_id=None, credentials=None, swallow_results : bool If set to False, then return the actual response value instead of converting to boolean. Default True. + num_retries : int, optional + The number of times to retry the request. Default 0 (no retry). 
+ Returns ------- @@ -147,7 +151,8 @@ def get_client(project_id=None, credentials=None, bq_service = _get_bq_service(credentials=credentials, service_url=service_url) - return BigQueryClient(bq_service, project_id, swallow_results) + return BigQueryClient(bq_service, project_id, swallow_results, + num_retries) def get_projects(bq_service): @@ -170,8 +175,13 @@ def _get_bq_service(credentials=None, service_url=None): assert credentials, 'Must provide ServiceAccountCredentials' http = credentials.authorize(Http()) - service = build('bigquery', 'v2', http=http, - discoveryServiceUrl=service_url) + service = build( + 'bigquery', + 'v2', + http=http, + discoveryServiceUrl=service_url, + cache_discovery=False + ) return service @@ -185,17 +195,39 @@ def _credentials(): class BigQueryClient(object): - def __init__(self, bq_service, project_id, swallow_results=True): + def __init__(self, bq_service, project_id, swallow_results=True, + num_retries=0): self.bigquery = bq_service self.project_id = project_id self.swallow_results = swallow_results + self.num_retries = num_retries self.cache = {} + def _get_project_id(self, project_id=None): + """ Get new project_id + + Default is self.project_id, which is the project client authenticate to. + A new project_id is specified when client wants to authenticate to 1 project, + but run jobs in a different project. + + Parameters + ---------- + project_id : str + BigQuery project_id + + Returns + ------- + project_id: BigQuery project_id + """ + if project_id is None: + project_id = self.project_id + return project_id + def _submit_query_job(self, query_data): """ Submit a query job to BigQuery. This is similar to BigQueryClient.query, but gives the user - direct access to the query method on the offical BigQuery + direct access to the query method on the official BigQuery python client. 
For fine-grained control over a query job, see: @@ -211,7 +243,7 @@ def _submit_query_job(self, query_data): ------- tuple job id and query results if query completed. If dry_run is True, - job id will be None and results will be empty if the query is valid + job id will be None and results will be [cacheHit and totalBytesProcessed] if the query is valid or a dict containing the response if invalid. Raises @@ -226,7 +258,8 @@ def _submit_query_job(self, query_data): try: query_reply = job_collection.query( - projectId=self.project_id, body=query_data).execute() + projectId=self.project_id, body=query_data).execute( + num_retries=self.num_retries) except HttpError as e: if query_data.get("dryRun", False): return None, json.loads(e.content.decode('utf8')) @@ -236,19 +269,44 @@ def _submit_query_job(self, query_data): schema = query_reply.get('schema', {'fields': None})['fields'] rows = query_reply.get('rows', []) job_complete = query_reply.get('jobComplete', False) + cache_hit = query_reply['cacheHit'] + total_bytes_processed = query_reply['totalBytesProcessed'] # raise exceptions if it's not an async query # and job is not completed after timeout if not job_complete and query_data.get("timeoutMs", False): logger.error('BigQuery job %s timeout' % job_id) raise BigQueryTimeoutException() - + + if query_data.get("dryRun", False): return job_id, [cache_hit, total_bytes_processed] return job_id, [self._transform_row(row, schema) for row in rows] + def _get_job_reference(self, job_id): + """ Get job reference from job_id + For more details, see: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#resource + + Parameters + ---------- + job_id: + Id of the job + + Returns + ------- + job_reference: json of job_reference + """ + job_reference = { + "projectId": self.project_id, + "jobId": job_id + } + + return job_reference + def _insert_job(self, body_object): """ Submit a job to BigQuery - Direct proxy to the insert() method of the offical BigQuery + Direct 
proxy to the insert() method of the official BigQuery python client. Able to submit load, link, query, copy, or extract jobs. @@ -276,7 +334,7 @@ def _insert_job(self, body_object): return job_collection.insert( projectId=self.project_id, body=body_object - ).execute() + ).execute(num_retries=self.num_retries) def query(self, query, max_results=None, timeout=0, dry_run=False, use_legacy_sql=None, external_udf_uris=None): """Submit a query to BigQuery. @@ -291,8 +349,8 @@ def query(self, query, max_results=None, timeout=0, dry_run=False, use_legacy_sq How long to wait for the query to complete, in seconds before the request times out and returns. dry_run : bool, optional - If True, the query isn't actually run. A valid query will return an - empty response, while an invalid one will return the same error + If True, the query isn't actually run. A valid query will return + cache hit, and total bytes processed, while an invalid one will return the same error message it would if it wasn't a dry run. use_legacy_sql : bool, optional. Default True. If False, the query will use BigQuery's standard SQL (https://cloud.google.com/bigquery/sql-reference/) @@ -305,7 +363,7 @@ def query(self, query, max_results=None, timeout=0, dry_run=False, use_legacy_sq ------- tuple (job id, query results) if the query completed. If dry_run is True, - job id will be None and results will be empty if the query is valid + job id will be None and results will be [cacheHit and totalBytesProcessed] if the query is valid or a ``dict`` containing the response if invalid. Raises @@ -354,7 +412,7 @@ def get_query_schema(self, job_id): return query_reply['schema']['fields'] - def get_table_schema(self, dataset, table): + def get_table_schema(self, dataset, table, project_id=None): """Return the table schema. Parameters @@ -363,6 +421,8 @@ def get_table_schema(self, dataset, table): The dataset containing the `table`. 
table : str The table to get the schema for + project_id: str, optional + The project of the dataset. Returns ------- @@ -370,12 +430,13 @@ def get_table_schema(self, dataset, table): A ``list`` of ``dict`` objects that represent the table schema. If the table doesn't exist, None is returned. """ + project_id = self._get_project_id(project_id) - try: + try: result = self.bigquery.tables().get( - projectId=self.project_id, + projectId=project_id, tableId=table, - datasetId=dataset).execute() + datasetId=dataset).execute(num_retries=self.num_retries) except HttpError as e: if int(e.resp['status']) == 404: logger.warn('Table %s.%s does not exist', dataset, table) @@ -450,44 +511,51 @@ def get_query_rows(self, job_id, offset=None, limit=None, timeout=0): records += [self._transform_row(row, schema) for row in rows] return records[:limit] if limit else records - def check_dataset(self, dataset_id): + def check_dataset(self, dataset_id, project_id=None): """Check to see if a dataset exists. Parameters ---------- dataset_id : str Dataset unique id + project_id: str, optional + The project the dataset is in Returns ------- bool True if dataset at `dataset_id` exists, else Fasle - """ - dataset = self.get_dataset(dataset_id) + """ + dataset = self.get_dataset(dataset_id, project_id) return bool(dataset) - def get_dataset(self, dataset_id): + def get_dataset(self, dataset_id, project_id=None): """Retrieve a dataset if it exists, otherwise return an empty dict. 
Parameters ---------- dataset_id : str Dataset unique id + project_id: str, optional + The project the dataset is in Returns ------- dict Contains dataset object if it exists, else empty """ - try: + project_id = self._get_project_id(project_id) + + try: dataset = self.bigquery.datasets().get( - projectId=self.project_id, datasetId=dataset_id).execute() + projectId=project_id, datasetId=dataset_id).execute( + num_retries=self.num_retries) except HttpError: dataset = {} return dataset - def check_table(self, dataset, table): + def check_table(self, dataset, table, project_id=None): """Check to see if a table exists. Parameters @@ -496,16 +564,18 @@ def check_table(self, dataset, table): The dataset to check table : str The name of the table + project_id: str, optional + The project the table is in Returns ------- bool True if table exists, else False """ - table = self.get_table(dataset, table) + table = self.get_table(dataset, table, project_id) return bool(table) - def get_table(self, dataset, table): + def get_table(self, dataset, table, project_id=None): """ Retrieve a table if it exists, otherwise return an empty dict. Parameters @@ -514,23 +584,27 @@ def get_table(self, dataset, table): The dataset that the table is in table : str The name of the table + project_id: str, optional + The project that the table is in Returns ------- dict Containing the table object if it exists, else empty """ - try: + project_id = self._get_project_id(project_id) + try: table = self.bigquery.tables().get( - projectId=self.project_id, datasetId=dataset, - tableId=table).execute() + projectId=project_id, datasetId=dataset, + tableId=table).execute(num_retries=self.num_retries) except HttpError: table = {} return table def create_table(self, dataset, table, schema, - expiration_time=None, time_partitioning=False): + expiration_time=None, time_partitioning=False, + project_id=None): """Create a new table in the dataset. 
Parameters @@ -540,11 +614,13 @@ def create_table(self, dataset, table, schema, table : str The name of the table to create schema : dict - The table schema - expiration_time : float, optional + The table schema + expiration_time : int or double, optional The expiry time in milliseconds since the epoch. time_partitioning : bool, optional Create a time partitioning. + project_id: str, optional + The project to create the table in Returns ------- @@ -552,12 +628,13 @@ def create_table(self, dataset, table, schema, If the table was successfully created, or response from BigQuery if swallow_results is set to False """ + project_id = self._get_project_id(project_id) body = { 'schema': {'fields': schema}, 'tableReference': { 'tableId': table, - 'projectId': self.project_id, + 'projectId': project_id, 'datasetId': dataset } } @@ -570,24 +647,24 @@ def create_table(self, dataset, table, schema, try: table = self.bigquery.tables().insert( - projectId=self.project_id, + projectId=project_id, datasetId=dataset, body=body - ).execute() + ).execute(num_retries=self.num_retries) if self.swallow_results: return True else: return table except HttpError as e: - logger.error(('Cannot create table {0}.{1}\n' - 'Http Error: {2}').format(dataset, table, e.content)) + logger.error(('Cannot create table {0}.{1}.{2}\n' + 'Http Error: {3}').format(project_id, dataset, table, e.content)) if self.swallow_results: return False else: return {} - def update_table(self, dataset, table, schema): + def update_table(self, dataset, table, schema, project_id=None): """Update an existing table in the dataset. 
Parameters @@ -598,6 +675,8 @@ def update_table(self, dataset, table, schema): The name of the table to update schema : dict Table schema + project_id: str, optional + The project to update the table in Returns ------- @@ -605,36 +684,38 @@ def update_table(self, dataset, table, schema): bool indicating if the table was successfully updated or not, or response from BigQuery if swallow_results is set to False. """ + project_id = self._get_project_id(project_id) body = { 'schema': {'fields': schema}, 'tableReference': { 'tableId': table, - 'projectId': self.project_id, + 'projectId': project_id, 'datasetId': dataset } } try: result = self.bigquery.tables().update( - projectId=self.project_id, + projectId=project_id, + tableId= table, datasetId=dataset, body=body - ).execute() + ).execute(num_retries=self.num_retries) if self.swallow_results: return True else: return result except HttpError as e: - logger.error(('Cannot update table {0}.{1}\n' - 'Http Error: {2}').format(dataset, table, e.content)) + logger.error(('Cannot update table {0}.{1}.{2}\n' + 'Http Error: {3}').format(project_id, dataset, table, e.content)) if self.swallow_results: return False else: return {} - def patch_table(self, dataset, table, schema): + def patch_table(self, dataset, table, schema, project_id=None): """Patch an existing table in the dataset. 
Parameters @@ -645,6 +726,8 @@ def patch_table(self, dataset, table, schema): The name of the table to patch schema : dict The table schema + project_id: str, optional + The project to patch the table in Returns ------- @@ -652,36 +735,33 @@ def patch_table(self, dataset, table, schema): Bool indicating if the table was successfully patched or not, or response from BigQuery if swallow_results is set to False """ + project_id = self._get_project_id(project_id) body = { 'schema': {'fields': schema}, - 'tableReference': { - 'tableId': table, - 'projectId': self.project_id, - 'datasetId': dataset - } } try: result = self.bigquery.tables().patch( - projectId=self.project_id, + projectId=project_id, datasetId=dataset, + tableId=table, body=body - ).execute() + ).execute(num_retries=self.num_retries) if self.swallow_results: return True else: return result except HttpError as e: - logger.error(('Cannot patch table {0}.{1}\n' - 'Http Error: {2}').format(dataset, table, e.content)) + logger.error(('Cannot patch table {0}.{1}.{2}\n' + 'Http Error: {3}').format(project_id, dataset, table, e.content)) if self.swallow_results: return False else: return {} - def create_view(self, dataset, view, query, use_legacy_sql=None): + def create_view(self, dataset, view, query, use_legacy_sql=None, project_id=None): """Create a new view in the dataset. Parameters @@ -691,10 +771,12 @@ def create_view(self, dataset, view, query, use_legacy_sql=None): view : str The name of the view to create query : dict - A query that BigQuery executes when the view is referenced. + A query that BigQuery executes when the view is referenced. 
use_legacy_sql : bool, optional If False, the query will use BigQuery's standard SQL (https://cloud.google.com/bigquery/sql-reference/) + project_id: str, optional + The project to create the view in Returns ------- @@ -702,11 +784,12 @@ def create_view(self, dataset, view, query, use_legacy_sql=None): bool indicating if the view was successfully created or not, or response from BigQuery if swallow_results is set to False. """ + project_id = self._get_project_id(project_id) body = { 'tableReference': { 'tableId': view, - 'projectId': self.project_id, + 'projectId': project_id, 'datasetId': dataset }, 'view': { @@ -719,10 +802,10 @@ def create_view(self, dataset, view, query, use_legacy_sql=None): try: view = self.bigquery.tables().insert( - projectId=self.project_id, + projectId=project_id, datasetId=dataset, body=body - ).execute() + ).execute(num_retries=self.num_retries) if self.swallow_results: return True else: @@ -736,7 +819,7 @@ def create_view(self, dataset, view, query, use_legacy_sql=None): else: return {} - def delete_table(self, dataset, table): + def delete_table(self, dataset, table, project_id=None): """Delete a table from the dataset. Parameters @@ -745,6 +828,8 @@ def delete_table(self, dataset, table): The dataset to delete the table from. table : str The name of the table to delete + project_id: str, optional + String id of the project Returns ------- @@ -752,13 +837,14 @@ def delete_table(self, dataset, table): bool indicating if the table was successfully deleted or not, or response from BigQuery if swallow_results is set for False. 
""" + project_id = self._get_project_id(project_id) - try: + try: response = self.bigquery.tables().delete( - projectId=self.project_id, + projectId=project_id, datasetId=dataset, tableId=table - ).execute() + ).execute(num_retries=self.num_retries) if self.swallow_results: return True else: @@ -772,7 +858,7 @@ def delete_table(self, dataset, table): else: return {} - def get_tables(self, dataset_id, app_id, start_time, end_time): + def get_tables(self, dataset_id, app_id, start_time, end_time, project_id=None): """Retrieve a list of tables that are related to the given app id and are inside the range of start and end times. @@ -786,6 +872,8 @@ def get_tables(self, dataset_id, app_id, start_time, end_time): The datetime or unix time after which records will be fetched. end_time : Union[datetime, int] The datetime or unix time up to which records will be fetched. + project_id: str, optional + String id of the project Returns ------- @@ -799,7 +887,7 @@ def get_tables(self, dataset_id, app_id, start_time, end_time): if isinstance(end_time, datetime): end_time = calendar.timegm(end_time.utctimetuple()) - every_table = self._get_all_tables(dataset_id) + every_table = self._get_all_tables(dataset_id, project_id) app_tables = every_table.get(app_id, {}) return self._filter_tables_by_time(app_tables, start_time, end_time) @@ -809,7 +897,7 @@ def import_data_from_uris( source_uris, dataset, table, - schema=None, + schema=None, job=None, source_format=None, create_disposition=None, @@ -822,6 +910,7 @@ def import_data_from_uris( field_delimiter=None, quote=None, skip_leading_rows=None, + project_id=None, ): """ Imports data into a BigQuery table from cloud storage. 
Optional @@ -838,11 +927,11 @@ def import_data_from_uris( String id of the dataset table : str String id of the table + schema : list, optional + Represents the BigQuery schema job : str, optional Identifies the job (a unique job id is automatically generated if - not provided) - schema : list, optional - Represents the BigQuery schema + not provided) source_format : str, optional One of the JOB_SOURCE_FORMAT_* constants create_disposition : str, optional @@ -865,6 +954,8 @@ def import_data_from_uris( Quote character for csv only skip_leading_rows : int, optional For csv only + project_id: str, optional + String id of the project Returns ------- @@ -879,9 +970,11 @@ def import_data_from_uris( source_uris = source_uris if isinstance(source_uris, list) \ else [source_uris] + project_id = self._get_project_id(project_id) + configuration = { "destinationTable": { - "projectId": self.project_id, + "projectId": project_id, "tableId": table, "datasetId": dataset }, @@ -953,10 +1046,7 @@ def import_data_from_uris( "configuration": { 'load': configuration }, - "jobReference": { - "projectId": self.project_id, - "jobId": job - } + "jobReference": self._get_job_reference(job) } logger.debug("Creating load job %s" % body) @@ -968,12 +1058,13 @@ def export_data_to_uris( self, destination_uris, dataset, - table, + table, job=None, compression=None, destination_format=None, print_header=None, field_delimiter=None, + project_id=None, ): """ Export data from a BigQuery table to cloud storage. 
Optional arguments @@ -982,13 +1073,13 @@ def export_data_to_uris( Parameters ---------- - destination_urls : Union[str, list] + destination_uris : Union[str, list] ``str`` or ``list`` of ``str`` objects representing the URIs on cloud storage of the form: gs://bucket/filename dataset : str String id of the dataset table : str - String id of the table + String id of the table job : str, optional String identifying the job (a unique jobid is automatically generated if not provided) @@ -1000,6 +1091,8 @@ def export_data_to_uris( Whether or not to print the header field_delimiter : str, optional Character separating fields in delimited file + project_id: str, optional + String id of the project Returns ------- @@ -1014,9 +1107,11 @@ def export_data_to_uris( destination_uris = destination_uris \ if isinstance(destination_uris, list) else [destination_uris] + project_id = self._get_project_id(project_id) + configuration = { "sourceTable": { - "projectId": self.project_id, + "projectId": project_id, "tableId": table, "datasetId": dataset }, @@ -1047,10 +1142,7 @@ def export_data_to_uris( "configuration": { 'extract': configuration }, - "jobReference": { - "projectId": self.project_id, - "jobId": job - } + "jobReference": self._get_job_reference(job) } logger.info("Creating export job %s" % body) @@ -1062,7 +1154,7 @@ def write_to_table( self, query, dataset=None, - table=None, + table=None, external_udf_uris=None, allow_large_results=None, use_query_cache=None, @@ -1071,7 +1163,8 @@ def write_to_table( write_disposition=None, use_legacy_sql=None, maximum_billing_tier=None, - flatten=None + flatten=None, + project_id=None, ): """ Write query result to table. If dataset or table is not provided, @@ -1086,7 +1179,7 @@ def write_to_table( dataset : str, optional String id of the dataset table : str, optional - String id of the table + String id of the table external_udf_uris : list, optional Contains external UDF URIs. 
If given, URIs must be Google Cloud Storage and have .js extensions. @@ -1112,6 +1205,8 @@ def write_to_table( flatten : bool, optional Whether or not to flatten nested and repeated fields in query results + project_id: str, optional + String id of the project Returns ------- @@ -1128,9 +1223,11 @@ def write_to_table( "query": query, } + project_id = self._get_project_id(project_id) + if dataset and table: configuration['destinationTable'] = { - "projectId": self.project_id, + "projectId": project_id, "tableId": table, "datasetId": dataset } @@ -1211,7 +1308,7 @@ def wait_for_job(self, job, interval=5, timeout=60): sleep(interval) request = self.bigquery.jobs().get(projectId=self.project_id, jobId=job_id) - job_resource = request.execute() + job_resource = request.execute(num_retries=self.num_retries) self._raise_executing_exception_if_error(job_resource) complete = job_resource.get('status').get('state') == u'DONE' elapsed_time = time() - start_time @@ -1225,7 +1322,7 @@ def wait_for_job(self, job, interval=5, timeout=60): def push_rows(self, dataset, table, rows, insert_id_key=None, skip_invalid_rows=None, ignore_unknown_values=None, - template_suffix=None): + template_suffix=None, project_id=None): """Upload rows to BigQuery table. Parameters @@ -1233,7 +1330,7 @@ def push_rows(self, dataset, table, rows, insert_id_key=None, dataset : str The dataset to upload to table : str - The name of the table to insert rows into + The name of the table to insert rows into rows : list A ``list`` of rows (``dict`` objects) to add to the table insert_id_key : str, optional @@ -1246,6 +1343,8 @@ def push_rows(self, dataset, table, rows, insert_id_key=None, template_suffix : str, optional Inserts the rows into an {table}{template_suffix}. If table {table}{template_suffix} doesn't exist, create from {table}. 
+ project_id: str, optional + The project to upload to Returns ------- @@ -1253,7 +1352,7 @@ def push_rows(self, dataset, table, rows, insert_id_key=None, bool indicating if insert succeeded or not, or response from BigQuery if swallow_results is set for False. """ - + project_id = self._get_project_id(project_id) table_data = self.bigquery.tabledata() rows_data = [] @@ -1281,13 +1380,13 @@ def push_rows(self, dataset, table, rows, insert_id_key=None, if template_suffix is not None: data['templateSuffix'] = template_suffix - try: + try: response = table_data.insertAll( - projectId=self.project_id, + projectId=project_id, datasetId=dataset, tableId=table, body=data - ).execute() + ).execute(num_retries=self.num_retries) if response.get('insertErrors'): logger.error('BigQuery insert errors: %s' % response) @@ -1315,19 +1414,21 @@ def push_rows(self, dataset, table, rows, insert_id_key=None, }] } - def get_all_tables(self, dataset_id): + def get_all_tables(self, dataset_id, project_id=None): """Retrieve a list of tables for the dataset. Parameters ---------- dataset_id : str The dataset to retrieve table data for. 
+ project_id: str + Unique ``str`` identifying the BigQuery project contains the dataset Returns ------- A ``list`` with all table names """ - tables_data = self._get_all_tables_for_dataset(dataset_id) + tables_data = self._get_all_tables_for_dataset(dataset_id, project_id) tables = [] for table in tables_data.get('tables', []): @@ -1336,7 +1437,7 @@ def get_all_tables(self, dataset_id): tables.append(table_name) return tables - def _get_all_tables(self, dataset_id, cache=False): + def _get_all_tables(self, dataset_id, cache=False, project_id=None): """Retrieve the list of tables for dataset, that respect the formats: * appid_YYYY_MM * YYYY_MM_appid @@ -1344,10 +1445,12 @@ def _get_all_tables(self, dataset_id, cache=False): Parameters ---------- dataset_id : str - The dataset to retrieve table names for + The dataset to retrieve table names for cache : bool, optional To use cached value or not (default False). Timeout value equals CACHE_TIMEOUT. + project_id: str + Unique ``str`` identifying the BigQuery project contains the dataset Returns ------- @@ -1361,35 +1464,39 @@ def _get_all_tables(self, dataset_id, cache=False): do_fetch = False if do_fetch: - result = self._get_all_tables_for_dataset(dataset_id) + result = self._get_all_tables_for_dataset(dataset_id, project_id) self.cache[dataset_id] = (datetime.now(), result) return self._parse_table_list_response(result) - def _get_all_tables_for_dataset(self, dataset_id): + def _get_all_tables_for_dataset(self, dataset_id, project_id=None): """Retrieve a list of all tables for the dataset. 
Parameters ---------- dataset_id : str The dataset to retrieve table names for + project_id: str + Unique ``str`` identifying the BigQuery project contains the dataset Returns ------- dict A ``dict`` containing tables key with all tables """ + project_id = self._get_project_id(project_id) + result = self.bigquery.tables().list( - projectId=self.project_id, - datasetId=dataset_id).execute() + projectId=project_id, + datasetId=dataset_id).execute(num_retries=self.num_retries) page_token = result.get('nextPageToken') while page_token: res = self.bigquery.tables().list( - projectId=self.project_id, + projectId=project_id, datasetId=dataset_id, pageToken=page_token - ).execute() + ).execute(num_retries=self.num_retries) page_token = res.get('nextPageToken') result['tables'] += res.get('tables', []) return result @@ -1436,6 +1543,8 @@ def _parse_table_name(self, table_id): """Parse a table name in the form of appid_YYYY_MM or YYYY_MM_appid and return a tuple consisting of YYYY-MM and the app id. + Returns (None, None) in the event of a name like _YYYYMMDD_ + Parameters ---------- table_id : str @@ -1463,9 +1572,10 @@ def _parse_table_name(self, table_id): year_month = "-".join(attributes[-2:]) app_id = "-".join(attributes[:-2]) + # Check if date parsed correctly if year_month.count("-") == 1 and all( - [num.isdigit() for num in year_month.split('-')]): + [num.isdigit() for num in year_month.split('-')]) and len(year_month) == 7: return year_month, app_id return None, None @@ -1549,7 +1659,7 @@ def get_query_results(self, job_id, offset=None, limit=None, startIndex=offset, maxResults=limit, pageToken=page_token, - timeoutMs=timeout * 1000).execute() + timeoutMs=timeout * 1000).execute(num_retries=self.num_retries) def _transform_row(self, row, schema): """Apply the given schema to the given BigQuery data row. 
@@ -1670,14 +1780,14 @@ def _raise_executing_exception_if_error(self, job): # DataSet manipulation methods # def create_dataset(self, dataset_id, friendly_name=None, description=None, - access=None, location=None): + access=None, location=None, project_id=None): """Create a new BigQuery dataset. Parameters ---------- dataset_id : str Unique ``str`` identifying the dataset with the project (the - referenceID of the dataset, not the integer id of the dataset) + referenceID of the dataset, not the integer id of the dataset) friendly_name: str, optional A human readable name description: str, optional @@ -1688,6 +1798,8 @@ def create_dataset(self, dataset_id, friendly_name=None, description=None, location : str, optional Indicating where dataset should be stored: EU or US (see https://developers.google.com/bigquery/docs/reference/v2/datasets#resource) + project_id: str + Unique ``str`` identifying the BigQuery project contains the dataset Returns ------- @@ -1695,16 +1807,21 @@ def create_dataset(self, dataset_id, friendly_name=None, description=None, ``bool`` indicating if dataset was created or not, or response from BigQuery if swallow_results is set for False """ - try: + project_id = self._get_project_id(project_id) + + try: datasets = self.bigquery.datasets() - dataset_data = self.dataset_resource(dataset_id, + dataset_data = self.dataset_resource(dataset_id, + project_id=project_id, friendly_name=friendly_name, description=description, access=access, - location=location) + location=location + ) - response = datasets.insert(projectId=self.project_id, - body=dataset_data).execute() + response = datasets.insert(projectId=project_id, + body=dataset_data).execute( + num_retries=self.num_retries) if self.swallow_results: return True else: @@ -1717,34 +1834,43 @@ def create_dataset(self, dataset_id, friendly_name=None, description=None, else: return {} - def get_datasets(self): + def get_datasets(self, project_id=None): """List all datasets in the project. 
+ + Parameters + ---------- + project_id: str + Unique ``str`` identifying the BigQuery project that contains the dataset Returns ------- list Dataset resources """ - try: + project_id = self._get_project_id(project_id) + + try: datasets = self.bigquery.datasets() - request = datasets.list(projectId=self.project_id) - result = request.execute() + request = datasets.list(projectId=project_id) + result = request.execute(num_retries=self.num_retries) return result.get('datasets', []) except HttpError as e: logger.error("Cannot list datasets: {0}".format(e)) return None - def delete_dataset(self, dataset_id, delete_contents=False): + def delete_dataset(self, dataset_id, delete_contents=False, project_id=None): """Delete a BigQuery dataset. Parameters ---------- dataset_id : str - Unique ``str`` identifying the datset with the project (the - referenceId of the dataset) + Unique ``str`` identifying the dataset with the project (the + referenceId of the dataset) delete_contents : bool, optional If True, forces the deletion of the dataset even when the dataset contains data (Default = False) + project_id: str, optional + Unique ``str`` identifying the BigQuery project that contains the dataset Returns ------- @@ -1757,12 +1883,14 @@ def delete_dataset(self, dataset_id, delete_contents=False): HttpError 404 when dataset with dataset_id does not exist """ - try: + project_id = self._get_project_id(project_id) + + try: datasets = self.bigquery.datasets() - request = datasets.delete(projectId=self.project_id, + request = datasets.delete(projectId=project_id, datasetId=dataset_id, deleteContents=delete_contents) - response = request.execute() + response = request.execute(num_retries=self.num_retries) if self.swallow_results: return True else: @@ -1776,7 +1904,7 @@ def delete_dataset(self, dataset_id, delete_contents=False): return {} def update_dataset(self, dataset_id, friendly_name=None, description=None, - access=None): + access=None, project_id=None): """Updates information in an 
existing dataset. The update method replaces the entire dataset resource, whereas the patch method only replaces fields that are provided in the submitted dataset resource. @@ -1785,13 +1913,15 @@ def update_dataset(self, dataset_id, friendly_name=None, description=None, ---------- dataset_id : str Unique ``str`` identifying the dataset with the project (the - referencedId of the dataset) + referencedId of the dataset) friendly_name : str, optional An optional descriptive name for the dataset. description : str, optional An optional description of the dataset. access : list, optional Indicating access permissions + project_id: str, optional + Unique ``str`` identifying the BigQuery project contains the dataset Returns ------- @@ -1799,14 +1929,20 @@ def update_dataset(self, dataset_id, friendly_name=None, description=None, ``bool`` indicating if the update was successful or not, or response from BigQuery if swallow_results is set for False. """ - try: + project_id = self._get_project_id(project_id) + + try: datasets = self.bigquery.datasets() - body = self.dataset_resource(dataset_id, friendly_name, - description, access) - request = datasets.update(projectId=self.project_id, + body = self.dataset_resource(dataset_id, + friendly_name=friendly_name, + description=description, + access=access, + project_id=project_id) + + request = datasets.update(projectId=project_id, datasetId=dataset_id, body=body) - response = request.execute() + response = request.execute(num_retries=self.num_retries) if self.swallow_results: return True else: @@ -1820,7 +1956,7 @@ def update_dataset(self, dataset_id, friendly_name=None, description=None, return {} def patch_dataset(self, dataset_id, friendly_name=None, description=None, - access=None): + access=None, project_id=None): """Updates information in an existing dataset. The update method replaces the entire dataset resource, whereas the patch method only replaces fields that are provided in the submitted dataset resource. 
@@ -1829,13 +1965,15 @@ def patch_dataset(self, dataset_id, friendly_name=None, description=None, ---------- dataset_id : str Unique string idenfitying the dataset with the project (the - referenceId of the dataset) + referenceId of the dataset) friendly_name : str, optional An optional descriptive name for the dataset. description : str, optional An optional description of the dataset. access : list, optional Indicating access permissions. + project_id: str, optional + Unique ``str`` identifying the BigQuery project that contains the dataset Returns ------- @@ -1843,13 +1981,18 @@ def patch_dataset(self, dataset_id, friendly_name=None, description=None, ``bool`` indicating if the patch was successful or not, or response from BigQuery if swallow_results is set for False. """ - try: + project_id = self._get_project_id(project_id) + + try: datasets = self.bigquery.datasets() - body = self.dataset_resource(dataset_id, friendly_name, - description, access) - request = datasets.patch(projectId=self.project_id, + body = self.dataset_resource(dataset_id, + friendly_name=friendly_name, + description=description, + access=access, + project_id=project_id) + request = datasets.patch(projectId=project_id, datasetId=dataset_id, body=body) - response = request.execute() + response = request.execute(num_retries=self.num_retries) if self.swallow_results: return True else: @@ -1862,14 +2005,14 @@ def patch_dataset(self, dataset_id, friendly_name=None, description=None, return {} def dataset_resource(self, ref_id, friendly_name=None, description=None, - access=None, location=None): + access=None, location=None, project_id=None): """See https://developers.google.com/bigquery/docs/reference/v2/datasets#resource Parameters ---------- ref_id : str - Dataset id (the reference id, not the integer id) + Dataset id (the reference id, not the integer id) friendly_name : str, optional An optional descriptive name for the dataset description : str, optional @@ -1878,16 +2021,20 @@ def 
dataset_resource(self, ref_id, friendly_name=None, description=None, Indicating access permissions location: str, optional, 'EU' or 'US' An optional geographical location for the dataset(EU or US) + project_id: str + Unique ``str`` identifying the BigQuery project that contains the dataset Returns ------- dict Representing BigQuery dataset resource """ + project_id = self._get_project_id(project_id) + data = { "datasetReference": { "datasetId": ref_id, - "projectId": self.project_id + "projectId": project_id } } if friendly_name: diff --git a/bigquery/query_builder.py b/bigquery/query_builder.py index 1054299..435bb73 100644 --- a/bigquery/query_builder.py +++ b/bigquery/query_builder.py @@ -29,7 +29,7 @@ def render_query(dataset, tables, select=None, conditions=None, 'comparators' maps to another ``dict`` containing the keys 'condition', 'negate', and 'value'. If 'comparators' = {'condition': '>=', 'negate': False, 'value': 1}, - this example will be rdnered as 'foo >= FLOAT('1')' in the query. + this example will be rendered as 'foo >= FLOAT('1')' in the query. ``list`` of field names to group by order_by : dict, optional Keys = {'field', 'direction'}. `dict` should be formatted as @@ -170,7 +170,7 @@ def _render_conditions(conditions): Parameters ---------- conditions : list - A list of dictionay items to filter a table. + A list of dictionary items to filter a table. 
Returns ------- @@ -241,6 +241,8 @@ def _render_condition(field, field_type, comparators): else: value = _render_condition_value(value, field_type) value = "(" + value + ")" + elif condition == "IS NULL" or condition == "IS NOT NULL": + return field + " " + condition elif condition == "BETWEEN": if isinstance(value, (tuple, list, set)) and len(value) == 2: value = ' AND '.join( diff --git a/bigquery/tests/test_client.py b/bigquery/tests/test_client.py index 1315147..1f2d247 100644 --- a/bigquery/tests/test_client.py +++ b/bigquery/tests/test_client.py @@ -67,8 +67,13 @@ def test_initialize_readonly(self, mock_build, mock_return_cred): scopes=BIGQUERY_SCOPE_READ_ONLY) self.assertTrue( mock_cred.from_p12_keyfile_buffer.return_value.authorize.called) - mock_build.assert_called_once_with('bigquery', 'v2', http=mock_http, - discoveryServiceUrl=mock_service_url) + mock_build.assert_called_once_with( + 'bigquery', + 'v2', + http=mock_http, + discoveryServiceUrl=mock_service_url, + cache_discovery=False + ) self.assertEquals(mock_bq, bq_client.bigquery) self.assertEquals(project_id, bq_client.project_id) @@ -101,8 +106,13 @@ def test_initialize_read_write(self, mock_build, mock_return_cred): service_account, mock.ANY, scopes=BIGQUERY_SCOPE) self.assertTrue( mock_cred.from_p12_keyfile_buffer.return_value.authorize.called) - mock_build.assert_called_once_with('bigquery', 'v2', http=mock_http, - discoveryServiceUrl=mock_service_url) + mock_build.assert_called_once_with( + 'bigquery', + 'v2', + http=mock_http, + discoveryServiceUrl=mock_service_url, + cache_discovery=False + ) self.assertEquals(mock_bq, bq_client.bigquery) self.assertEquals(project_id, bq_client.project_id) @@ -136,8 +146,13 @@ def test_initialize_key_file(self, mock_build, mock_return_cred): scopes=BIGQUERY_SCOPE) self.assertTrue( mock_cred.from_p12_keyfile.return_value.authorize.called) - mock_build.assert_called_once_with('bigquery', 'v2', http=mock_http, - discoveryServiceUrl=mock_service_url) + 
mock_build.assert_called_once_with( + 'bigquery', + 'v2', + http=mock_http, + discoveryServiceUrl=mock_service_url, + cache_discovery=False + ) self.assertEquals(mock_bq, bq_client.bigquery) self.assertEquals(project_id, bq_client.project_id) @@ -172,8 +187,13 @@ def test_initialize_json_key_file(self, mock_open, mock_build, mock_return_cred) scopes=BIGQUERY_SCOPE) self.assertTrue( mock_cred.from_json_keyfile_dict.return_value.authorize.called) - mock_build.assert_called_once_with('bigquery', 'v2', http=mock_http, - discoveryServiceUrl=mock_service_url) + mock_build.assert_called_once_with( + 'bigquery', + 'v2', + http=mock_http, + discoveryServiceUrl=mock_service_url, + cache_discovery=False + ) self.assertEquals(mock_bq, bq_client.bigquery) self.assertEquals(project_id, bq_client.project_id) @@ -208,8 +228,13 @@ def test_initialize_json_key_file_without_project_id(self, mock_open, mock_build scopes=BIGQUERY_SCOPE) self.assertTrue( mock_cred.from_json_keyfile_dict.return_value.authorize.called) - mock_build.assert_called_once_with('bigquery', 'v2', http=mock_http, - discoveryServiceUrl=mock_service_url) + mock_build.assert_called_once_with( + 'bigquery', + 'v2', + http=mock_http, + discoveryServiceUrl=mock_service_url, + cache_discovery=False + ) self.assertEquals(mock_bq, bq_client.bigquery) self.assertEquals(json_key['project_id'], bq_client.project_id) @@ -272,7 +297,9 @@ def test_query(self): mock_query_job.execute.return_value = { 'jobReference': expected_job_ref, - 'jobComplete': True + 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -292,7 +319,6 @@ def test_query(self): self.assertEquals(job_id, 'spiderman') self.assertEquals(results, []) - def test_query_max_results_set(self): """Ensure that we retrieve the job id from the query and the maxResults parameter is set. 
@@ -305,6 +331,8 @@ def test_query_max_results_set(self): mock_query_job.execute.return_value = { 'jobReference': expected_job_ref, 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -333,6 +361,8 @@ def test_query_timeout_set(self): mock_query_job.execute.return_value = { 'jobReference': expected_job_ref, 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -358,6 +388,8 @@ def test_sync_query_timeout(self): mock_query_job.execute.return_value = { 'jobReference': expected_job_ref, 'jobComplete': False, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -376,6 +408,8 @@ def test_async_query_timeout(self): mock_query_job.execute.return_value = { 'jobReference': expected_job_ref, 'jobComplete': False, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -385,14 +419,18 @@ def test_async_query_timeout(self): self.assertEquals(results, []) def test_query_dry_run_valid(self): - """Ensure that None and an empty list is returned from the query when + """Ensure that None and [cacheHit, totalBytesProcessed] is returned from the query when dry_run is True and the query is valid. 
""" mock_query_job = mock.Mock() - mock_query_job.execute.return_value = {'jobReference': {}, - 'jobComplete': True} + mock_query_job.execute.return_value = { + 'jobReference': {}, + 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 + } self.mock_job_collection.query.return_value = mock_query_job @@ -404,7 +442,7 @@ def test_query_dry_run_valid(self): 'dryRun': True} ) self.assertIsNone(job_id) - self.assertEqual([], results) + self.assertEqual([False, 0], results) def test_query_dry_run_invalid(self): """Ensure that None and a dict is returned from the query when dry_run @@ -444,6 +482,8 @@ def test_query_with_results(self): 'schema': {'fields': [{'name': 'foo', 'type': 'INTEGER'}]}, 'rows': [{'f': [{'v': 10}]}], 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -467,7 +507,9 @@ def test_query_with_using_legacy_sql(self): mock_query_job.execute.return_value = { 'jobReference': expected_job_ref, - 'jobComplete': True + 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -520,7 +562,7 @@ def test_get_response(self): projectId=self.project_id, jobId=job_id, startIndex=offset, maxResults=limit, pageToken=page_token, timeoutMs=1000) - mock_query_job.execute.assert_called_once_with() + mock_query_job.execute.assert_called_once_with(num_retries=0) self.assertEquals(actual, mock_query_reply) @@ -849,7 +891,7 @@ def test_json_job_body_constructed_correctly(self): body = { "jobReference": { "projectId": self.project_id, - "jobId": "job" + "jobId": "job", }, "configuration": { "load": { @@ -1333,6 +1375,15 @@ def test_not_inside_range(self): "kind": "bigquery#tableList", "etag": "\"GSclnjk0zID1ucM3F-xYinOm1oE/cn58Rpu8v8pB4eoJQaiTe11lPQc\"", "tables": [ + { + "kind": "bigquery#table", + "id": "project:dataset.notanappspottable_20130515_0261", + "tableReference": { + "projectId": "project", + 
"datasetId": "dataset", + "tableId": "notanappspottable_20130515_0261" + } + }, { "kind": "bigquery#table", "id": "project:dataset.2013_05_appspot_1", @@ -1476,7 +1527,8 @@ def test_table_exists(self): expected, self.client.get_table_schema(self.dataset, self.table)) self.mock_tables.get.assert_called_once_with( projectId=self.project, tableId=self.table, datasetId=self.dataset) - self.mock_tables.get.return_value.execute.assert_called_once_with() + self.mock_tables.get.return_value.execute. \ + assert_called_once_with(num_retries=0) def test_table_does_not_exist(self): """Ensure that None is returned if the table doesn't exist.""" @@ -1487,7 +1539,8 @@ def test_table_does_not_exist(self): self.client.get_table_schema(self.dataset, self.table)) self.mock_tables.get.assert_called_once_with( projectId=self.project, tableId=self.table, datasetId=self.dataset) - self.mock_tables.get.return_value.execute.assert_called_once_with() + self.mock_tables.get.return_value.execute. \ + assert_called_once_with(num_retries=0) @mock.patch('bigquery.client.BigQueryClient.get_query_results') @@ -1642,7 +1695,8 @@ def test_table_does_not_exist(self): self.mock_tables.get.assert_called_once_with( projectId=self.project, datasetId=self.dataset, tableId=self.table) - self.mock_tables.get.return_value.execute.assert_called_once_with() + self.mock_tables.get.return_value.execute. \ + assert_called_once_with(num_retries=0) def test_table_does_exist(self): """Ensure that if the table does exist, True is returned.""" @@ -1657,7 +1711,8 @@ def test_table_does_exist(self): self.mock_tables.get.assert_called_once_with( projectId=self.project, datasetId=self.dataset, tableId=self.table) - self.mock_tables.get.return_value.execute.assert_called_once_with() + self.mock_tables.get.return_value.execute. 
\ + assert_called_once_with(num_retries=0) class TestCreateTable(unittest.TestCase): @@ -1707,7 +1762,8 @@ def test_table_create_failed(self): self.mock_tables.insert.assert_called_with( projectId=self.project, datasetId=self.dataset, body=self.body) - self.mock_tables.insert.return_value.execute.assert_called_with() + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=0) def test_table_create_success(self): """Ensure that if creating the table succeeds, True is returned, @@ -1733,7 +1789,8 @@ def test_table_create_success(self): self.mock_tables.insert.assert_called_with( projectId=self.project, datasetId=self.dataset, body=self.body) - self.mock_tables.insert.return_value.execute.assert_called_with() + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=0) def test_table_create_body_with_expiration_time(self): """Ensure that if expiration_time has specified, @@ -1753,7 +1810,8 @@ def test_table_create_body_with_expiration_time(self): self.mock_tables.insert.assert_called_with( projectId=self.project, datasetId=self.dataset, body=body) - self.mock_tables.insert.return_value.execute.assert_called_with() + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=0) def test_table_create_body_with_time_partitioning(self): """Ensure that if time_partitioning has specified, @@ -1774,7 +1832,8 @@ def test_table_create_body_with_time_partitioning(self): self.mock_tables.insert.assert_called_with( projectId=self.project, datasetId=self.dataset, body=body) - self.mock_tables.insert.return_value.execute.assert_called_with() + self.mock_tables.insert.return_value.execute. 
\ + assert_called_with(num_retries=0) class TestUpdateTable(unittest.TestCase): @@ -1821,9 +1880,11 @@ def test_table_update_failed(self): self.client.swallow_results = True self.mock_tables.update.assert_called_with( - projectId=self.project, datasetId=self.dataset, body=self.body) + projectId=self.project, tableId=self.table, datasetId=self.dataset, + body=self.body) - self.mock_tables.update.return_value.execute.assert_called_with() + self.mock_tables.update.return_value.execute. \ + assert_called_with(num_retries=0) def test_table_update_success(self): """Ensure that if updating the table succeeds, True is returned, @@ -1847,9 +1908,11 @@ def test_table_update_success(self): self.client.swallow_results = True self.mock_tables.update.assert_called_with( - projectId=self.project, datasetId=self.dataset, body=self.body) + projectId=self.project, tableId=self.table, datasetId=self.dataset, + body=self.body) - self.mock_tables.update.return_value.execute.assert_called_with() + self.mock_tables.update.return_value.execute. \ + assert_called_with(num_retries=0) class TestPatchTable(unittest.TestCase): @@ -1868,9 +1931,6 @@ def setUp(self): self.client = client.BigQueryClient(self.mock_bq_service, self.project) self.body = { 'schema': {'fields': self.schema}, - 'tableReference': { - 'tableId': self.table, 'projectId': self.project, - 'datasetId': self.dataset} } self.expiration_time = 1437513693000 @@ -1896,9 +1956,11 @@ def test_table_patch_failed(self): self.client.swallow_results = True self.mock_tables.patch.assert_called_with( - projectId=self.project, datasetId=self.dataset, body=self.body) + projectId=self.project, datasetId=self.dataset, + tableId=self.table, body=self.body) - self.mock_tables.patch.return_value.execute.assert_called_with() + self.mock_tables.patch.return_value.execute. 
\ + assert_called_with(num_retries=0) def test_table_patch_success(self): """Ensure that if patching the table succeeds, True is returned, @@ -1922,9 +1984,11 @@ def test_table_patch_success(self): self.client.swallow_results = True self.mock_tables.patch.assert_called_with( - projectId=self.project, datasetId=self.dataset, body=self.body) + projectId=self.project, datasetId=self.dataset, + tableId=self.table, body=self.body) - self.mock_tables.patch.return_value.execute.assert_called_with() + self.mock_tables.patch.return_value.execute. \ + assert_called_with(num_retries=0) class TestCreateView(unittest.TestCase): @@ -1969,7 +2033,8 @@ def test_view_create_failed(self): self.mock_tables.insert.assert_called_with( projectId=self.project, datasetId=self.dataset, body=self.body) - self.mock_tables.insert.return_value.execute.assert_called_with() + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=0) def test_view_create_success(self): """Ensure that if creating the table succeeds, True is returned, @@ -1995,7 +2060,8 @@ def test_view_create_success(self): self.mock_tables.insert.assert_called_with( projectId=self.project, datasetId=self.dataset, body=self.body) - self.mock_tables.insert.return_value.execute.assert_called_with() + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=0) class TestDeleteTable(unittest.TestCase): @@ -2031,7 +2097,8 @@ def test_delete_table_fail(self): self.mock_tables.delete.assert_called_with( projectId=self.project, datasetId=self.dataset, tableId=self.table) - self.mock_tables.delete.return_value.execute.assert_called_with() + self.mock_tables.delete.return_value.execute. 
\ + assert_called_with(num_retries=0) def test_delete_table_success(self): """Ensure that if deleting table succeeds, True is returned, @@ -2055,7 +2122,8 @@ def test_delete_table_success(self): self.mock_tables.delete.assert_called_with( projectId=self.project, datasetId=self.dataset, tableId=self.table) - self.mock_tables.delete.return_value.execute.assert_called_with() + self.mock_tables.delete.return_value.execute. \ + assert_called_with(num_retries=0) class TestParseTableListReponse(unittest.TestCase): @@ -2191,7 +2259,7 @@ def test_push_failed(self): projectId=self.project, datasetId=self.dataset, tableId=self.table, body=self.data) - execute_calls = [mock.call()] + execute_calls = [mock.call(num_retries=0)] self.mock_table_data.insertAll.return_value.execute.assert_has_calls( execute_calls) @@ -2245,7 +2313,7 @@ def test_push_exception(self): projectId=self.project, datasetId=self.dataset, tableId=self.table, body=self.data) - execute_calls = [mock.call()] + execute_calls = [mock.call(num_retries=0)] self.mock_table_data.insertAll.return_value.execute.assert_has_calls( execute_calls) @@ -2277,7 +2345,7 @@ def test_push_success(self): projectId=self.project, datasetId=self.dataset, tableId=self.table, body=self.data) - execute_calls = [mock.call()] + execute_calls = [mock.call(num_retries=0)] self.mock_table_data.insertAll.return_value.execute.assert_has_calls( execute_calls) @@ -2389,7 +2457,7 @@ def test_get_all_tables(self): bq = client.BigQueryClient(mock_bq_service, 'project') expected_result = [ - '2013_05_appspot', '2013_06_appspot_1', '2013_06_appspot_2', + 'notanappspottable_20130515_0261', '2013_05_appspot', '2013_06_appspot_1', '2013_06_appspot_2', '2013_06_appspot_3', '2013_06_appspot_4', '2013_06_appspot_5', 'appspot_6_2013_06', 'table_not_matching_naming' ] @@ -2595,7 +2663,7 @@ def test_dataset_create_failed(self): projectId=self.project, body=self.body) self.mock_datasets.insert.return_value.execute. 
\ - assert_called_with() + assert_called_with(num_retries=0) def test_dataset_create_success(self): """Ensure that if creating the table fails, False is returned.""" @@ -2624,7 +2692,7 @@ def test_dataset_create_success(self): projectId=self.project, body=self.body) self.mock_datasets.insert.return_value.execute. \ - assert_called_with() + assert_called_with(num_retries=0) class TestDeleteDataset(unittest.TestCase): @@ -2660,7 +2728,7 @@ def test_delete_datasets_fail(self): self.client.swallow_results = True self.mock_datasets.delete.return_value.execute. \ - assert_called_with() + assert_called_with(num_retries=0) def test_delete_datasets_success(self): """Ensure that if deleting table succeeds, True is returned.""" @@ -2685,7 +2753,7 @@ def test_delete_datasets_success(self): deleteContents=False) self.mock_datasets.delete.return_value.execute. \ - assert_called_with() + assert_called_with(num_retries=0) def test_delete_datasets_delete_contents_success(self): """Ensure that if deleting table succeeds, True is returned.""" @@ -2710,7 +2778,7 @@ def test_delete_datasets_delete_contents_success(self): deleteContents=True) self.mock_datasets.delete.return_value.execute. \ - assert_called_with() + assert_called_with(num_retries=0) FULL_DATASET_LIST_RESPONSE = { @@ -2870,7 +2938,7 @@ def test_dataset_update_failed(self): projectId=self.project, datasetId=self.dataset, body=self.body) self.mock_datasets.update.return_value.execute. 
\ - assert_called_with() + assert_called_with(num_retries=0) def test_dataset_update_success(self): """Ensure that if creating the table fails, False is returned.""" @@ -2878,18 +2946,18 @@ def test_dataset_update_success(self): self.mock_datasets.update.return_value.execute.side_effect = [{ 'status': 'foo'}, {'status': 'bar'}] - actual = self.client.update_dataset(self.dataset, - self.friendly_name, - self.description, - self.access) + actual = self.client.update_dataset(self.dataset, + friendly_name=self.friendly_name, + description=self.description, + access=self.access) self.assertTrue(actual) self.client.swallow_results = False - actual = self.client.update_dataset(self.dataset, - self.friendly_name, - self.description, - self.access) + actual = self.client.update_dataset(self.dataset, + friendly_name=self.friendly_name, + description=self.description, + access=self.access) self.assertEqual(actual, {'status': 'bar'}) @@ -2899,4 +2967,219 @@ def test_dataset_update_success(self): projectId=self.project, datasetId=self.dataset, body=self.body) self.mock_datasets.update.return_value.execute. 
\ - assert_called_with() + assert_called_with(num_retries=0) + + +class TestNumRetries(unittest.TestCase): + + def setUp(self): + client._bq_client = None + + self.mock_bq_service = mock.Mock() + self.mock_tables = mock.Mock() + self.mock_job_collection = mock.Mock() + self.mock_datasets = mock.Mock() + self.mock_table_data = mock.Mock() + self.mock_bq_service.tables.return_value = self.mock_tables + self.mock_bq_service.jobs.return_value = self.mock_job_collection + self.mock_bq_service.datasets.return_value = self.mock_datasets + self.mock_bq_service.tabledata.return_value = self.mock_table_data + + self.project_id = 'project' + self.num_retries = 5 + self.client = client.BigQueryClient(self.mock_bq_service, + self.project_id, + num_retries=self.num_retries) + self.dataset = 'dataset' + self.project = 'project' + self.table = 'table' + self.schema = [ + {'name': 'foo', 'type': 'STRING', 'mode': 'nullable'}, + {'name': 'bar', 'type': 'FLOAT', 'mode': 'nullable'} + ] + self.friendly_name = "friendly name" + self.description = "description" + self.access = [{'userByEmail': "bob@gmail.com"}] + self.query = 'SELECT "bar" foo, "foo" bar' + self.rows = [ + {'one': 'uno', 'two': 'dos'}, {'one': 'ein', 'two': 'zwei'}, + {'two': 'kiwi'}] + self.data = { + "kind": "bigquery#tableDataInsertAllRequest", + "rows": [{'insertId': "uno", 'json': {'one': 'uno', 'two': 'dos'}}, + {'insertId': "ein", 'json': + {'one': 'ein', 'two': 'zwei'}}, + {'json': {'two': 'kiwi'}}] + } + + def test_get_response(self): + job_id = 'bar' + + mock_query_job = mock.Mock() + mock_query_reply = mock.Mock() + mock_query_job.execute.return_value = mock_query_reply + self.mock_job_collection.getQueryResults.return_value = mock_query_job + + offset = 5 + limit = 10 + page_token = "token" + timeout = 1 + + self.client.get_query_results(job_id, offset, limit, + page_token, timeout) + + mock_query_job.execute. 
\ + assert_called_once_with(num_retries=self.num_retries) + + def test_table_exists(self): + expected = [ + {'type': 'FLOAT', 'name': 'foo', 'mode': 'NULLABLE'}, + {'type': 'INTEGER', 'name': 'bar', 'mode': 'NULLABLE'}, + {'type': 'INTEGER', 'name': 'baz', 'mode': 'NULLABLE'}, + ] + + self.mock_tables.get.return_value.execute.return_value = \ + {'schema': {'fields': expected}} + + self.client.get_table_schema(self.dataset, self.table) + self.mock_tables.get.return_value.execute. \ + assert_called_once_with(num_retries=self.num_retries) + + def test_table_create(self): + self.mock_tables.insert.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + self.client.create_table(self.dataset, self.table, + self.schema) + + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) + + def test_table_update(self): + self.mock_tables.update.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + self.client.update_table(self.dataset, self.table, + self.schema) + + self.mock_tables.update.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) + + def test_table_patch(self): + self.mock_tables.patch.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + self.client.patch_table(self.dataset, self.table, + self.schema) + + self.mock_tables.patch.return_value.execute. 
\ + assert_called_with(num_retries=self.num_retries) + + def test_view_create(self): + body = { + 'view': {'query': self.query}, + 'tableReference': { + 'tableId': self.table, 'projectId': self.project, + 'datasetId': self.dataset + } + } + + self.mock_tables.insert.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + actual = self.client.create_view(self.dataset, self.table, + self.query) + + self.assertTrue(actual) + + self.mock_tables.insert.assert_called_with( + projectId=self.project, datasetId=self.dataset, body=body) + + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) + + def test_delete_table(self): + self.mock_tables.delete.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + actual = self.client.delete_table(self.dataset, self.table) + + self.assertTrue(actual) + + self.mock_tables.delete.assert_called_with( + projectId=self.project, datasetId=self.dataset, tableId=self.table) + + self.mock_tables.delete.return_value.execute. 
\ + assert_called_with(num_retries=self.num_retries) + + def test_push(self): + self.mock_table_data.insertAll.return_value.execute.return_value = { + 'status': 'foo'} + + actual = self.client.push_rows(self.dataset, self.table, self.rows, + 'one') + + self.assertTrue(actual) + + self.mock_bq_service.tabledata.assert_called_with() + + self.mock_table_data.insertAll.assert_called_with( + projectId=self.project, datasetId=self.dataset, tableId=self.table, + body=self.data) + + execute_calls = [mock.call(num_retries=self.num_retries)] + self.mock_table_data.insertAll.return_value.execute.assert_has_calls( + execute_calls) + + def test_dataset_create(self): + body = { + 'datasetReference': { + 'datasetId': self.dataset, + 'projectId': self.project}, + 'friendlyName': self.friendly_name, + 'description': self.description, + 'access': self.access + } + + self.mock_datasets.insert.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + actual = self.client.create_dataset(self.dataset, + self.friendly_name, + self.description, + self.access) + self.assertTrue(actual) + + self.mock_datasets.insert.assert_called_with( + projectId=self.project, body=body) + + self.mock_datasets.insert.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) + + def test_delete_datasets(self): + self.mock_datasets.delete.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + actual = self.client.delete_dataset(self.dataset) + + self.assertTrue(actual) + + self.mock_datasets.delete.assert_called_with( + projectId=self.project, datasetId=self.dataset, + deleteContents=False) + + self.mock_datasets.delete.return_value.execute. 
\ + assert_called_with(num_retries=self.num_retries) + + def test_dataset_update(self): + self.mock_datasets.update.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + actual = self.client.update_dataset(self.dataset, + self.friendly_name, + self.description, + self.access) + self.assertTrue(actual) + + self.mock_datasets.update.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) diff --git a/bigquery/version.py b/bigquery/version.py index 522ba08..1c19d78 100644 --- a/bigquery/version.py +++ b/bigquery/version.py @@ -1 +1 @@ -__version__ = '1.11.1' +__version__ = '1.15.0' diff --git a/requirements_dev.txt b/requirements_dev.txt index 74162c3..1040dea 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -1,6 +1,6 @@ nose rednose -mock==1.0.1 +mock==4.0.2 coverage nose-exclude tox diff --git a/tox.ini b/tox.ini index ce76190..58dadc9 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py27, py33, py34, nightly, pypy +envlist = py27, py35, py36, nightly, pypy [testenv] commands = nosetests --logging-level=ERROR -a slow --with-coverage --cover-package=bigquery