diff --git a/.travis.yml b/.travis.yml index 9f422c6..ba3cdc8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,7 @@ language: python +before_install: + - sudo apt-get update -q + - sudo apt-get install pypy -y install: - python setup.py develop - pip install tox @@ -7,7 +10,7 @@ notifications: email: false env: - TOXENV=py27 - - TOXENV=py33 - - TOXENV=py34 + - TOXENV=py35 + - TOXENV=py36 - TOXENV=nightly - TOXENV=pypy diff --git a/README.md b/README.md index 8d42cb3..009f125 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ client = get_client(project_id, service_account=service_account, # JSON key provided by Google json_key = 'key.json' -client = get_client(project_id, json_key_file=json_key, readonly=True) +client = get_client(json_key_file=json_key, readonly=True) # Submit an async query. job_id, _results = client.query('SELECT * FROM dataset.my_table LIMIT 1000') @@ -126,7 +126,8 @@ query = render_query( conditions=conditions, groupings=grouping, having=having, - order_by=order_by + order_by=order_by, + limit=47 ) job_id, _ = client.query(query) @@ -134,7 +135,7 @@ job_id, _ = client.query(query) # Managing Tables -The BigQuery client provides facilities to manage dataset tables, including creating, deleting, and checking the existence of tables. +The BigQuery client provides facilities to manage dataset tables, including creating, deleting, checking the existence, and getting the metadata of tables. ```python # Create a new table. @@ -149,6 +150,10 @@ deleted = client.delete_table('dataset', 'my_table') # Check if a table exists. exists = client.check_table('dataset', 'my_table') + +# Get a table's full metadata. Includes numRows, numBytes, etc. +# See: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables +metadata = client.get_table('dataset', 'my_table') ``` There is also functionality for retrieving tables that are associated with a Google App Engine appspot, assuming table names are in the form of appid_YYYY_MM or YYYY_MM_appid. This allows tables between a date range to be selected and queried on. @@ -168,7 +173,7 @@ The client provides an API for inserting data into a BigQuery table. The last pa ```python # Insert data into table. rows = [ - {'one': 'ein', 'two': 'zwei'} + {'one': 'ein', 'two': 'zwei'}, {'id': 'NzAzYmRiY', 'one': 'uno', 'two': 'dos'}, {'id': 'NzAzYmRiY', 'one': 'ein', 'two': 'zwei'} # duplicate entry ] @@ -290,14 +295,9 @@ exists = client.check_dataset('mydataset') ```python from bigquery import schema_from_record -schema_from_record({"id":123, "posts": [{"id":123, "text": "tihs is a post"}], "username": "bob"}) +schema_from_record({"id":123, "posts": [{"id":123, "text": "this is a post"}], "username": "bob"}) ``` -# Caveats - -BigQuery [flattens](https://developers.google.com/bigquery/docs/data?hl=ja#flatten) results with repeated records, so a result might actually map to multiple rows. This means that the row count may be larger than the actual number of results because BigQuery reports the number of unrolled rows but the returned results are rolled back up. - - # Contributing Requirements to commit here: diff --git a/bigquery/__init__.py b/bigquery/__init__.py index 086be47..b393875 100644 --- a/bigquery/__init__.py +++ b/bigquery/__init__.py @@ -1,6 +1,6 @@ from __future__ import absolute_import -__version__ = '1.6.0' +from .version import __version__ from .client import get_client from .client import ( diff --git a/bigquery/client.py b/bigquery/client.py index 33e8275..bb4d50a 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -1,22 +1,27 @@ import calendar import json -import logging +from logging import getLogger, NullHandler from collections import defaultdict from datetime import datetime, timedelta from hashlib import sha256 +from io import StringIO from time import sleep, time +from functools import reduce -import httplib2 import six -from apiclient.discovery import build, DISCOVERY_URI -from apiclient.errors import HttpError - from bigquery.errors import (BigQueryTimeoutException, JobExecutingException, JobInsertException, UnfinishedQueryException) -from bigquery.schema_builder import schema_from_record +from googleapiclient.discovery import build, DISCOVERY_URI +from googleapiclient.errors import HttpError +from httplib2 import Http + +BIGQUERY_SCOPE = [ + 'https://www.googleapis.com/auth/bigquery' +] -BIGQUERY_SCOPE = 'https://www.googleapis.com/auth/bigquery' -BIGQUERY_SCOPE_READ_ONLY = 'https://www.googleapis.com/auth/bigquery.readonly' +BIGQUERY_SCOPE_READ_ONLY = [ + 'https://www.googleapis.com/auth/bigquery.readonly' +] CACHE_TIMEOUT = timedelta(seconds=30) @@ -42,46 +47,57 @@ JOB_FORMAT_NEWLINE_DELIMITED_JSON JOB_DESTINATION_FORMAT_CSV = JOB_FORMAT_CSV +logger = getLogger(__name__) +logger.addHandler(NullHandler()) -def get_client(project_id, credentials=None, + +def get_client(project_id=None, credentials=None, service_url=None, service_account=None, private_key=None, private_key_file=None, json_key=None, json_key_file=None, - readonly=True, swallow_results=True): + readonly=True, swallow_results=True, + num_retries=0): """Return a singleton instance of BigQueryClient. Either AssertionCredentials or a service account and private key combination need to be provided in order to authenticate requests to BigQuery. Parameters ---------- - project_id : str - The BigQuery project id + project_id : str, optional + The BigQuery project id, required unless json_key or json_key_file is + provided. credentials : oauth2client.client.SignedJwtAssertionCredentials, optional - AssertionCredentials instance to authenticate requests to BigQuery (optional, - must provide `service_account` and (`private_key` or `private_key_file`) or - (`json_key` or `json_key_file`) if not included + AssertionCredentials instance to authenticate requests to BigQuery + (optional, must provide `service_account` and (`private_key` or + `private_key_file`) or (`json_key` or `json_key_file`) if not included service_url : str, optional - A URI string template pointing to the location of Google's API discovery - service. Requires two parameters {api} and {apiVersion} that when filled in - produce an absolute URI to the discovery document for that service. If not set - then the default googleapiclient discovery URI is used. See `credentials` + A URI string template pointing to the location of Google's API + discovery service. Requires two parameters {api} and {apiVersion} that + when filled in produce an absolute URI to the discovery document for + that service. If not set then the default googleapiclient discovery URI + is used. See `credentials` service_account : str, optional The Google API service account name. See `credentials` private_key : str, optional - The private key associated with the service account in PKCS12 or PEM format. See `credentials` + The private key associated with the service account in PKCS12 or PEM + format. See `credentials` private_key_file : str, optional - The name of the file containing the private key associated with the service - account in PKCS12 or PEM format. See `credentials` + The name of the file containing the private key associated with the + service account in PKCS12 or PEM format. See `credentials` json_key : dict, optional The JSON key associated with the service account. See `credentials` json_key_file : str, optional - The name of the JSON key file associated with the service account. See `credentials`. + The name of the JSON key file associated with the service account. See + `credentials`. readonly : bool - Bool indicating if BigQuery access is read-only. Has no effect if credentials are - provided. Default True. + Bool indicating if BigQuery access is read-only. Has no effect if + credentials are provided. Default True. swallow_results : bool - If set to False, then return the actual response value instead of converting to - boolean. Default True. + If set to False, then return the actual response value instead of + converting to boolean. Default True. + num_retries : int, optional + The number of times to retry the request. Default 0 (no retry). + Returns ------- @@ -90,71 +106,128 @@ def get_client(project_id, credentials=None, """ if not credentials: - assert (service_account and (private_key or private_key_file)) or (json_key or json_key_file), \ - 'Must provide AssertionCredentials or service account and P12 key or JSON key' + assert (service_account and (private_key or private_key_file)) or ( + json_key or json_key_file), \ + 'Must provide AssertionCredentials or service account and P12 key\ + or JSON key' + + if not project_id: + assert json_key or json_key_file, \ + 'Must provide project_id unless json_key or json_key_file is\ + provided' if service_url is None: service_url = DISCOVERY_URI + scope = BIGQUERY_SCOPE_READ_ONLY if readonly else BIGQUERY_SCOPE + if private_key_file: - with open(private_key_file, 'rb') as key_file: - private_key = key_file.read() + credentials = _credentials().from_p12_keyfile(service_account, + private_key_file, + scopes=scope) + + if private_key: + try: + if isinstance(private_key, basestring): + private_key = private_key.decode('utf-8') + except NameError: + # python3 -- private_key is already unicode + pass + credentials = _credentials().from_p12_keyfile_buffer( + service_account, + StringIO(private_key), + scopes=scope) if json_key_file: with open(json_key_file, 'r') as key_file: json_key = json.load(key_file) if json_key: - service_account = json_key['client_email'] - private_key = json_key['private_key'] + credentials = _credentials().from_json_keyfile_dict(json_key, + scopes=scope) + if not project_id: + project_id = json_key['project_id'] bq_service = _get_bq_service(credentials=credentials, - service_url=service_url, - service_account=service_account, - private_key=private_key, - readonly=readonly) + service_url=service_url) - return BigQueryClient(bq_service, project_id, swallow_results) + return BigQueryClient(bq_service, project_id, swallow_results, + num_retries) -def _get_bq_service(credentials=None, service_url=None, service_account=None, private_key=None, - readonly=True): - """Construct an authorized BigQuery service object.""" +def get_projects(bq_service): + """Given the BigQuery service, return data about all projects.""" + projects_request = bq_service.projects().list().execute() - assert credentials or (service_account and private_key), \ - 'Must provide AssertionCredentials or service account and key' + projects = [] + for project in projects_request.get('projects', []): + project_data = { + 'id': project['id'], + 'name': project['friendlyName'] + } + projects.append(project_data) + return projects - if not credentials: - scope = BIGQUERY_SCOPE_READ_ONLY if readonly else BIGQUERY_SCOPE - credentials = _credentials()(service_account, private_key, scope=scope) - http = httplib2.Http() - http = credentials.authorize(http) - service = build('bigquery', 'v2', http=http, discoveryServiceUrl=service_url) +def _get_bq_service(credentials=None, service_url=None): + """Construct an authorized BigQuery service object.""" + + assert credentials, 'Must provide ServiceAccountCredentials' + + http = credentials.authorize(Http()) + service = build( + 'bigquery', + 'v2', + http=http, + discoveryServiceUrl=service_url, + cache_discovery=False + ) return service def _credentials(): """Import and return SignedJwtAssertionCredentials class""" - from oauth2client.client import SignedJwtAssertionCredentials + from oauth2client.service_account import ServiceAccountCredentials - return SignedJwtAssertionCredentials + return ServiceAccountCredentials class BigQueryClient(object): - def __init__(self, bq_service, project_id, swallow_results=True): + def __init__(self, bq_service, project_id, swallow_results=True, + num_retries=0): self.bigquery = bq_service self.project_id = project_id self.swallow_results = swallow_results + self.num_retries = num_retries self.cache = {} + def _get_project_id(self, project_id=None): + """ Get new project_id + + Default is self.project_id, which is the project client authenticate to. + A new project_id is specified when client wants to authenticate to 1 project, + but run jobs in a different project. + + Parameters + ---------- + project_id : str + BigQuery project_id + + Returns + ------- + project_id: BigQuery project_id + """ + if project_id is None: + project_id = self.project_id + return project_id + def _submit_query_job(self, query_data): """ Submit a query job to BigQuery. This is similar to BigQueryClient.query, but gives the user - direct access to the query method on the offical BigQuery + direct access to the query method on the official BigQuery python client. For fine-grained control over a query job, see: @@ -170,7 +243,7 @@ def _submit_query_job(self, query_data): ------- tuple job id and query results if query completed. If dry_run is True, - job id will be None and results will be empty if the query is valid + job id will be None and results will be [cacheHit and totalBytesProcessed] if the query is valid or a dict containing the response if invalid. Raises @@ -179,13 +252,14 @@ def _submit_query_job(self, query_data): On timeout """ - logging.debug('Submitting query job: %s' % query_data) + logger.debug('Submitting query job: %s' % query_data) job_collection = self.bigquery.jobs() try: query_reply = job_collection.query( - projectId=self.project_id, body=query_data).execute() + projectId=self.project_id, body=query_data).execute( + num_retries=self.num_retries) except HttpError as e: if query_data.get("dryRun", False): return None, json.loads(e.content.decode('utf8')) @@ -195,19 +269,44 @@ def _submit_query_job(self, query_data): schema = query_reply.get('schema', {'fields': None})['fields'] rows = query_reply.get('rows', []) job_complete = query_reply.get('jobComplete', False) + cache_hit = query_reply['cacheHit'] + total_bytes_processed = query_reply['totalBytesProcessed'] # raise exceptions if it's not an async query # and job is not completed after timeout if not job_complete and query_data.get("timeoutMs", False): - logging.error('BigQuery job %s timeout' % job_id) + logger.error('BigQuery job %s timeout' % job_id) raise BigQueryTimeoutException() - + + if query_data.get("dryRun", True): + return job_id, [cache_hit, total_bytes_processed] return job_id, [self._transform_row(row, schema) for row in rows] + def _get_job_reference(self, job_id): + """ Get job reference from job_id + For more details, see: + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#resource + + Parameters + ---------- + job_id: + Id of the job + + Returns + ------- + job_reference: json of job_reference + """ + job_reference = { + "projectId": self.project_id, + "jobId": job_id + } + + return job_reference + def _insert_job(self, body_object): """ Submit a job to BigQuery - Direct proxy to the insert() method of the offical BigQuery + Direct proxy to the insert() method of the official BigQuery python client. Able to submit load, link, query, copy, or extract jobs. @@ -228,16 +327,16 @@ def _insert_job(self, body_object): BigQueryTimeoutException on timeout """ - logging.debug('Submitting job: %s' % body_object) + logger.debug('Submitting job: %s' % body_object) job_collection = self.bigquery.jobs() return job_collection.insert( projectId=self.project_id, body=body_object - ).execute() + ).execute(num_retries=self.num_retries) - def query(self, query, max_results=None, timeout=0, dry_run=False): + def query(self, query, max_results=None, timeout=0, dry_run=False, use_legacy_sql=None, external_udf_uris=None): """Submit a query to BigQuery. Parameters @@ -250,16 +349,22 @@ def query(self, query, max_results=None, timeout=0, dry_run=False): How long to wait for the query to complete, in seconds before the request times out and returns. dry_run : bool, optional - If True, the query isn't actually run. A valid query will return an - empty response, while an invalid one will return the same error message - it would if it wasn't a dry run. + If True, the query isn't actually run. A valid query will return + cache hit, and total bytes processed, while an invalid one will return the same error + message it would if it wasn't a dry run. + use_legacy_sql : bool, optional. Default True. + If False, the query will use BigQuery's standard SQL (https://cloud.google.com/bigquery/sql-reference/) + external_udf_uris : list, optional + Contains external UDF URIs. If given, URIs must be Google Cloud + Storage and have .js extensions. + Returns ------- tuple - (job id, query results) if the query completed. If dry_run is True, job id - will be None and results will be empty if the query is valid or a ``dict`` containing - the response if invalid. + (job id, query results) if the query completed. If dry_run is True, + job id will be None and results will be [cacheHit and totalBytesProcessed] if the query is valid + or a ``dict`` containing the response if invalid. Raises ------ @@ -267,14 +372,22 @@ def query(self, query, max_results=None, timeout=0, dry_run=False): on timeout """ - logging.debug('Executing query: %s' % query) + logger.debug('Executing query: %s' % query) query_data = { 'query': query, 'timeoutMs': timeout * 1000, 'dryRun': dry_run, - 'maxResults': max_results, + 'maxResults': max_results } + + if use_legacy_sql is not None: + query_data['useLegacySql'] = use_legacy_sql + + if external_udf_uris: + query_data['userDefinedFunctionResources'] = \ + [ {'resourceUri': u} for u in external_udf_uris ] + return self._submit_query_job(query_data) def get_query_schema(self, job_id): @@ -294,12 +407,12 @@ def get_query_schema(self, job_id): query_reply = self.get_query_results(job_id, offset=0, limit=0) if not query_reply['jobComplete']: - logging.warning('BigQuery job %s not complete' % job_id) + logger.warning('BigQuery job %s not complete' % job_id) raise UnfinishedQueryException() return query_reply['schema']['fields'] - def get_table_schema(self, dataset, table): + def get_table_schema(self, dataset, table, project_id=None): """Return the table schema. Parameters @@ -308,6 +421,8 @@ def get_table_schema(self, dataset, table): The dataset containing the `table`. table : str The table to get the schema for + project_id: str, optional + The project of the dataset. Returns ------- @@ -315,15 +430,16 @@ def get_table_schema(self, dataset, table): A ``list`` of ``dict`` objects that represent the table schema. If the table doesn't exist, None is returned. """ + project_id = self._get_project_id(project_id) - try: + try: result = self.bigquery.tables().get( - projectId=self.project_id, + projectId=project_id, tableId=table, - datasetId=dataset).execute() + datasetId=dataset).execute(num_retries=self.num_retries) except HttpError as e: if int(e.resp['status']) == 404: - logging.warn('Table %s.%s does not exist', dataset, table) + logger.warn('Table %s.%s does not exist', dataset, table) return None raise @@ -341,8 +457,8 @@ def check_job(self, job_id): ------- tuple (``bool``, ``int``) Whether or not the query has completed and the - total number of rows included in the query table if it has completed - (else 0) + total number of rows included in the query table if it has + completed (else 0) """ query_reply = self.get_query_results(job_id, offset=0, limit=0) @@ -352,8 +468,8 @@ def check_job(self, job_id): def get_query_rows(self, job_id, offset=None, limit=None, timeout=0): """Retrieve a list of rows from a query table by job id. - This method will append results from multiple pages together. If you want - to manually page through results, you can use `get_query_results` + This method will append results from multiple pages together. If you + want to manually page through results, you can use `get_query_results` method directly. Parameters @@ -374,9 +490,10 @@ def get_query_rows(self, job_id, offset=None, limit=None, timeout=0): """ # Get query results - query_reply = self.get_query_results(job_id, offset=offset, limit=limit, timeout=timeout) + query_reply = self.get_query_results(job_id, offset=offset, + limit=limit, timeout=timeout) if not query_reply['jobComplete']: - logging.warning('BigQuery job %s not complete' % job_id) + logger.warning('BigQuery job %s not complete' % job_id) raise UnfinishedQueryException() schema = query_reply["schema"]["fields"] @@ -386,51 +503,59 @@ def get_query_rows(self, job_id, offset=None, limit=None, timeout=0): # Append to records if there are multiple pages for query results while page_token and (not limit or len(records) < limit): - query_reply = self.get_query_results(job_id, offset=offset, limit=limit, - page_token=page_token, timeout=timeout) + query_reply = self.get_query_results( + job_id, offset=offset, limit=limit, page_token=page_token, + timeout=timeout) page_token = query_reply.get("pageToken") rows = query_reply.get('rows', []) records += [self._transform_row(row, schema) for row in rows] return records[:limit] if limit else records - def check_dataset(self, dataset_id): + def check_dataset(self, dataset_id, project_id=None): """Check to see if a dataset exists. Parameters ---------- dataset_id : str Dataset unique id + project_id: str, optional + The project the dataset is in Returns ------- bool True if dataset at `dataset_id` exists, else Fasle - """ - dataset = self.get_dataset(dataset_id) + """ + dataset = self.get_dataset(dataset_id, project_id) return bool(dataset) - def get_dataset(self, dataset_id): + def get_dataset(self, dataset_id, project_id=None): """Retrieve a dataset if it exists, otherwise return an empty dict. Parameters ---------- dataset_id : str Dataset unique id + project_id: str, optional + The project the dataset is in Returns ------- dict Contains dataset object if it exists, else empty """ - try: + project_id = self._get_project_id(project_id) + + try: dataset = self.bigquery.datasets().get( - projectId=self.project_id, datasetId=dataset_id).execute() + projectId=project_id, datasetId=dataset_id).execute( + num_retries=self.num_retries) except HttpError: dataset = {} return dataset - def check_table(self, dataset, table): + def check_table(self, dataset, table, project_id=None): """Check to see if a table exists. Parameters @@ -439,16 +564,18 @@ def check_table(self, dataset, table): The dataset to check table : str The name of the table + project_id: str, optional + The project the table is in Returns ------- bool True if table exists, else False """ - table = self.get_table(dataset, table) + table = self.get_table(dataset, table, project_id) return bool(table) - def get_table(self, dataset, table): + def get_table(self, dataset, table, project_id=None): """ Retrieve a table if it exists, otherwise return an empty dict. Parameters @@ -457,22 +584,27 @@ def get_table(self, dataset, table): The dataset that the table is in table : str The name of the table + project_id: str, optional + The project that the table is in Returns ------- dict Containing the table object if it exists, else empty """ - try: + project_id = self._get_project_id(project_id) + try: table = self.bigquery.tables().get( - projectId=self.project_id, datasetId=dataset, - tableId=table).execute() + projectId=project_id, datasetId=dataset, + tableId=table).execute(num_retries=self.num_retries) except HttpError: table = {} return table - def create_table(self, dataset, table, schema, expiration_time=None): + def create_table(self, dataset, table, schema, + expiration_time=None, time_partitioning=False, + project_id=None): """Create a new table in the dataset. Parameters @@ -482,9 +614,13 @@ def create_table(self, dataset, table, schema, expiration_time=None): table : str The name of the table to create schema : dict - The table schema - expiration_time : float, optional + The table schema + expiration_time : int or double, optional The expiry time in milliseconds since the epoch. + time_partitioning : bool, optional + Create a time partitioning. + project_id: str, optional + The project to create the table in Returns ------- @@ -492,12 +628,13 @@ def create_table(self, dataset, table, schema, expiration_time=None): If the table was successfully created, or response from BigQuery if swallow_results is set to False """ + project_id = self._get_project_id(project_id) body = { 'schema': {'fields': schema}, 'tableReference': { 'tableId': table, - 'projectId': self.project_id, + 'projectId': project_id, 'datasetId': dataset } } @@ -505,27 +642,29 @@ def create_table(self, dataset, table, schema, expiration_time=None): if expiration_time is not None: body['expirationTime'] = expiration_time + if time_partitioning: + body['timePartitioning'] = {'type': 'DAY'} + try: table = self.bigquery.tables().insert( - projectId=self.project_id, + projectId=project_id, datasetId=dataset, body=body - ).execute() + ).execute(num_retries=self.num_retries) if self.swallow_results: return True else: return table except HttpError as e: - logging.error(('Cannot create table {0}.{1}\n' - 'Http Error: {2}').format(dataset, table, - e.content)) + logger.error(('Cannot create table {0}.{1}.{2}\n' + 'Http Error: {3}').format(project_id, dataset, table, e.content)) if self.swallow_results: return False else: return {} - def update_table(self, dataset, table, schema): + def update_table(self, dataset, table, schema, project_id=None): """Update an existing table in the dataset. Parameters @@ -536,6 +675,8 @@ def update_table(self, dataset, table, schema): The name of the table to update schema : dict Table schema + project_id: str, optional + The project to update the table in Returns ------- @@ -543,37 +684,38 @@ def update_table(self, dataset, table, schema): bool indicating if the table was successfully updated or not, or response from BigQuery if swallow_results is set to False. """ + project_id = self._get_project_id(project_id) body = { 'schema': {'fields': schema}, 'tableReference': { 'tableId': table, - 'projectId': self.project_id, + 'projectId': project_id, 'datasetId': dataset } } try: result = self.bigquery.tables().update( - projectId=self.project_id, + projectId=project_id, + tableId= table, datasetId=dataset, body=body - ).execute() + ).execute(num_retries=self.num_retries) if self.swallow_results: return True else: return result except HttpError as e: - logging.error(('Cannot update table {0}.{1}\n' - 'Http Error: {2}').format(dataset, table, - e.content)) + logger.error(('Cannot update table {0}.{1}.{2}\n' + 'Http Error: {3}').format(project_id, dataset, table, e.content)) if self.swallow_results: return False else: return {} - def patch_table(self, dataset, table, schema): + def patch_table(self, dataset, table, schema, project_id=None): """Patch an existing table in the dataset. Parameters @@ -584,6 +726,8 @@ def patch_table(self, dataset, table, schema): The name of the table to patch schema : dict The table schema + project_id: str, optional + The project to patch the table in Returns ------- @@ -591,37 +735,33 @@ def patch_table(self, dataset, table, schema): Bool indicating if the table was successfully patched or not, or response from BigQuery if swallow_results is set to False """ + project_id = self._get_project_id(project_id) body = { 'schema': {'fields': schema}, - 'tableReference': { - 'tableId': table, - 'projectId': self.project_id, - 'datasetId': dataset - } } try: result = self.bigquery.tables().patch( - projectId=self.project_id, + projectId=project_id, datasetId=dataset, + tableId=table, body=body - ).execute() + ).execute(num_retries=self.num_retries) if self.swallow_results: return True else: return result except HttpError as e: - logging.error(('Cannot patch table {0}.{1}\n' - 'Http Error: {2}').format(dataset, table, - e.content)) + logger.error(('Cannot patch table {0}.{1}.{2}\n' + 'Http Error: {3}').format(project_id, dataset, table, e.content)) if self.swallow_results: return False else: return {} - def create_view(self, dataset, view, query): + def create_view(self, dataset, view, query, use_legacy_sql=None, project_id=None): """Create a new view in the dataset. Parameters @@ -631,7 +771,12 @@ def create_view(self, dataset, view, query): view : str The name of the view to create query : dict - A query that BigQuery executes when the view is referenced. + A query that BigQuery executes when the view is referenced. + use_legacy_sql : bool, optional + If False, the query will use BigQuery's standard SQL + (https://cloud.google.com/bigquery/sql-reference/) + project_id: str, optional + The project to create the view in Returns ------- @@ -639,11 +784,12 @@ def create_view(self, dataset, view, query): bool indicating if the view was successfully created or not, or response from BigQuery if swallow_results is set to False. """ + project_id = self._get_project_id(project_id) body = { 'tableReference': { 'tableId': view, - 'projectId': self.project_id, + 'projectId': project_id, 'datasetId': dataset }, 'view': { @@ -651,27 +797,29 @@ def create_view(self, dataset, view, query): } } + if use_legacy_sql is not None: + body['view']['useLegacySql'] = use_legacy_sql + try: view = self.bigquery.tables().insert( - projectId=self.project_id, + projectId=project_id, datasetId=dataset, body=body - ).execute() + ).execute(num_retries=self.num_retries) if self.swallow_results: return True else: return view except HttpError as e: - logging.error(('Cannot create view {0}.{1}\n' - 'Http Error: {2}').format(dataset, view, - e.content)) + logger.error(('Cannot create view {0}.{1}\n' + 'Http Error: {2}').format(dataset, view, e.content)) if self.swallow_results: return False else: return {} - def delete_table(self, dataset, table): + def delete_table(self, dataset, table, project_id=None): """Delete a table from the dataset. Parameters @@ -680,6 +828,8 @@ def delete_table(self, dataset, table): The dataset to delete the table from. table : str The name of the table to delete + project_id: str, optional + String id of the project Returns ------- @@ -687,28 +837,28 @@ def delete_table(self, dataset, table): bool indicating if the table was successfully deleted or not, or response from BigQuery if swallow_results is set for False. """ + project_id = self._get_project_id(project_id) - try: + try: response = self.bigquery.tables().delete( - projectId=self.project_id, + projectId=project_id, datasetId=dataset, tableId=table - ).execute() + ).execute(num_retries=self.num_retries) if self.swallow_results: return True else: return response except HttpError as e: - logging.error(('Cannot delete table {0}.{1}\n' - 'Http Error: {2}').format(dataset, table, - e.content)) + logger.error(('Cannot delete table {0}.{1}\n' + 'Http Error: {2}').format(dataset, table, e.content)) if self.swallow_results: return False else: return {} - def get_tables(self, dataset_id, app_id, start_time, end_time): + def get_tables(self, dataset_id, app_id, start_time, end_time, project_id=None): """Retrieve a list of tables that are related to the given app id and are inside the range of start and end times. @@ -722,6 +872,8 @@ def get_tables(self, dataset_id, app_id, start_time, end_time): The datetime or unix time after which records will be fetched. end_time : Union[datetime, int] The datetime or unix time up to which records will be fetched. + project_id: str, optional + String id of the project Returns ------- @@ -735,7 +887,7 @@ def get_tables(self, dataset_id, app_id, start_time, end_time): if isinstance(end_time, datetime): end_time = calendar.timegm(end_time.utctimetuple()) - every_table = self._get_all_tables(dataset_id) + every_table = self._get_all_tables(dataset_id, project_id) app_tables = every_table.get(app_id, {}) return self._filter_tables_by_time(app_tables, start_time, end_time) @@ -745,7 +897,7 @@ def import_data_from_uris( source_uris, dataset, table, - schema=None, + schema=None, job=None, source_format=None, create_disposition=None, @@ -758,25 +910,28 @@ def import_data_from_uris( field_delimiter=None, quote=None, skip_leading_rows=None, + project_id=None, ): """ - Imports data into a BigQuery table from cloud storage. Optional arguments that are not - specified are determined by BigQuery as described: - https://developers.google.com/bigquery/docs/reference/v2/jobs + Imports data into a BigQuery table from cloud storage. Optional + arguments that are not specified are determined by BigQuery as + described: + https://developers.google.com/bigquery/docs/reference/v2/jobs Parameters ---------- source_urls : list - A ``list`` of ``str`` objects representing the urls on cloud storage - of the form: gs://bucket/filename + A ``list`` of ``str`` objects representing the urls on cloud + storage of the form: gs://bucket/filename dataset : str String id of the dataset table : str String id of the table - job : str, optional - Identifies the job (a unique job id is automatically generated if not provided) schema : list, optional - Represents the BigQuery schema + Represents the BigQuery schema + job : str, optional + Identifies the job (a unique job id is automatically generated if + not provided) source_format : str, optional One of the JOB_SOURCE_FORMAT_* constants create_disposition : str, optional @@ -799,6 +954,8 @@ def import_data_from_uris( Quote character for csv only skip_leading_rows : int, optional For csv only + project_id: str, optional + String id of the project Returns ------- @@ -813,9 +970,11 @@ def import_data_from_uris( source_uris = source_uris if isinstance(source_uris, list) \ else [source_uris] + project_id = self._get_project_id(project_id) + configuration = { "destinationTable": { - "projectId": self.project_id, + "projectId": project_id, "tableId": table, "datasetId": dataset }, @@ -887,13 +1046,10 @@ def import_data_from_uris( "configuration": { 'load': configuration }, - "jobReference": { - "projectId": self.project_id, - "jobId": job - } + "jobReference": self._get_job_reference(job) } - logging.debug("Creating load job %s" % body) + logger.debug("Creating load job %s" % body) job_resource = self._insert_job(body) self._raise_insert_exception_if_error(job_resource) return job_resource @@ -902,30 +1058,31 @@ def export_data_to_uris( self, destination_uris, dataset, - table, + table, job=None, compression=None, destination_format=None, print_header=None, field_delimiter=None, + project_id=None, ): """ - Export data from a BigQuery table to cloud storage. Optional arguments that are - not specified are determined by BigQuery as described: + Export data from a BigQuery table to cloud storage. Optional arguments + that are not specified are determined by BigQuery as described: https://developers.google.com/bigquery/docs/reference/v2/jobs Parameters ---------- - destination_urls : Union[str, list] + destination_uris : Union[str, list] ``str`` or ``list`` of ``str`` objects representing the URIs on cloud storage of the form: gs://bucket/filename dataset : str String id of the dataset table : str - String id of the table + String id of the table job : str, optional - String identifying the job (a unique jobid is automatically generated if - not provided) + String identifying the job (a unique jobid is automatically + generated if not provided) compression : str, optional One of the JOB_COMPRESSION_* constants destination_format : str, optional @@ -934,6 +1091,8 @@ def export_data_to_uris( Whether or not to print the header field_delimiter : str, optional Character separating fields in delimited file + project_id: str, optional + String id of the project Returns ------- @@ -948,9 +1107,11 @@ def export_data_to_uris( destination_uris = destination_uris \ if isinstance(destination_uris, list) else [destination_uris] + project_id = self._get_project_id(project_id) + configuration = { "sourceTable": { - "projectId": self.project_id, + "projectId": project_id, "tableId": table, "datasetId": dataset }, @@ -981,13 +1142,10 @@ def export_data_to_uris( "configuration": { 'extract': configuration }, - "jobReference": { - "projectId": self.project_id, - "jobId": job - } + "jobReference": self._get_job_reference(job) } - logging.info("Creating export job %s" % body) + logger.info("Creating export job %s" % body) job_resource = self._insert_job(body) self._raise_insert_exception_if_error(job_resource) return job_resource @@ -996,13 +1154,17 @@ def write_to_table( self, query, dataset=None, - table=None, - external_udf_uris=[], + table=None, + external_udf_uris=None, allow_large_results=None, use_query_cache=None, priority=None, create_disposition=None, write_disposition=None, + use_legacy_sql=None, + maximum_billing_tier=None, + flatten=None, + project_id=None, ): """ Write query result to table. If dataset or table is not provided, @@ -1017,9 +1179,9 @@ def write_to_table( dataset : str, optional String id of the dataset table : str, optional - String id of the table + String id of the table external_udf_uris : list, optional - Contains extternal UDF URIs. If given, URIs must be Google Cloud + Contains external UDF URIs. If given, URIs must be Google Cloud Storage and have .js extensions. allow_large_results : bool, optional Whether or not to allow large results @@ -1031,6 +1193,20 @@ def write_to_table( One of the JOB_CREATE_* constants write_disposition : str, optional One of the JOB_WRITE_* constants + use_legacy_sql: bool, optional + If False, the query will use BigQuery's standard SQL + (https://cloud.google.com/bigquery/sql-reference/) + maximum_billing_tier : integer, optional + Limits the billing tier for this job. Queries that have resource + usage beyond this tier will fail (without incurring a charge). If + unspecified, this will be set to your project default. For more + information, + see https://cloud.google.com/bigquery/pricing#high-compute + flatten : bool, optional + Whether or not to flatten nested and repeated fields + in query results + project_id: str, optional + String id of the project Returns ------- @@ -1047,9 +1223,11 @@ def write_to_table( "query": query, } + project_id = self._get_project_id(project_id) + if dataset and table: configuration['destinationTable'] = { - "projectId": self.project_id, + "projectId": project_id, "tableId": table, "datasetId": dataset } @@ -1057,9 +1235,18 @@ def write_to_table( if allow_large_results is not None: configuration['allowLargeResults'] = allow_large_results + if flatten is not None: + configuration['flattenResults'] = flatten + + if maximum_billing_tier is not None: + configuration['maximumBillingTier'] = maximum_billing_tier + if use_query_cache is not None: configuration['useQueryCache'] = use_query_cache + if use_legacy_sql is not None: + configuration['useLegacySql'] = use_legacy_sql + if priority: configuration['priority'] = priority @@ -1069,13 +1256,9 @@ def write_to_table( if write_disposition: configuration['writeDisposition'] = write_disposition - configuration['userDefinedFunctionResources'] = [] - for external_udf_uri in external_udf_uris: - configuration['userDefinedFunctionResources'].append( - { - "resourceUri": external_udf_uri - } - ) + if external_udf_uris: + configuration['userDefinedFunctionResources'] = \ + [ {'resourceUri': u} for u in external_udf_uris ] body = { "configuration": { @@ -1083,7 +1266,7 @@ def write_to_table( } } - logging.info("Creating write to table job %s" % body) + logger.info("Creating write to table job %s" % body) job_resource = self._insert_job(body) self._raise_insert_exception_if_error(job_resource) return job_resource @@ -1095,8 +1278,8 @@ def wait_for_job(self, job, interval=5, timeout=60): Parameters ---------- job : Union[dict, str] - ``dict`` representing a BigQuery job resource, or a ``str`` representing - the BigQuery job id + ``dict`` representing a BigQuery job resource, or a ``str`` + representing the BigQuery job id interval : float, optional Polling interval in seconds, default = 5 timeout : float, optional @@ -1125,19 +1308,21 @@ def wait_for_job(self, job, interval=5, timeout=60): sleep(interval) request = self.bigquery.jobs().get(projectId=self.project_id, jobId=job_id) - job_resource = request.execute() + job_resource = request.execute(num_retries=self.num_retries) self._raise_executing_exception_if_error(job_resource) complete = job_resource.get('status').get('state') == u'DONE' elapsed_time = time() - start_time # raise exceptions if timeout if not complete: - logging.error('BigQuery job %s timeout' % job_id) + logger.error('BigQuery job %s timeout' % job_id) raise BigQueryTimeoutException() return job_resource - def push_rows(self, dataset, table, rows, insert_id_key=None): + def push_rows(self, dataset, table, rows, insert_id_key=None, + skip_invalid_rows=None, ignore_unknown_values=None, + template_suffix=None, project_id=None): """Upload rows to BigQuery table. Parameters @@ -1145,11 +1330,21 @@ def push_rows(self, dataset, table, rows, insert_id_key=None): dataset : str The dataset to upload to table : str - The name of the table to insert rows into + The name of the table to insert rows into rows : list A ``list`` of rows (``dict`` objects) to add to the table insert_id_key : str, optional - Key for insertId in row + Key for insertId in row. + You can use dot separated key for nested column. + skip_invalid_rows : bool, optional + Insert all valid rows of a request, even if invalid rows exist. + ignore_unknown_values : bool, optional + Accept rows that contain values that do not match the schema. + template_suffix : str, optional + Inserts the rows into an {table}{template_suffix}. + If table {table}{template_suffix} doesn't exist, create from {table}. + project_id: str, optional + The project to upload to Returns ------- @@ -1157,15 +1352,18 @@ def push_rows(self, dataset, table, rows, insert_id_key=None): bool indicating if insert succeeded or not, or response from BigQuery if swallow_results is set for False. """ - + project_id = self._get_project_id(project_id) table_data = self.bigquery.tabledata() rows_data = [] for row in rows: each_row = {} each_row["json"] = row - if insert_id_key in row: - each_row["insertId"] = row[insert_id_key] + if insert_id_key is not None: + keys = insert_id_key.split('.') + val = reduce(lambda d, key: d.get(key) if d else None, keys, row) + if val is not None: + each_row["insertId"] = val rows_data.append(each_row) data = { @@ -1173,16 +1371,25 @@ def push_rows(self, dataset, table, rows, insert_id_key=None): "rows": rows_data } - try: + if skip_invalid_rows is not None: + data['skipInvalidRows'] = skip_invalid_rows + + if ignore_unknown_values is not None: + data['ignoreUnknownValues'] = ignore_unknown_values + + if template_suffix is not None: + data['templateSuffix'] = template_suffix + + try: response = table_data.insertAll( - projectId=self.project_id, + projectId=project_id, datasetId=dataset, tableId=table, body=data - ).execute() + ).execute(num_retries=self.num_retries) if response.get('insertErrors'): - logging.error('BigQuery insert errors: %s' % response) + logger.error('BigQuery insert errors: %s' % response) if self.swallow_results: return False else: @@ -1194,7 +1401,7 @@ def push_rows(self, dataset, table, rows, insert_id_key=None): return response except HttpError as e: - logging.exception('Problem with BigQuery insertAll') + logger.exception('Problem with BigQuery insertAll') if self.swallow_results: return False else: @@ -1207,16 +1414,43 @@ def push_rows(self, dataset, table, rows, insert_id_key=None): }] } - def _get_all_tables(self, dataset_id, cache=False): - """Retrieve a list of all tables for the dataset. + def get_all_tables(self, dataset_id, project_id=None): + """Retrieve a list of tables for the dataset. Parameters ---------- dataset_id : str - The dataset to retrieve table names for + The dataset to retrieve table data for. + project_id: str + Unique ``str`` identifying the BigQuery project contains the dataset + + Returns + ------- + A ``list`` with all table names + """ + tables_data = self._get_all_tables_for_dataset(dataset_id, project_id) + + tables = [] + for table in tables_data.get('tables', []): + table_name = table.get('tableReference', {}).get('tableId') + if table_name: + tables.append(table_name) + return tables + + def _get_all_tables(self, dataset_id, cache=False, project_id=None): + """Retrieve the list of tables for dataset, that respect the formats: + * appid_YYYY_MM + * YYYY_MM_appid + + Parameters + ---------- + dataset_id : str + The dataset to retrieve table names for cache : bool, optional To use cached value or not (default False). Timeout value equals CACHE_TIMEOUT. + project_id: str + Unique ``str`` identifying the BigQuery project contains the dataset Returns ------- @@ -1230,23 +1464,43 @@ def _get_all_tables(self, dataset_id, cache=False): do_fetch = False if do_fetch: - result = self.bigquery.tables().list( - projectId=self.project_id, - datasetId=dataset_id).execute() - - page_token = result.get('nextPageToken') - while page_token: - res = self.bigquery.tables().list( - projectId=self.project_id, - datasetId=dataset_id, - pageToken=page_token - ).execute() - page_token = res.get('nextPageToken') - result['tables'] += res.get('tables', []) + result = self._get_all_tables_for_dataset(dataset_id, project_id) self.cache[dataset_id] = (datetime.now(), result) return self._parse_table_list_response(result) + def _get_all_tables_for_dataset(self, dataset_id, project_id=None): + """Retrieve a list of all tables for the dataset. + + Parameters + ---------- + dataset_id : str + The dataset to retrieve table names for + project_id: str + Unique ``str`` identifying the BigQuery project contains the dataset + + Returns + ------- + dict + A ``dict`` containing tables key with all tables + """ + project_id = self._get_project_id(project_id) + + result = self.bigquery.tables().list( + projectId=project_id, + datasetId=dataset_id).execute(num_retries=self.num_retries) + + page_token = result.get('nextPageToken') + while page_token: + res = self.bigquery.tables().list( + projectId=project_id, + datasetId=dataset_id, + pageToken=page_token + ).execute(num_retries=self.num_retries) + page_token = res.get('nextPageToken') + result['tables'] += res.get('tables', []) + return result + def _parse_table_list_response(self, list_response): """Parse the response received from calling list on tables. @@ -1289,6 +1543,8 @@ def _parse_table_name(self, table_id): """Parse a table name in the form of appid_YYYY_MM or YYYY_MM_appid and return a tuple consisting of YYYY-MM and the app id. + Returns (None, None) in the event of a name like _YYYYMMDD_ + Parameters ---------- table_id : str @@ -1297,7 +1553,8 @@ def _parse_table_name(self, table_id): Returns ------- tuple - (year/month, app id), or (None, None) if the table id cannot be parsed. + (year/month, app id), or (None, None) if the table id cannot be + parsed. """ # Prefix date @@ -1315,9 +1572,10 @@ def _parse_table_name(self, table_id): year_month = "-".join(attributes[-2:]) app_id = "-".join(attributes[:-2]) + # Check if date parsed correctly if year_month.count("-") == 1 and all( - [num.isdigit() for num in year_month.split('-')]): + [num.isdigit() for num in year_month.split('-')]) and len(year_month) == 7: return year_month, app_id return None, None @@ -1368,9 +1626,11 @@ def _in_range(self, start_time, end_time, time): time <= start_time <= time + ONE_MONTH or \ time <= end_time <= time + ONE_MONTH - def get_query_results(self, job_id, offset=None, limit=None, page_token=None, timeout=0): - """Execute the query job indicated by the given job id. This is direct mapping to - bigquery api https://cloud.google.com/bigquery/docs/reference/v2/jobs/getQueryResults + def get_query_results(self, job_id, offset=None, limit=None, + page_token=None, timeout=0): + """Execute the query job indicated by the given job id. This is direct + mapping to bigquery api + https://cloud.google.com/bigquery/docs/reference/v2/jobs/getQueryResults Parameters ---------- @@ -1381,7 +1641,8 @@ def get_query_results(self, job_id, offset=None, limit=None, page_token=None, ti limit : int, optional The maximum number of results to retrieve. page_token : optional - Page token, returned by previous call, to request the next page of results. + Page token, returned by previous call, to request the next page of + results. timeout : float, optional Timeout in seconds @@ -1398,7 +1659,7 @@ def get_query_results(self, job_id, offset=None, limit=None, page_token=None, ti startIndex=offset, maxResults=limit, pageToken=page_token, - timeoutMs=timeout * 1000).execute() + timeoutMs=timeout * 1000).execute(num_retries=self.num_retries) def _transform_row(self, row, schema): """Apply the given schema to the given BigQuery data row. @@ -1519,14 +1780,14 @@ def _raise_executing_exception_if_error(self, job): # DataSet manipulation methods # def create_dataset(self, dataset_id, friendly_name=None, description=None, - access=None): + access=None, location=None, project_id=None): """Create a new BigQuery dataset. Parameters ---------- dataset_id : str - Unique ``str`` identifying the dataset with the project (the referenceID - of the dataset, not the integer id of the dataset) + Unique ``str`` identifying the dataset with the project (the + referenceID of the dataset, not the integer id of the dataset) friendly_name: str, optional A human readable name description: str, optional @@ -1534,6 +1795,11 @@ def create_dataset(self, dataset_id, friendly_name=None, description=None, access : list, optional Indicating access permissions (see https://developers.google.com/bigquery/docs/reference/v2/datasets#resource) + location : str, optional + Indicating where dataset should be stored: EU or US (see + https://developers.google.com/bigquery/docs/reference/v2/datasets#resource) + project_id: str + Unique ``str`` identifying the BigQuery project contains the dataset Returns ------- @@ -1541,54 +1807,70 @@ def create_dataset(self, dataset_id, friendly_name=None, description=None, ``bool`` indicating if dataset was created or not, or response from BigQuery if swallow_results is set for False """ - try: + project_id = self._get_project_id(project_id) + + try: datasets = self.bigquery.datasets() - dataset_data = self.dataset_resource(dataset_id, + dataset_data = self.dataset_resource(dataset_id, + project_id=project_id, friendly_name=friendly_name, description=description, - access=access) + access=access, + location=location + ) - response = datasets.insert(projectId=self.project_id, - body=dataset_data).execute() + response = datasets.insert(projectId=project_id, + body=dataset_data).execute( + num_retries=self.num_retries) if self.swallow_results: return True else: return response except HttpError as e: - logging.error('Cannot create dataset {0}, {1}'.format(dataset_id, - e)) + logger.error( + 'Cannot create dataset {0}, {1}'.format(dataset_id, e)) if self.swallow_results: return False else: return {} - def get_datasets(self): + def get_datasets(self, project_id=None): """List all datasets in the project. + + Parameters + ---------- + project_id: str + Unique ``str`` identifying the BigQuery project contains the dataset Returns ------- list Dataset resources """ - try: + project_id = self._get_project_id(project_id) + + try: datasets = self.bigquery.datasets() - request = datasets.list(projectId=self.project_id) - result = request.execute() + request = datasets.list(projectId=project_id) + result = request.execute(num_retries=self.num_retries) return result.get('datasets', []) except HttpError as e: - logging.error("Cannot list datasets: {0}".format(e)) + logger.error("Cannot list datasets: {0}".format(e)) return None - def delete_dataset(self, dataset_id, delete_contents=False): + def delete_dataset(self, dataset_id, delete_contents=False, project_id=None): """Delete a BigQuery dataset. Parameters ---------- dataset_id : str - Unique ``str`` identifying the datset with the project (the referenceId of the dataset) + Unique ``str`` identifying the dataset with the project (the + referenceId of the dataset) + Unique ``str`` identifying the BigQuery project contains the dataset delete_contents : bool, optional - If True, forces the deletion of the dataset even when the dataset contains data - (Default = False) + If True, forces the deletion of the dataset even when the dataset + contains data (Default = False) + project_id: str, optional Returns ------- @@ -1601,26 +1883,28 @@ def delete_dataset(self, dataset_id, delete_contents=False): HttpError 404 when dataset with dataset_id does not exist """ - try: + project_id = self._get_project_id(project_id) + + try: datasets = self.bigquery.datasets() - request = datasets.delete(projectId=self.project_id, + request = datasets.delete(projectId=project_id, datasetId=dataset_id, deleteContents=delete_contents) - response = request.execute() + response = request.execute(num_retries=self.num_retries) if self.swallow_results: return True else: return response except HttpError as e: - logging.error('Cannot delete dataset {0}: {1}'.format(dataset_id, - e)) + logger.error( + 'Cannot delete dataset {0}: {1}'.format(dataset_id, e)) if self.swallow_results: return False else: return {} def update_dataset(self, dataset_id, friendly_name=None, description=None, - access=None): + access=None, project_id=None): """Updates information in an existing dataset. The update method replaces the entire dataset resource, whereas the patch method only replaces fields that are provided in the submitted dataset resource. @@ -1628,42 +1912,51 @@ def update_dataset(self, dataset_id, friendly_name=None, description=None, Parameters ---------- dataset_id : str - Unique ``str`` identifying the dataset with the project (the referencedId of the dataset) + Unique ``str`` identifying the dataset with the project (the + referencedId of the dataset) friendly_name : str, optional An optional descriptive name for the dataset. description : str, optional An optional description of the dataset. access : list, optional Indicating access permissions + project_id: str, optional + Unique ``str`` identifying the BigQuery project contains the dataset Returns ------- Union[bool, dict] - ``bool`` indicating if the update was successful or not, or response - from BigQuery if swallow_results is set for False. + ``bool`` indicating if the update was successful or not, or + response from BigQuery if swallow_results is set for False. """ - try: + project_id = self._get_project_id(project_id) + + try: datasets = self.bigquery.datasets() - body = self.dataset_resource(dataset_id, friendly_name, - description, access) - request = datasets.update(projectId=self.project_id, + body = self.dataset_resource(dataset_id, + friendly_name=friendly_name, + description=description, + access=access, + project_id=project_id) + + request = datasets.update(projectId=project_id, datasetId=dataset_id, body=body) - response = request.execute() + response = request.execute(num_retries=self.num_retries) if self.swallow_results: return True else: return response except HttpError as e: - logging.error('Cannot update dataset {0}: {1}'.format(dataset_id, - e)) + logger.error( + 'Cannot update dataset {0}: {1}'.format(dataset_id, e)) if self.swallow_results: return False else: return {} def patch_dataset(self, dataset_id, friendly_name=None, description=None, - access=None): + access=None, project_id=None): """Updates information in an existing dataset. The update method replaces the entire dataset resource, whereas the patch method only replaces fields that are provided in the submitted dataset resource. @@ -1671,13 +1964,16 @@ def patch_dataset(self, dataset_id, friendly_name=None, description=None, Parameters ---------- dataset_id : str - Unique string idenfitying the dataset with the project (the referenceId of the dataset) + Unique string idenfitying the dataset with the project (the + referenceId of the dataset) friendly_name : str, optional An optional descriptive name for the dataset. description : str, optional An optional description of the dataset. access : list, optional Indicating access permissions. + project_id: str, optional + Unique ``str`` identifying the BigQuery project contains the dataset Returns ------- @@ -1685,49 +1981,60 @@ def patch_dataset(self, dataset_id, friendly_name=None, description=None, ``bool`` indicating if the patch was successful or not, or response from BigQuery if swallow_results is set for False. """ - try: + project_id = self._get_project_id(project_id) + + try: datasets = self.bigquery.datasets() - body = self.dataset_resource(dataset_id, friendly_name, - description, access) - request = datasets.patch(projectId=self.project_id, + body = self.dataset_resource(dataset_id, + friendly_name=friendly_name, + description=description, + access=access, + project_id=project_id) + request = datasets.patch(projectId=project_id, datasetId=dataset_id, body=body) - response = request.execute() + response = request.execute(num_retries=self.num_retries) if self.swallow_results: return True else: return response except HttpError as e: - logging.error('Cannot patch dataset {0}: {1}'.format(dataset_id, - e)) + logger.error('Cannot patch dataset {0}: {1}'.format(dataset_id, e)) if self.swallow_results: return False else: return {} def dataset_resource(self, ref_id, friendly_name=None, description=None, - access=None): - """See https://developers.google.com/bigquery/docs/reference/v2/datasets#resource + access=None, location=None, project_id=None): + """See + https://developers.google.com/bigquery/docs/reference/v2/datasets#resource Parameters ---------- ref_id : str - Dataset id (the reference id, not the integer id) + Dataset id (the reference id, not the integer id) friendly_name : str, optional An optional descriptive name for the dataset description : str, optional An optional description for the dataset access : list, optional Indicating access permissions + location: str, optional, 'EU' or 'US' + An optional geographical location for the dataset(EU or US) + project_id: str + Unique ``str`` identifying the BigQuery project contains the dataset Returns ------- dict Representing BigQuery dataset resource """ + project_id = self._get_project_id(project_id) + data = { "datasetReference": { "datasetId": ref_id, - "projectId": self.project_id + "projectId": project_id } } if friendly_name: @@ -1736,6 +2043,8 @@ def dataset_resource(self, ref_id, friendly_name=None, description=None, data["description"] = description if access: data["access"] = access + if location: + data["location"] = location return data @@ -1749,9 +2058,10 @@ def schema_from_record(cls, record): record : dict representing a record to be inserted into big query, where all keys are ``str`` objects (representing column names in - the record) and all values are of type ``int``, ``str``, ``unicode``, - ``float``, ``bool``, ``datetime``, or ``dict``. A ``dict`` value represents a - record, and must conform to the same restrictions as record + the record) and all values are of type ``int``, ``str``, + ``unicode``, ``float``, ``bool``, ``datetime``, or ``dict``. A + ``dict`` value represents a record, and must conform to the same + restrictions as record. Returns ------- @@ -1760,9 +2070,10 @@ def schema_from_record(cls, record): Notes ----- - Results are undefined if a different value type is provided for a repeated - field: E.g. + Results are undefined if a different value type is provided for a + repeated field: E.g. >>> { rfield: [ { x: 1}, {x: "a string"} ] } # undefined! """ + from bigquery.schema_builder import schema_from_record return schema_from_record(record) diff --git a/bigquery/query_builder.py b/bigquery/query_builder.py index cb5e60a..435bb73 100644 --- a/bigquery/query_builder.py +++ b/bigquery/query_builder.py @@ -1,8 +1,11 @@ -import logging +from logging import getLogger, NullHandler + +logger = getLogger(__name__) +logger.addHandler(NullHandler()) def render_query(dataset, tables, select=None, conditions=None, - groupings=None, having=None, order_by=None): + groupings=None, having=None, order_by=None, limit=None): """Render a query that will run over the given tables using the specified parameters. @@ -13,21 +16,26 @@ def render_query(dataset, tables, select=None, conditions=None, tables : Union[dict, list] The table in `dataset` to query. select : dict, optional - The keys function as column names and the values function as options to apply to - the select field such as alias and format. For example, select['start_time'] might - have the form {'alias': 'StartTime', 'format': 'INTEGER-FORMAT_UTC_USEC'}, which would - be represented as 'SEC_TO_TIMESTAMP(INTEGER(start_time)) as StartTime' in a query. Pass - `None` to seoect all. + The keys function as column names and the values function as options to + apply to the select field such as alias and format. For example, + select['start_time'] might have the form + {'alias': 'StartTime', 'format': 'INTEGER-FORMAT_UTC_USEC'}, which + would be represented as 'SEC_TO_TIMESTAMP(INTEGER(start_time)) as + StartTime' in a query. Pass `None` to select all. conditions : list, optional - a ``list`` of ``dict`` objects to filter results by. Each dict should have the keys 'field', - 'type', and 'comparators'. The first two map to strings representing the field (e.g. 'foo') - and type (e.g. 'FLOAT'). 'comparators' maps to another ``dict`` containing the keys 'condition', - 'negate', and 'value'. If 'comparators' = {'condition': '>=', 'negate': False, 'value': 1}, this - example will be rdnered as 'foo >= FLOAT('1')' in the query. + a ``list`` of ``dict`` objects to filter results by. Each dict should + have the keys 'field', 'type', and 'comparators'. The first two map to + strings representing the field (e.g. 'foo') and type (e.g. 'FLOAT'). + 'comparators' maps to another ``dict`` containing the keys 'condition', + 'negate', and 'value'. + If 'comparators' = {'condition': '>=', 'negate': False, 'value': 1}, + this example will be rendered as 'foo >= FLOAT('1')' in the query. ``list`` of field names to group by order_by : dict, optional - Keys = {'field', 'direction'}. `dict` should be formatted as {'field':'TimeStamp, 'direction':'desc'} - or similar + Keys = {'field', 'direction'}. `dict` should be formatted as + {'field':'TimeStamp, 'direction':'desc'} or similar + limit : int, optional + Limit the amount of data needed to be returned. Returns ------- @@ -38,13 +46,14 @@ def render_query(dataset, tables, select=None, conditions=None, if None in (dataset, tables): return None - query = "%s %s %s %s %s %s" % ( + query = "%s %s %s %s %s %s %s" % ( _render_select(select), _render_sources(dataset, tables), _render_conditions(conditions), _render_groupings(groupings), _render_having(having), - _render_order(order_by) + _render_order(order_by), + _render_limit(limit) ) return query @@ -147,8 +156,8 @@ def _render_sources(dataset, tables): tables['from_date'], tables['to_date']) except KeyError as exp: - logging.warn('Missing parameter %s in selecting sources' % - (exp)) + logger.warn( + 'Missing parameter %s in selecting sources' % (exp)) else: return "FROM " + ", ".join( @@ -161,7 +170,7 @@ def _render_conditions(conditions): Parameters ---------- conditions : list - A list of dictionay items to filter a table. + A list of dictionary items to filter a table. Returns ------- @@ -184,7 +193,7 @@ def _render_conditions(conditions): comparators = condition.get('comparators') if None in (field, field_type, comparators) or not comparators: - logging.warn('Invalid condition passed in: %s' % condition) + logger.warn('Invalid condition passed in: %s' % condition) continue rendered_conditions.append( @@ -232,6 +241,8 @@ def _render_condition(field, field_type, comparators): else: value = _render_condition_value(value, field_type) value = "(" + value + ")" + elif condition == "IS NULL" or condition == "IS NOT NULL": + return field + " " + condition elif condition == "BETWEEN": if isinstance(value, (tuple, list, set)) and len(value) == 2: value = ' AND '.join( @@ -239,7 +250,7 @@ def _render_condition(field, field_type, comparators): for v in value]) ) elif isinstance(value, (tuple, list, set)) and len(value) != 2: - logging.warn('Invalid condition passed in: %s' % condition) + logger.warn('Invalid condition passed in: %s' % condition) else: value = _render_condition_value(value, field_type) @@ -335,7 +346,7 @@ def _render_having(having_conditions): comparators = condition.get('comparators') if None in (field, field_type, comparators) or not comparators: - logging.warn('Invalid condition passed in: %s' % condition) + logger.warn('Invalid condition passed in: %s' % condition) continue rendered_conditions.append( @@ -367,3 +378,22 @@ def _render_order(order): return '' return "ORDER BY %s %s" % (", ".join(order['fields']), order['direction']) + + +def _render_limit(limit): + """Render the limit part of a query. + + Parameters + ---------- + limit : int, optional + Limit the amount of data needed to be returned. + + Returns + ------- + str + A string that represents the "limit" part of a query. + """ + if not limit: + return '' + + return "LIMIT %s" % limit diff --git a/bigquery/schema_builder.py b/bigquery/schema_builder.py index 575b390..65027b8 100644 --- a/bigquery/schema_builder.py +++ b/bigquery/schema_builder.py @@ -126,7 +126,7 @@ def bigquery_type(o, timestamp_parser=default_timestamp_parser): """ t = type(o) - if t == int: + if t in six.integer_types: return "integer" elif (t == six.binary_type and six.PY2) or t == six.text_type: if timestamp_parser and timestamp_parser(o): diff --git a/bigquery/tests/test_client.py b/bigquery/tests/test_client.py index f7050c6..1f2d247 100644 --- a/bigquery/tests/test_client.py +++ b/bigquery/tests/test_client.py @@ -2,18 +2,16 @@ import mock import six -from nose.tools import raises - -from apiclient.errors import HttpError from bigquery import client from bigquery.errors import ( JobInsertException, JobExecutingException, BigQueryTimeoutException ) +from googleapiclient.errors import HttpError +from nose.tools import raises class HttpResponse(object): - def __init__(self, status, reason='There was an error'): """ Args: @@ -24,7 +22,6 @@ def __init__(self, status, reason='There was an error'): class TestGetClient(unittest.TestCase): - def setUp(self): client._bq_client = None @@ -51,7 +48,7 @@ def test_initialize_readonly(self, mock_build, mock_return_cred): mock_cred = mock.Mock() mock_http = mock.Mock() mock_service_url = mock.Mock() - mock_cred.return_value.authorize.return_value = mock_http + mock_cred.from_p12_keyfile_buffer.return_value.authorize.return_value = mock_http mock_bq = mock.Mock() mock_build.return_value = mock_bq key = 'key' @@ -65,11 +62,18 @@ def test_initialize_readonly(self, mock_build, mock_return_cred): readonly=True) mock_return_cred.assert_called_once_with() - mock_cred.assert_called_once_with(service_account, key, - scope=BIGQUERY_SCOPE_READ_ONLY) - self.assertTrue(mock_cred.return_value.authorize.called) - mock_build.assert_called_once_with('bigquery', 'v2', http=mock_http, - discoveryServiceUrl=mock_service_url) + mock_cred.from_p12_keyfile_buffer.assert_called_once_with( + service_account, mock.ANY, + scopes=BIGQUERY_SCOPE_READ_ONLY) + self.assertTrue( + mock_cred.from_p12_keyfile_buffer.return_value.authorize.called) + mock_build.assert_called_once_with( + 'bigquery', + 'v2', + http=mock_http, + discoveryServiceUrl=mock_service_url, + cache_discovery=False + ) self.assertEquals(mock_bq, bq_client.bigquery) self.assertEquals(project_id, bq_client.project_id) @@ -84,7 +88,7 @@ def test_initialize_read_write(self, mock_build, mock_return_cred): mock_cred = mock.Mock() mock_http = mock.Mock() mock_service_url = mock.Mock() - mock_cred.return_value.authorize.return_value = mock_http + mock_cred.from_p12_keyfile_buffer.return_value.authorize.return_value = mock_http mock_bq = mock.Mock() mock_build.return_value = mock_bq key = 'key' @@ -98,19 +102,23 @@ def test_initialize_read_write(self, mock_build, mock_return_cred): readonly=False) mock_return_cred.assert_called_once_with() - mock_cred.assert_called_once_with(service_account, key, - scope=BIGQUERY_SCOPE) - self.assertTrue(mock_cred.return_value.authorize.called) - mock_build.assert_called_once_with('bigquery', 'v2', http=mock_http, - discoveryServiceUrl=mock_service_url) + mock_cred.from_p12_keyfile_buffer.assert_called_once_with( + service_account, mock.ANY, scopes=BIGQUERY_SCOPE) + self.assertTrue( + mock_cred.from_p12_keyfile_buffer.return_value.authorize.called) + mock_build.assert_called_once_with( + 'bigquery', + 'v2', + http=mock_http, + discoveryServiceUrl=mock_service_url, + cache_discovery=False + ) self.assertEquals(mock_bq, bq_client.bigquery) self.assertEquals(project_id, bq_client.project_id) @mock.patch('bigquery.client._credentials') @mock.patch('bigquery.client.build') - @mock.patch('__builtin__.open' if six.PY2 else 'builtins.open') - def test_initialize_key_file(self, mock_open, mock_build, - mock_return_cred): + def test_initialize_key_file(self, mock_build, mock_return_cred): """Ensure that a BigQueryClient is initialized and returned with read/write permissions using a private key file. """ @@ -119,12 +127,10 @@ def test_initialize_key_file(self, mock_open, mock_build, mock_cred = mock.Mock() mock_http = mock.Mock() mock_service_url = mock.Mock() - mock_cred.return_value.authorize.return_value = mock_http + mock_cred.from_p12_keyfile.return_value.authorize.return_value = mock_http mock_bq = mock.Mock() mock_build.return_value = mock_bq key_file = 'key.pem' - key = 'key' - mock_open.return_value.__enter__.return_value.read.return_value = key service_account = 'account' project_id = 'project' mock_return_cred.return_value = mock_cred @@ -134,13 +140,19 @@ def test_initialize_key_file(self, mock_open, mock_build, service_account=service_account, private_key_file=key_file, readonly=False) - mock_open.assert_called_once_with(key_file, 'rb') mock_return_cred.assert_called_once_with() - mock_cred.assert_called_once_with(service_account, key, - scope=BIGQUERY_SCOPE) - self.assertTrue(mock_cred.return_value.authorize.called) - mock_build.assert_called_once_with('bigquery', 'v2', http=mock_http, - discoveryServiceUrl=mock_service_url) + mock_cred.from_p12_keyfile.assert_called_once_with(service_account, + key_file, + scopes=BIGQUERY_SCOPE) + self.assertTrue( + mock_cred.from_p12_keyfile.return_value.authorize.called) + mock_build.assert_called_once_with( + 'bigquery', + 'v2', + http=mock_http, + discoveryServiceUrl=mock_service_url, + cache_discovery=False + ) self.assertEquals(mock_bq, bq_client.bigquery) self.assertEquals(project_id, bq_client.project_id) @@ -157,7 +169,7 @@ def test_initialize_json_key_file(self, mock_open, mock_build, mock_return_cred) mock_cred = mock.Mock() mock_http = mock.Mock() mock_service_url = mock.Mock() - mock_cred.return_value.authorize.return_value = mock_http + mock_cred.from_json_keyfile_dict.return_value.authorize.return_value = mock_http mock_bq = mock.Mock() mock_build.return_value = mock_bq json_key_file = 'key.json' @@ -167,16 +179,98 @@ def test_initialize_json_key_file(self, mock_open, mock_build, mock_return_cred) mock_return_cred.return_value = mock_cred bq_client = client.get_client( - project_id, service_url=mock_service_url, json_key_file=json_key_file, readonly=False) + project_id, service_url=mock_service_url, + json_key_file=json_key_file, readonly=False) - mock_open.assert_called_once_with(json_key_file, 'r') mock_return_cred.assert_called_once_with() - mock_cred.assert_called_once_with(json_key['client_email'], json_key['private_key'], scope=BIGQUERY_SCOPE) - self.assertTrue(mock_cred.return_value.authorize.called) - mock_build.assert_called_once_with('bigquery', 'v2', http=mock_http, discoveryServiceUrl=mock_service_url) + mock_cred.from_json_keyfile_dict.assert_called_once_with(json_key, + scopes=BIGQUERY_SCOPE) + self.assertTrue( + mock_cred.from_json_keyfile_dict.return_value.authorize.called) + mock_build.assert_called_once_with( + 'bigquery', + 'v2', + http=mock_http, + discoveryServiceUrl=mock_service_url, + cache_discovery=False + ) self.assertEquals(mock_bq, bq_client.bigquery) self.assertEquals(project_id, bq_client.project_id) + @mock.patch('bigquery.client._credentials') + @mock.patch('bigquery.client.build') + @mock.patch('__builtin__.open' if six.PY2 else 'builtins.open') + def test_initialize_json_key_file_without_project_id(self, mock_open, mock_build, + mock_return_cred): + """Ensure that a BigQueryClient is initialized and returned with + read/write permissions using a JSON key file without project_id. + """ + from bigquery.client import BIGQUERY_SCOPE + import json + + mock_cred = mock.Mock() + mock_http = mock.Mock() + mock_service_url = mock.Mock() + mock_cred.from_json_keyfile_dict.return_value.authorize.return_value = mock_http + mock_bq = mock.Mock() + mock_build.return_value = mock_bq + json_key_file = 'key.json' + json_key = {'client_email': 'mail', 'private_key': 'pkey', 'project_id': 'project'} + mock_open.return_value.__enter__.return_value.read.return_value = json.dumps(json_key) + mock_return_cred.return_value = mock_cred + + bq_client = client.get_client( + service_url=mock_service_url, json_key_file=json_key_file, readonly=False) + + mock_open.assert_called_once_with(json_key_file, 'r') + mock_return_cred.assert_called_once_with() + mock_cred.from_json_keyfile_dict.assert_called_once_with(json_key, + scopes=BIGQUERY_SCOPE) + self.assertTrue( + mock_cred.from_json_keyfile_dict.return_value.authorize.called) + mock_build.assert_called_once_with( + 'bigquery', + 'v2', + http=mock_http, + discoveryServiceUrl=mock_service_url, + cache_discovery=False + ) + self.assertEquals(mock_bq, bq_client.bigquery) + self.assertEquals(json_key['project_id'], bq_client.project_id) + + +class TestGetProjectIds(unittest.TestCase): + + def test_get_project_ids(self): + mock_bq_service = mock.Mock() + mock_bq_service.projects().list().execute.return_value = { + 'kind': 'bigquery#projectList', + 'projects': [ + { + 'friendlyName': 'Big Query Test', + 'id': 'big-query-test', + 'kind': 'bigquery#project', + 'numericId': '1435372465', + 'projectReference': {'projectId': 'big-query-test'} + }, + { + 'friendlyName': 'BQ Company project', + 'id': 'bq-project', + 'kind': 'bigquery#project', + 'numericId': '4263574685796', + 'projectReference': {'projectId': 'bq-project'} + } + ], + 'totalItems': 2 + } + + projects = client.get_projects(mock_bq_service) + expected_projects_data = [ + {'id': 'big-query-test', 'name': 'Big Query Test'}, + {'id': 'bq-project', 'name': 'BQ Company project'} + ] + self.assertEqual(projects, expected_projects_data) + class TestQuery(unittest.TestCase): @@ -190,6 +284,7 @@ def setUp(self): self.query = 'foo' self.project_id = 'project' + self.external_udf_uris = ['gs://bucket/external_udf.js'] self.client = client.BigQueryClient(self.mock_bq_service, self.project_id) @@ -202,17 +297,24 @@ def test_query(self): mock_query_job.execute.return_value = { 'jobReference': expected_job_ref, - 'jobComplete': True + 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job - job_id, results = self.client.query(self.query) + job_id, results = self.client.query(self.query, external_udf_uris=self.external_udf_uris) self.mock_job_collection.query.assert_called_once_with( projectId=self.project_id, - body={'query': self.query, 'timeoutMs': 0, 'dryRun': False, - 'maxResults': None} + body={ + 'query': self.query, + 'userDefinedFunctionResources': [ {'resourceUri': u} for u in self.external_udf_uris ], + 'timeoutMs': 0, + 'dryRun': False, + 'maxResults': None + } ) self.assertEquals(job_id, 'spiderman') self.assertEquals(results, []) @@ -229,6 +331,8 @@ def test_query_max_results_set(self): mock_query_job.execute.return_value = { 'jobReference': expected_job_ref, 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -257,6 +361,8 @@ def test_query_timeout_set(self): mock_query_job.execute.return_value = { 'jobReference': expected_job_ref, 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -282,6 +388,8 @@ def test_sync_query_timeout(self): mock_query_job.execute.return_value = { 'jobReference': expected_job_ref, 'jobComplete': False, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -300,6 +408,8 @@ def test_async_query_timeout(self): mock_query_job.execute.return_value = { 'jobReference': expected_job_ref, 'jobComplete': False, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -309,14 +419,18 @@ def test_async_query_timeout(self): self.assertEquals(results, []) def test_query_dry_run_valid(self): - """Ensure that None and an empty list is returned from the query when + """Ensure that None and [cacheHit, totalBytesProcessed] is returned from the query when dry_run is True and the query is valid. """ mock_query_job = mock.Mock() - mock_query_job.execute.return_value = {'jobReference': {}, - 'jobComplete': True} + mock_query_job.execute.return_value = { + 'jobReference': {}, + 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 + } self.mock_job_collection.query.return_value = mock_query_job @@ -328,7 +442,7 @@ def test_query_dry_run_valid(self): 'dryRun': True} ) self.assertIsNone(job_id) - self.assertEqual([], results) + self.assertEqual([False, 0], results) def test_query_dry_run_invalid(self): """Ensure that None and a dict is returned from the query when dry_run @@ -368,6 +482,8 @@ def test_query_with_results(self): 'schema': {'fields': [{'name': 'foo', 'type': 'INTEGER'}]}, 'rows': [{'f': [{'v': 10}]}], 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 } self.mock_job_collection.query.return_value = mock_query_job @@ -382,6 +498,32 @@ def test_query_with_results(self): self.assertEquals(job_id, 'spiderman') self.assertEquals(results, [{'foo': 10}]) + def test_query_with_using_legacy_sql(self): + """Ensure that use_legacy_sql bool gets used""" + + mock_query_job = mock.Mock() + expected_job_id = 'spiderman' + expected_job_ref = {'jobId': expected_job_id} + + mock_query_job.execute.return_value = { + 'jobReference': expected_job_ref, + 'jobComplete': True, + 'cacheHit': False, + 'totalBytesProcessed': 0 + } + + self.mock_job_collection.query.return_value = mock_query_job + + job_id, results = self.client.query(self.query, use_legacy_sql=False) + + self.mock_job_collection.query.assert_called_once_with( + projectId=self.project_id, + body={'query': self.query, 'timeoutMs': 0, 'dryRun': False, + 'maxResults': None, 'useLegacySql': False} + ) + self.assertEquals(job_id, 'spiderman') + self.assertEquals(results, []) + class TestGetQueryResults(unittest.TestCase): @@ -420,7 +562,7 @@ def test_get_response(self): projectId=self.project_id, jobId=job_id, startIndex=offset, maxResults=limit, pageToken=page_token, timeoutMs=1000) - mock_query_job.execute.assert_called_once_with() + mock_query_job.execute.assert_called_once_with(num_retries=0) self.assertEquals(actual, mock_query_reply) @@ -749,7 +891,7 @@ def test_json_job_body_constructed_correctly(self): body = { "jobReference": { "projectId": self.project_id, - "jobId": "job" + "jobId": "job", }, "configuration": { "load": { @@ -1023,9 +1165,11 @@ def setUp(self): self.project_id = 'project' self.dataset_id = 'dataset' self.table_id = 'table' + self.maximum_billing_tier = 1000 self.external_udf_uris = ['gs://bucket/external_udf.js'] self.use_query_cache = False self.priority = "INTERACTIVE" + self.flatten_results = False self.client = client.BigQueryClient(self.mock_api, self.project_id) @@ -1049,6 +1193,7 @@ def test_write(self): }], "useQueryCache": self.use_query_cache, "priority": self.priority, + "flattenResults": self.flatten_results, } } } @@ -1059,6 +1204,7 @@ def test_write(self): self.table_id, external_udf_uris=self.external_udf_uris, use_query_cache=False, + flatten=False, priority=self.priority) self.mock_api.jobs().insert.assert_called_with( @@ -1068,6 +1214,44 @@ def test_write(self): self.assertEqual(result, expected_result) + def test_write_maxbilltier(self): + """ Ensure that write is working when maximumBillingTier is set""" + expected_result = { + 'status': {'state': u'RUNNING'}, + } + + body = { + "configuration": { + "query": { + "destinationTable": { + "projectId": self.project_id, + "datasetId": self.dataset_id, + "tableId": self.table_id + }, + "query": self.query, + "userDefinedFunctionResources": [{ + "resourceUri": self.external_udf_uris[0] + }], + "useQueryCache": self.use_query_cache, + "priority": self.priority, + "maximumBillingTier": self.maximum_billing_tier + } + } + } + + self.mock_api.jobs().insert().execute.return_value = expected_result + result = self.client.write_to_table( + self.query, self.dataset_id, self.table_id, priority=self.priority, + external_udf_uris=self.external_udf_uris, use_query_cache=False, + maximum_billing_tier=self.maximum_billing_tier) + + self.mock_api.jobs().insert.assert_called_with( + projectId=self.project_id, + body=body + ) + + self.assertEqual(result, expected_result) + def test_write_http_error(self): """ Test write with http error""" expected_result = { @@ -1191,6 +1375,15 @@ def test_not_inside_range(self): "kind": "bigquery#tableList", "etag": "\"GSclnjk0zID1ucM3F-xYinOm1oE/cn58Rpu8v8pB4eoJQaiTe11lPQc\"", "tables": [ + { + "kind": "bigquery#table", + "id": "project:dataset.notanappspottable_20130515_0261", + "tableReference": { + "projectId": "project", + "datasetId": "dataset", + "tableId": "notanappspottable_20130515_0261" + } + }, { "kind": "bigquery#table", "id": "project:dataset.2013_05_appspot_1", @@ -1254,12 +1447,21 @@ def test_not_inside_range(self): "tableId": "appspot_6_2013_06" } }, + { + "kind": "bigquery#table", + "id": "project:dataset.table_not_matching_naming", + "tableReference": { + "projectId": "project", + "datasetId": "dataset", + "tableId": "table_not_matching_naming" + } + }, { "kind": "bigquery#table", "id": "bad table data" - } + }, ], - "totalItems": 8 + "totalItems": 9 } @@ -1325,7 +1527,8 @@ def test_table_exists(self): expected, self.client.get_table_schema(self.dataset, self.table)) self.mock_tables.get.assert_called_once_with( projectId=self.project, tableId=self.table, datasetId=self.dataset) - self.mock_tables.get.return_value.execute.assert_called_once_with() + self.mock_tables.get.return_value.execute. \ + assert_called_once_with(num_retries=0) def test_table_does_not_exist(self): """Ensure that None is returned if the table doesn't exist.""" @@ -1336,7 +1539,8 @@ def test_table_does_not_exist(self): self.client.get_table_schema(self.dataset, self.table)) self.mock_tables.get.assert_called_once_with( projectId=self.project, tableId=self.table, datasetId=self.dataset) - self.mock_tables.get.return_value.execute.assert_called_once_with() + self.mock_tables.get.return_value.execute. \ + assert_called_once_with(num_retries=0) @mock.patch('bigquery.client.BigQueryClient.get_query_results') @@ -1491,7 +1695,8 @@ def test_table_does_not_exist(self): self.mock_tables.get.assert_called_once_with( projectId=self.project, datasetId=self.dataset, tableId=self.table) - self.mock_tables.get.return_value.execute.assert_called_once_with() + self.mock_tables.get.return_value.execute. \ + assert_called_once_with(num_retries=0) def test_table_does_exist(self): """Ensure that if the table does exist, True is returned.""" @@ -1506,7 +1711,8 @@ def test_table_does_exist(self): self.mock_tables.get.assert_called_once_with( projectId=self.project, datasetId=self.dataset, tableId=self.table) - self.mock_tables.get.return_value.execute.assert_called_once_with() + self.mock_tables.get.return_value.execute. \ + assert_called_once_with(num_retries=0) class TestCreateTable(unittest.TestCase): @@ -1530,6 +1736,7 @@ def setUp(self): 'datasetId': self.dataset} } self.expiration_time = 1437513693000 + self.time_partitioning = True def test_table_create_failed(self): """Ensure that if creating the table fails, False is returned, @@ -1555,7 +1762,8 @@ def test_table_create_failed(self): self.mock_tables.insert.assert_called_with( projectId=self.project, datasetId=self.dataset, body=self.body) - self.mock_tables.insert.return_value.execute.assert_called_with() + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=0) def test_table_create_success(self): """Ensure that if creating the table succeeds, True is returned, @@ -1581,7 +1789,8 @@ def test_table_create_success(self): self.mock_tables.insert.assert_called_with( projectId=self.project, datasetId=self.dataset, body=self.body) - self.mock_tables.insert.return_value.execute.assert_called_with() + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=0) def test_table_create_body_with_expiration_time(self): """Ensure that if expiration_time has specified, @@ -1601,7 +1810,30 @@ def test_table_create_body_with_expiration_time(self): self.mock_tables.insert.assert_called_with( projectId=self.project, datasetId=self.dataset, body=body) - self.mock_tables.insert.return_value.execute.assert_called_with() + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=0) + + def test_table_create_body_with_time_partitioning(self): + """Ensure that if time_partitioning has specified, + it passed to the body.""" + + self.mock_tables.insert.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + self.client.create_table(self.dataset, self.table, + self.schema, + time_partitioning=self.time_partitioning) + + body = self.body.copy() + body.update({ + 'timePartitioning': {'type': 'DAY'} + }) + + self.mock_tables.insert.assert_called_with( + projectId=self.project, datasetId=self.dataset, body=body) + + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=0) class TestUpdateTable(unittest.TestCase): @@ -1648,9 +1880,11 @@ def test_table_update_failed(self): self.client.swallow_results = True self.mock_tables.update.assert_called_with( - projectId=self.project, datasetId=self.dataset, body=self.body) + projectId=self.project, tableId=self.table, datasetId=self.dataset, + body=self.body) - self.mock_tables.update.return_value.execute.assert_called_with() + self.mock_tables.update.return_value.execute. \ + assert_called_with(num_retries=0) def test_table_update_success(self): """Ensure that if updating the table succeeds, True is returned, @@ -1674,9 +1908,11 @@ def test_table_update_success(self): self.client.swallow_results = True self.mock_tables.update.assert_called_with( - projectId=self.project, datasetId=self.dataset, body=self.body) + projectId=self.project, tableId=self.table, datasetId=self.dataset, + body=self.body) - self.mock_tables.update.return_value.execute.assert_called_with() + self.mock_tables.update.return_value.execute. \ + assert_called_with(num_retries=0) class TestPatchTable(unittest.TestCase): @@ -1695,9 +1931,6 @@ def setUp(self): self.client = client.BigQueryClient(self.mock_bq_service, self.project) self.body = { 'schema': {'fields': self.schema}, - 'tableReference': { - 'tableId': self.table, 'projectId': self.project, - 'datasetId': self.dataset} } self.expiration_time = 1437513693000 @@ -1723,9 +1956,11 @@ def test_table_patch_failed(self): self.client.swallow_results = True self.mock_tables.patch.assert_called_with( - projectId=self.project, datasetId=self.dataset, body=self.body) + projectId=self.project, datasetId=self.dataset, + tableId=self.table, body=self.body) - self.mock_tables.patch.return_value.execute.assert_called_with() + self.mock_tables.patch.return_value.execute. \ + assert_called_with(num_retries=0) def test_table_patch_success(self): """Ensure that if patching the table succeeds, True is returned, @@ -1749,9 +1984,11 @@ def test_table_patch_success(self): self.client.swallow_results = True self.mock_tables.patch.assert_called_with( - projectId=self.project, datasetId=self.dataset, body=self.body) + projectId=self.project, datasetId=self.dataset, + tableId=self.table, body=self.body) - self.mock_tables.patch.return_value.execute.assert_called_with() + self.mock_tables.patch.return_value.execute. \ + assert_called_with(num_retries=0) class TestCreateView(unittest.TestCase): @@ -1796,7 +2033,8 @@ def test_view_create_failed(self): self.mock_tables.insert.assert_called_with( projectId=self.project, datasetId=self.dataset, body=self.body) - self.mock_tables.insert.return_value.execute.assert_called_with() + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=0) def test_view_create_success(self): """Ensure that if creating the table succeeds, True is returned, @@ -1822,7 +2060,8 @@ def test_view_create_success(self): self.mock_tables.insert.assert_called_with( projectId=self.project, datasetId=self.dataset, body=self.body) - self.mock_tables.insert.return_value.execute.assert_called_with() + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=0) class TestDeleteTable(unittest.TestCase): @@ -1858,7 +2097,8 @@ def test_delete_table_fail(self): self.mock_tables.delete.assert_called_with( projectId=self.project, datasetId=self.dataset, tableId=self.table) - self.mock_tables.delete.return_value.execute.assert_called_with() + self.mock_tables.delete.return_value.execute. \ + assert_called_with(num_retries=0) def test_delete_table_success(self): """Ensure that if deleting table succeeds, True is returned, @@ -1882,7 +2122,8 @@ def test_delete_table_success(self): self.mock_tables.delete.assert_called_with( projectId=self.project, datasetId=self.dataset, tableId=self.table) - self.mock_tables.delete.return_value.execute.assert_called_with() + self.mock_tables.delete.return_value.execute. \ + assert_called_with(num_retries=0) class TestParseTableListReponse(unittest.TestCase): @@ -2018,7 +2259,7 @@ def test_push_failed(self): projectId=self.project, datasetId=self.dataset, tableId=self.table, body=self.data) - execute_calls = [mock.call()] + execute_calls = [mock.call(num_retries=0)] self.mock_table_data.insertAll.return_value.execute.assert_has_calls( execute_calls) @@ -2072,7 +2313,7 @@ def test_push_exception(self): projectId=self.project, datasetId=self.dataset, tableId=self.table, body=self.data) - execute_calls = [mock.call()] + execute_calls = [mock.call(num_retries=0)] self.mock_table_data.insertAll.return_value.execute.assert_has_calls( execute_calls) @@ -2104,14 +2345,104 @@ def test_push_success(self): projectId=self.project, datasetId=self.dataset, tableId=self.table, body=self.data) - execute_calls = [mock.call()] + execute_calls = [mock.call(num_retries=0)] self.mock_table_data.insertAll.return_value.execute.assert_has_calls( execute_calls) + def test_request_data_with_options(self): + """Ensure that insertAll body has optional property only when + the optional parameter of push_rows passed. + """ + expected_body = self.data.copy() + + self.client.push_rows( + self.dataset, self.table, self.rows, + insert_id_key='one') + self.mock_table_data.insertAll.assert_called_with( + projectId=self.project, + datasetId=self.dataset, + tableId=self.table, + body=expected_body) + + self.client.push_rows( + self.dataset, self.table, self.rows, + insert_id_key='one', + ignore_unknown_values=False, + skip_invalid_rows=False) + expected_body['ignoreUnknownValues'] = False + expected_body['skipInvalidRows'] = False + self.mock_table_data.insertAll.assert_called_with( + projectId=self.project, + datasetId=self.dataset, + tableId=self.table, + body=expected_body) + + self.client.push_rows( + self.dataset, self.table, self.rows, + insert_id_key='one', + ignore_unknown_values=True, + skip_invalid_rows=True, + template_suffix='20160428' + ) + expected_body['ignoreUnknownValues'] = True + expected_body['skipInvalidRows'] = True + expected_body['templateSuffix'] = '20160428' + self.mock_table_data.insertAll.assert_called_with( + projectId=self.project, + datasetId=self.dataset, + tableId=self.table, + body=expected_body) + + def test_insert_id_key_with_nested_column(self): + """Ensure that dot separated insert_id_key properly extracted with nested column value.""" + rows = [ + {'nested': {'col': 'nested_col1'}, 'val': 1}, + {'nested': {'col': 'nested_col2'}, 'val': 2}, + ] + expected_body = self.data.copy() + expected_body['rows'] = [ + {'insertId': 'nested_col1', 'json': {'nested': {'col': 'nested_col1'}, 'val': 1}}, + {'insertId': 'nested_col2', 'json': {'nested': {'col': 'nested_col2'}, 'val': 2}}, + ] + + self.client.push_rows(self.dataset, self.table, rows, + insert_id_key='nested.col') + self.mock_table_data.insertAll.assert_called_with( + projectId=self.project, + datasetId=self.dataset, + tableId=self.table, + body=expected_body) + + expected_body = self.data.copy() + expected_body['rows'] = [ + {'insertId': 1, 'json': {'nested': {'col': 'nested_col1'}, 'val': 1}}, + {'insertId': 2, 'json': {'nested': {'col': 'nested_col2'}, 'val': 2}}, + ] + self.client.push_rows(self.dataset, self.table, rows, + insert_id_key='val') + self.mock_table_data.insertAll.assert_called_with( + projectId=self.project, + datasetId=self.dataset, + tableId=self.table, + body=expected_body) + + expected_body = self.data.copy() + expected_body['rows'] = [ + {'json': {'nested': {'col': 'nested_col1'}, 'val': 1}}, + {'json': {'nested': {'col': 'nested_col2'}, 'val': 2}}, + ] + self.client.push_rows(self.dataset, self.table, rows, + insert_id_key='no_such.column') + self.mock_table_data.insertAll.assert_called_with( + projectId=self.project, + datasetId=self.dataset, + tableId=self.table, + body=expected_body) + class TestGetAllTables(unittest.TestCase): - def test_get_tables(self): + def test_get_all_tables(self): """Ensure get_all_tables fetches table names from BigQuery.""" mock_execute = mock.Mock() @@ -2125,6 +2456,29 @@ def test_get_tables(self): bq = client.BigQueryClient(mock_bq_service, 'project') + expected_result = [ + 'notanappspottable_20130515_0261', '2013_05_appspot', '2013_06_appspot_1', '2013_06_appspot_2', + '2013_06_appspot_3', '2013_06_appspot_4', '2013_06_appspot_5', + 'appspot_6_2013_06', 'table_not_matching_naming' + ] + + tables = bq.get_all_tables('dataset') + self.assertEquals(expected_result, tables) + + def test_get_tables(self): + """Ensure _get_all_tables fetches table names from BigQuery.""" + + mock_execute = mock.Mock() + mock_execute.execute.return_value = FULL_TABLE_LIST_RESPONSE + + mock_tables = mock.Mock() + mock_tables.list.return_value = mock_execute + + mock_bq_service = mock.Mock() + mock_bq_service.tables.return_value = mock_tables + + bq = client.BigQueryClient(mock_bq_service, 'project') + expected_result = { 'appspot-3': {'2013_06_appspot_3': 1370044800}, 'appspot-2': {'2013_06_appspot_2': 1370044800}, @@ -2309,7 +2663,7 @@ def test_dataset_create_failed(self): projectId=self.project, body=self.body) self.mock_datasets.insert.return_value.execute. \ - assert_called_with() + assert_called_with(num_retries=0) def test_dataset_create_success(self): """Ensure that if creating the table fails, False is returned.""" @@ -2338,7 +2692,7 @@ def test_dataset_create_success(self): projectId=self.project, body=self.body) self.mock_datasets.insert.return_value.execute. \ - assert_called_with() + assert_called_with(num_retries=0) class TestDeleteDataset(unittest.TestCase): @@ -2374,7 +2728,7 @@ def test_delete_datasets_fail(self): self.client.swallow_results = True self.mock_datasets.delete.return_value.execute. \ - assert_called_with() + assert_called_with(num_retries=0) def test_delete_datasets_success(self): """Ensure that if deleting table succeeds, True is returned.""" @@ -2399,7 +2753,7 @@ def test_delete_datasets_success(self): deleteContents=False) self.mock_datasets.delete.return_value.execute. \ - assert_called_with() + assert_called_with(num_retries=0) def test_delete_datasets_delete_contents_success(self): """Ensure that if deleting table succeeds, True is returned.""" @@ -2424,7 +2778,7 @@ def test_delete_datasets_delete_contents_success(self): deleteContents=True) self.mock_datasets.delete.return_value.execute. \ - assert_called_with() + assert_called_with(num_retries=0) FULL_DATASET_LIST_RESPONSE = { @@ -2584,7 +2938,7 @@ def test_dataset_update_failed(self): projectId=self.project, datasetId=self.dataset, body=self.body) self.mock_datasets.update.return_value.execute. \ - assert_called_with() + assert_called_with(num_retries=0) def test_dataset_update_success(self): """Ensure that if creating the table fails, False is returned.""" @@ -2592,18 +2946,18 @@ def test_dataset_update_success(self): self.mock_datasets.update.return_value.execute.side_effect = [{ 'status': 'foo'}, {'status': 'bar'}] - actual = self.client.update_dataset(self.dataset, - self.friendly_name, - self.description, - self.access) + actual = self.client.update_dataset(self.dataset, + friendly_name=self.friendly_name, + description=self.description, + access=self.access) self.assertTrue(actual) self.client.swallow_results = False - actual = self.client.update_dataset(self.dataset, - self.friendly_name, - self.description, - self.access) + actual = self.client.update_dataset(self.dataset, + friendly_name=self.friendly_name, + description=self.description, + access=self.access) self.assertEqual(actual, {'status': 'bar'}) @@ -2613,4 +2967,219 @@ def test_dataset_update_success(self): projectId=self.project, datasetId=self.dataset, body=self.body) self.mock_datasets.update.return_value.execute. \ - assert_called_with() + assert_called_with(num_retries=0) + + +class TestNumRetries(unittest.TestCase): + + def setUp(self): + client._bq_client = None + + self.mock_bq_service = mock.Mock() + self.mock_tables = mock.Mock() + self.mock_job_collection = mock.Mock() + self.mock_datasets = mock.Mock() + self.mock_table_data = mock.Mock() + self.mock_bq_service.tables.return_value = self.mock_tables + self.mock_bq_service.jobs.return_value = self.mock_job_collection + self.mock_bq_service.datasets.return_value = self.mock_datasets + self.mock_bq_service.tabledata.return_value = self.mock_table_data + + self.project_id = 'project' + self.num_retries = 5 + self.client = client.BigQueryClient(self.mock_bq_service, + self.project_id, + num_retries=self.num_retries) + self.dataset = 'dataset' + self.project = 'project' + self.table = 'table' + self.schema = [ + {'name': 'foo', 'type': 'STRING', 'mode': 'nullable'}, + {'name': 'bar', 'type': 'FLOAT', 'mode': 'nullable'} + ] + self.friendly_name = "friendly name" + self.description = "description" + self.access = [{'userByEmail': "bob@gmail.com"}] + self.query = 'SELECT "bar" foo, "foo" bar' + self.rows = [ + {'one': 'uno', 'two': 'dos'}, {'one': 'ein', 'two': 'zwei'}, + {'two': 'kiwi'}] + self.data = { + "kind": "bigquery#tableDataInsertAllRequest", + "rows": [{'insertId': "uno", 'json': {'one': 'uno', 'two': 'dos'}}, + {'insertId': "ein", 'json': + {'one': 'ein', 'two': 'zwei'}}, + {'json': {'two': 'kiwi'}}] + } + + def test_get_response(self): + job_id = 'bar' + + mock_query_job = mock.Mock() + mock_query_reply = mock.Mock() + mock_query_job.execute.return_value = mock_query_reply + self.mock_job_collection.getQueryResults.return_value = mock_query_job + + offset = 5 + limit = 10 + page_token = "token" + timeout = 1 + + self.client.get_query_results(job_id, offset, limit, + page_token, timeout) + + mock_query_job.execute. \ + assert_called_once_with(num_retries=self.num_retries) + + def test_table_exists(self): + expected = [ + {'type': 'FLOAT', 'name': 'foo', 'mode': 'NULLABLE'}, + {'type': 'INTEGER', 'name': 'bar', 'mode': 'NULLABLE'}, + {'type': 'INTEGER', 'name': 'baz', 'mode': 'NULLABLE'}, + ] + + self.mock_tables.get.return_value.execute.return_value = \ + {'schema': {'fields': expected}} + + self.client.get_table_schema(self.dataset, self.table) + self.mock_tables.get.return_value.execute. \ + assert_called_once_with(num_retries=self.num_retries) + + def test_table_create(self): + self.mock_tables.insert.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + self.client.create_table(self.dataset, self.table, + self.schema) + + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) + + def test_table_update(self): + self.mock_tables.update.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + self.client.update_table(self.dataset, self.table, + self.schema) + + self.mock_tables.update.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) + + def test_table_patch(self): + self.mock_tables.patch.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + self.client.patch_table(self.dataset, self.table, + self.schema) + + self.mock_tables.patch.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) + + def test_view_create(self): + body = { + 'view': {'query': self.query}, + 'tableReference': { + 'tableId': self.table, 'projectId': self.project, + 'datasetId': self.dataset + } + } + + self.mock_tables.insert.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + actual = self.client.create_view(self.dataset, self.table, + self.query) + + self.assertTrue(actual) + + self.mock_tables.insert.assert_called_with( + projectId=self.project, datasetId=self.dataset, body=body) + + self.mock_tables.insert.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) + + def test_delete_table(self): + self.mock_tables.delete.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + actual = self.client.delete_table(self.dataset, self.table) + + self.assertTrue(actual) + + self.mock_tables.delete.assert_called_with( + projectId=self.project, datasetId=self.dataset, tableId=self.table) + + self.mock_tables.delete.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) + + def test_push(self): + self.mock_table_data.insertAll.return_value.execute.return_value = { + 'status': 'foo'} + + actual = self.client.push_rows(self.dataset, self.table, self.rows, + 'one') + + self.assertTrue(actual) + + self.mock_bq_service.tabledata.assert_called_with() + + self.mock_table_data.insertAll.assert_called_with( + projectId=self.project, datasetId=self.dataset, tableId=self.table, + body=self.data) + + execute_calls = [mock.call(num_retries=self.num_retries)] + self.mock_table_data.insertAll.return_value.execute.assert_has_calls( + execute_calls) + + def test_dataset_create(self): + body = { + 'datasetReference': { + 'datasetId': self.dataset, + 'projectId': self.project}, + 'friendlyName': self.friendly_name, + 'description': self.description, + 'access': self.access + } + + self.mock_datasets.insert.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + actual = self.client.create_dataset(self.dataset, + self.friendly_name, + self.description, + self.access) + self.assertTrue(actual) + + self.mock_datasets.insert.assert_called_with( + projectId=self.project, body=body) + + self.mock_datasets.insert.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) + + def test_delete_datasets(self): + self.mock_datasets.delete.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + actual = self.client.delete_dataset(self.dataset) + + self.assertTrue(actual) + + self.mock_datasets.delete.assert_called_with( + projectId=self.project, datasetId=self.dataset, + deleteContents=False) + + self.mock_datasets.delete.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) + + def test_dataset_update(self): + self.mock_datasets.update.return_value.execute.side_effect = [{ + 'status': 'foo'}, {'status': 'bar'}] + + actual = self.client.update_dataset(self.dataset, + self.friendly_name, + self.description, + self.access) + self.assertTrue(actual) + + self.mock_datasets.update.return_value.execute. \ + assert_called_with(num_retries=self.num_retries) diff --git a/bigquery/tests/test_query_builder.py b/bigquery/tests/test_query_builder.py index df37a3e..6e9e9ee 100644 --- a/bigquery/tests/test_query_builder.py +++ b/bigquery/tests/test_query_builder.py @@ -340,6 +340,27 @@ def test_no_fields(self): self.assertEqual(result, "") +class TestLimit(unittest.TestCase): + + def test_with_limit(self): + """Ensure that render limit works.""" + from bigquery.query_builder \ + import _render_limit + + result = _render_limit(8) + + self.assertEqual(result, "LIMIT 8") + + def test_no_fields(self): + """Ensure that render limit can work without any arguments.""" + from bigquery.query_builder \ + import _render_limit + + result = _render_limit(None) + + self.assertEqual(result, "") + + class TestRenderQuery(unittest.TestCase): def test_full_query(self): @@ -392,14 +413,15 @@ def test_full_query(self): 'type': 'INTEGER' } ], - order_by={'fields': ['timestamp'], 'direction': 'desc'}) + order_by={'fields': ['timestamp'], 'direction': 'desc'}, + limit=10) expected_query = ("SELECT status as status, start_time as timestamp, " "resource as url FROM [dataset.2013_06_appspot_1]" " WHERE (start_time <= INTEGER('1371566954')) AND " "(start_time >= INTEGER('1371556954')) GROUP BY " "timestamp, status HAVING (status == INTEGER('1')) " - "ORDER BY timestamp desc") + "ORDER BY timestamp desc LIMIT 10") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -427,7 +449,7 @@ def test_empty_conditions(self): expected_query = ("SELECT status as status, start_time as timestamp, " "resource as url FROM " "[dataset.2013_06_appspot_1] ORDER BY " - "timestamp desc") + "timestamp desc ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -464,7 +486,7 @@ def test_incorrect_conditions(self): expected_query = ("SELECT status as status, start_time as timestamp, " "resource as url FROM " "[dataset.2013_06_appspot_1] ORDER BY " - "timestamp desc") + "timestamp desc ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -516,7 +538,7 @@ def test_multiple_condition_values(self): "INTEGER('1371556954')) AND " "((resource CONTAINS STRING('foo') AND resource " "CONTAINS STRING('baz')) AND (NOT resource CONTAINS " - "STRING('bar'))) ORDER BY timestamp desc") + "STRING('bar'))) ORDER BY timestamp desc ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -550,7 +572,7 @@ def test_negated_condition_value(self): expected_query = ("SELECT status as status, start_time as timestamp, " "resource as url FROM " "[dataset.2013_06_appspot_1] WHERE (NOT resource " - "CONTAINS STRING('foo')) ORDER BY timestamp desc") + "CONTAINS STRING('foo')) ORDER BY timestamp desc ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -593,7 +615,7 @@ def test_multiple_negated_condition_values(self): "[dataset.2013_06_appspot_1] WHERE (NOT resource " "CONTAINS STRING('foo') AND NOT resource CONTAINS " "STRING('baz') AND NOT resource CONTAINS " - "STRING('bar')) ORDER BY timestamp desc") + "STRING('bar')) ORDER BY timestamp desc ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -631,7 +653,7 @@ def test_empty_order(self): "resource as url FROM " "[dataset.2013_06_appspot_1] WHERE (start_time " "<= INTEGER('1371566954')) AND (start_time >= " - "INTEGER('1371556954')) ") + "INTEGER('1371556954')) ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -669,7 +691,7 @@ def test_incorrect_order(self): "resource as url FROM " "[dataset.2013_06_appspot_1] WHERE (start_time " "<= INTEGER('1371566954')) AND (start_time >= " - "INTEGER('1371556954')) ") + "INTEGER('1371556954')) ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -702,7 +724,7 @@ def test_empty_select(self): expected_query = ("SELECT * FROM [dataset.2013_06_appspot_1] " "WHERE (start_time <= INTEGER('1371566954')) AND " "(start_time >= INTEGER('1371556954')) ORDER BY " - "timestamp desc") + "timestamp desc ") self.assertEqual(result, expected_query) def test_no_alias(self): @@ -732,7 +754,7 @@ def test_no_alias(self): expected_query = ("SELECT status , start_time , resource FROM " "[dataset.2013_06_appspot_1] WHERE (start_time " "<= INTEGER('1371566954')) AND (start_time >= " - "INTEGER('1371556954')) ORDER BY start_time desc") + "INTEGER('1371556954')) ORDER BY start_time desc ") expected_select = (field.strip() for field in expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) @@ -777,7 +799,7 @@ def test_formatting(self): "resource as url FROM " "[dataset.2013_06_appspot_1] WHERE (start_time " "<= INTEGER('1371566954')) AND (start_time >= " - "INTEGER('1371556954')) ORDER BY timestamp desc") + "INTEGER('1371556954')) ORDER BY timestamp desc ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -830,7 +852,7 @@ def test_formatting_duplicate_columns(self): "[dataset.2013_06_appspot_1] WHERE " "(start_time <= INTEGER('1371566954')) AND " "(start_time >= INTEGER('1371556954')) ORDER BY " - "timestamp desc") + "timestamp desc ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -874,7 +896,7 @@ def test_sec_to_micro_formatting(self): "timestamp, resource as url FROM " "[dataset.2013_06_appspot_1] WHERE (start_time " "<= INTEGER('1371566954')) AND (start_time >= " - "INTEGER('1371556954')) ORDER BY timestamp desc") + "INTEGER('1371556954')) ORDER BY timestamp desc ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -908,7 +930,8 @@ def test_no_table_or_dataset(self): 'negate': False}], 'type': 'INTEGER'}, ], - order_by={'fields': ['timestamp'], 'direction': 'desc'}) + order_by={'fields': ['timestamp'], 'direction': 'desc'}, + limit=10) self.assertIsNone(result) @@ -930,7 +953,7 @@ def test_empty_groupings(self): expected_query = ("SELECT status as status, start_time as timestamp, " "resource as url FROM " "[dataset.2013_06_appspot_1] ORDER BY " - "timestamp desc") + "timestamp desc ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] @@ -971,7 +994,7 @@ def test_multi_tables(self): "[dataset.2013_07_appspot_1] WHERE (start_time " "<= INTEGER('1371566954')) AND (start_time >= " "INTEGER('1371556954')) GROUP BY timestamp, status " - "ORDER BY timestamp desc") + "ORDER BY timestamp desc ") expected_select = (expected_query[len('SELECT '):] .split('FROM')[0].strip().split(', ')) expected_from = expected_query[len('SELECT '):].split('FROM')[1] diff --git a/bigquery/version.py b/bigquery/version.py new file mode 100644 index 0000000..1c19d78 --- /dev/null +++ b/bigquery/version.py @@ -0,0 +1 @@ +__version__ = '1.15.0' diff --git a/requirements_dev.txt b/requirements_dev.txt index 74162c3..1040dea 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -1,6 +1,6 @@ nose rednose -mock==1.0.1 +mock==4.0.2 coverage nose-exclude tox diff --git a/setup.py b/setup.py index b0c737b..fc1c5de 100644 --- a/setup.py +++ b/setup.py @@ -1,13 +1,17 @@ +from distutils.util import convert_path from setuptools import find_packages from setuptools import setup -VERSION = '1.6.0' +ns = {} +version_path = convert_path('bigquery/version.py') +with open(version_path) as version_file: + exec(version_file.read(), ns) setup_args = dict( name='BigQuery-Python', description='Simple Python client for interacting with Google BigQuery.', url='https://github.com/tylertreat/BigQuery-Python', - version=VERSION, + version=ns['__version__'], license='Apache', packages=find_packages(), include_package_data=True, diff --git a/tox.ini b/tox.ini index ce76190..58dadc9 100644 --- a/tox.ini +++ b/tox.ini @@ -4,7 +4,7 @@ # and then run "tox" from this directory. [tox] -envlist = py27, py33, py34, nightly, pypy +envlist = py27, py35, py36, nightly, pypy [testenv] commands = nosetests --logging-level=ERROR -a slow --with-coverage --cover-package=bigquery