I want to write the results of a BigQuery query into a BigQuery table. The query text is stored in a txt file, and I'm passing it as a variable into the function below, but I get the following error:

error_info=method + ' ' + url) google.cloud.exceptions.BadRequest: 400 Required parameter is missing (POST https://www.googleapis.com/bigquery/v2/projects/myproject/jobs)

What am I missing?

The function:

from google.cloud import bigquery
import time
import uuid

def async_query(query, dataset_id, dest_table, project_Id):


    # configuration json
    query_data = {
        "configuration": {
            "query": {
                "query": query,
                "defaultDataset": dataset_id,
                "allowLargeResults": True,
                "destinationTable": {
                    "projectId": project_Id,
                    "datasetId": dataset_id,
                    "tableId": dest_table
                    },
                "createDisposition": 'CREATE_IF_NEEDED',
                "writeDisposition": 'WRITE_TRUNCATE'
          }
        }
    }

    client = bigquery.Client()
    query_job = client.run_async_query(str(uuid.uuid4()), query_data)
    query_job.use_legacy_sql = False
    query_job.begin()
    wait_for_job(query_job)

    # Drain the query results by requesting a page at a time.
    query_results = query_job.results()
    page_token = None

    while True:
        rows, total_rows, page_token = query_results.fetch_data(
            max_results=10,
            page_token=page_token)

        for row in rows:
            print(row)

        if not page_token:
            break

def wait_for_job(job):
    while True:
        job.reload()  # Refreshes the state via a GET request.
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        time.sleep(1)

3 Answers

  1. You can fix the defaultDataset field in your configuration as below:

    # configuration json
    query_data = {
        "configuration": {
            "query": {
                "query": query,
                "defaultDataset": {
                    "projectId": project_Id,
                    "datasetId": dataset_id
                    },
                "allowLargeResults": True,
                "destinationTable": {
                    "projectId": project_Id,
                    "datasetId": dataset_id,
                    "tableId": dest_table
                    },
                "createDisposition": 'CREATE_IF_NEEDED',
                "writeDisposition": 'WRITE_TRUNCATE'
          }
        }
    }
    

Note: "projectId": project_Id is optional in defaultDataset

  2. The whole defaultDataset is also optional, and in your case you can just omit it, as in:

    # configuration json
    query_data = {
        "configuration": {
            "query": {
                "query": query,
                "allowLargeResults": True,
                "destinationTable": {
                    "projectId": project_Id,
                    "datasetId": dataset_id,
                    "tableId": dest_table
                    },
                "createDisposition": 'CREATE_IF_NEEDED',
                "writeDisposition": 'WRITE_TRUNCATE'
          }
        }
    }
    

See more in the documentation for configuration.query.defaultDataset.
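
For comparison, newer versions of the google-cloud-bigquery client library let you express the same destination table and dispositions through QueryJobConfig rather than a raw configuration dict. A minimal sketch, assuming a current library version; the project, dataset and table names are placeholders:

    from google.cloud import bigquery

    # Placeholder identifiers; replace with your own project, dataset and table.
    client = bigquery.Client(project="my-project")
    table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_results")

    job_config = bigquery.QueryJobConfig()
    job_config.destination = table_ref
    job_config.create_disposition = bigquery.CreateDisposition.CREATE_IF_NEEDED
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    job_config.use_legacy_sql = False

    query_job = client.query("SELECT 1 AS x", job_config=job_config)  # starts the job
    for row in query_job.result():  # waits for the job to finish
        print(row)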


I got it to work by adding the destination table as a bigquery.Table on the query job:

from google.cloud import bigquery
import time
import uuid

def async_query(query, dataset_id, dest_table, project_Id):
    client = bigquery.Client()
    query_job = client.run_async_query(str(uuid.uuid4()), query)
    query_job.use_legacy_sql = False
    dataset = bigquery.Dataset(dataset_id, client)
    table = bigquery.Table(dest_table, dataset)
    query_job.destination = table
    query_job.write_disposition = 'WRITE_TRUNCATE'
    query_job.begin()
    wait_for_job(query_job)

    # Drain the query results by requesting a page at a time.
    query_results = query_job.results()
    page_token = None

    while True:
        rows, total_rows, page_token = query_results.fetch_data(
            max_results=10,
            page_token=page_token)

        for row in rows:
            print(row)

        if not page_token:
            break

def wait_for_job(job):
    while True:
        job.reload()  # Refreshes the state via a GET request.
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        time.sleep(1)
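
For reference, a hypothetical call to this function might look like the following; the file name, project, dataset and table names are placeholders, with the query text read from a txt file as described in the question:

    # Hypothetical invocation; every identifier below is a placeholder.
    with open("query.txt") as f:
        sql = f.read()

    async_query(
        query=sql,
        dataset_id="my_dataset",   # dataset that will hold the results
        dest_table="my_results",   # destination table (created or truncated)
        project_Id="my-project",   # GCP project id
    )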


I think the problem is that defaultDataset is actually an object, so you'll need to set the datasetId field inside it (this is probably what is triggering the error).

Can you try correcting that and see if it helps? You can see the complete options for the query API in the documentation.
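
In other words, defaultDataset needs to be a dict containing a datasetId (and optionally a projectId), not a bare string. A minimal sketch of the corrected field, reusing the variable names from the question:

    query_data = {
        "configuration": {
            "query": {
                "query": query,
                # defaultDataset must be an object, not a plain string:
                "defaultDataset": {
                    "projectId": project_Id,  # optional
                    "datasetId": dataset_id
                }
            }
        }
    }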
