diff --git a/.gitignore b/.gitignore index 6f21cde..faef403 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,5 @@ nosetests.xml .mr.developer.cfg .project .pydevproject +*.swp +*.DS_Store diff --git a/bigquery/client.py b/bigquery/client.py index f4b5d6a..9e3a9ca 100644 --- a/bigquery/client.py +++ b/bigquery/client.py @@ -8,6 +8,7 @@ from time import sleep, time import six +from apiclient.http import MediaFileUpload from bigquery.errors import (BigQueryTimeoutException, JobExecutingException, JobInsertException, UnfinishedQueryException) from googleapiclient.discovery import build, DISCOVERY_URI @@ -520,7 +521,7 @@ def get_table(self, dataset, table): return table - def create_table(self, dataset, table, schema, expiration_time=None): + def create_table(self, dataset, table, schema, expiration_time=None, partition=False): """Create a new table in the dataset. Parameters @@ -553,6 +554,11 @@ def create_table(self, dataset, table, schema, expiration_time=None): if expiration_time is not None: body['expirationTime'] = expiration_time + if partition: + body['timePartitioning'] = { + "type": "DAY" + } + try: table = self.bigquery.tables().insert( projectId=self.project_id, @@ -572,7 +578,7 @@ def create_table(self, dataset, table, schema, expiration_time=None): else: return {} - def update_table(self, dataset, table, schema): + def update_table(self, dataset, table, schema, partition=False): """Update an existing table in the dataset. 
Parameters @@ -600,10 +606,16 @@ def update_table(self, dataset, table, schema): } } + if partition: + body['timePartitioning'] = { + "type": "DAY" + } + try: result = self.bigquery.tables().update( projectId=self.project_id, datasetId=dataset, + tableId=table, body=body ).execute() if self.swallow_results: @@ -1037,6 +1049,86 @@ def export_data_to_uris( self._raise_insert_exception_if_error(job_resource) return job_resource + def load_data( + self, + schema_path=None, + data_path=None, + dataset_id=None, + table_id=None, + create_disposition=None, + write_disposition=None + ): + """Loads the given data file into BigQuery. + + Args: + schema_path: the path to a file containing a valid bigquery schema. + see https://cloud.google.com/bigquery/docs/reference/v2/tables + data_path: the name of the file to insert into the table. + create_disposition: optional; defaults to JOB_CREATE_IF_NEEDED. + write_disposition: optional; defaults to JOB_WRITE_APPEND. + dataset_id: The dataset id of the destination table. + table_id: The table id to load data into. + """ + # Infer the data format from the name of the data file. + source_format = JOB_SOURCE_FORMAT_CSV + if data_path[-5:].lower() == '.json': + source_format = JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON + + if not create_disposition: + create_disposition = JOB_CREATE_IF_NEEDED + + if not write_disposition: + write_disposition = JOB_WRITE_APPEND + + # Post to the jobs resource using the client's media upload interface. See: + # http://developers.google.com/api-client-library/python/guide/media_upload + insert_request = self.bigquery.jobs().insert( + projectId=self.project_id, + # Provide a configuration object. 
See: + # https://cloud.google.com/bigquery/docs/reference/v2/jobs#resource + body={ + 'configuration': { + 'load': { + 'schema': { + 'fields': json.load(open(schema_path, 'r')) + }, + 'destinationTable': { + 'projectId': self.project_id, + 'datasetId': dataset_id, + 'tableId': table_id + }, + 'sourceFormat': source_format, + 'createDisposition': create_disposition, + 'writeDisposition': write_disposition + } + } + }, + media_body=MediaFileUpload( + data_path, + mimetype='application/octet-stream', + chunksize=1048576, + resumable=True)) + job = insert_request.execute() + + print('Waiting for job to finish...') + + status_request = self.bigquery.jobs().get( + projectId=job['jobReference']['projectId'], + jobId=job['jobReference']['jobId']) + + # Poll the job until it finishes. + while True: + result = status_request.execute(num_retries=2) + + if result['status']['state'] == 'DONE': + if result['status'].get('errors'): + raise RuntimeError('\n'.join( + e['message'] for e in result['status']['errors'])) + print('Job complete.') + return + + sleep(1) + def write_to_table( self, query, diff --git a/bigquery/schema_builder.py b/bigquery/schema_builder.py index 575b390..6393597 100644 --- a/bigquery/schema_builder.py +++ b/bigquery/schema_builder.py @@ -72,7 +72,7 @@ def describe_field(k, v, timestamp_parser=default_timestamp_parser): """ def bq_schema_field(name, bq_type, mode): - return {"name": name, "type": bq_type, "mode": mode} + return {"name": name, "type": bq_type.upper(), "mode": mode.upper()} if isinstance(v, list): if len(v) == 0: