
I'm trying to extract events from MixPanel, process them, and then upload them to a BigQuery table (creating a new table).

I've searched all the available resources, but nothing I found helped solve the issue.

Below is my code:
# Required modules import
import os
from mixpanel_api import Mixpanel
import collections
import json
from google.cloud import storage, bigquery
# Function to flatten exported file
def flatten(d, parent_key='', sep=''):
    items = []
    for k, v in d.items():
        new_key = parent_key.replace("PROPERTIES","").replace("-","_").replace("[","").replace("]","").replace("/","").replace("\\","").replace("'","") + sep + k.replace(" ","").replace("_","").replace("$","").replace("-","_").replace("[","").replace("]","").replace("/","").replace("\\","").replace("'","") if parent_key else k
        #new_key = parent_key.replace("PROPERTIES","").join(e for e in parent_key if e.isalnum()) + sep + k.join(e for e in k if e.isalnum()) if parent_key else k
        if isinstance(v, collections.MutableMapping):
            items.extend(flatten(v, new_key.upper(), sep=sep).items())
        else:
            items.append((new_key.upper().replace("-","_").replace("[","").replace("]","").replace("/","").replace("\\","").replace("'",""), v))
            #items.append((new_key.upper().join(e for e in new_key if e.isalnum()), v))
            #items.append(("ID","1"))
            #items.append(("PROCESS_DATE",""))
            #items.append(("DATA_DATE",""))
    return dict(items)
# Start of execution point
if __name__ == '__main__':

   # Secret and token to access API
   api_sec = 'aa8af6b5ca5a5ed30e20f3af0acdfb2d'
   api_tok = 'ad5234953e64b908bcd35388875324db'
   # User input for date range and filename
   start_date = str(input('Enter the start date(format: YYYY-MM-DD): '))
   end_date = str(input('Enter the end date(format: YYYY-MM-DD): '))
   file_name = str(input('Enter filename to store output: '))
   file_formatter = str(input('Enter filename to store formatted output: '))
   # Instantiating Mixpanel object
   mpo = Mixpanel(api_sec,
           api_tok
         )
   # Exporting events for the specified date range and storing in the filename provided, gunzip'ed file
   mpo.export_events(file_name,
           {'from_date':start_date,
            'to_date':end_date
           },
           add_gzip_header=False,
           raw_stream=True
        )

   # Dict for schema derived from file
   schema_dict = {}
   # Flatten file and write-out to another file
   with open(file_name, 'r') as uf, open(file_formatter, 'a') as ff, open('schema_file', 'a') as sf:
       #schema_list = []
       for line in uf:
           temp = flatten(json.loads(line))
           for k in temp.keys():
              if k not in schema_dict:
                   schema_dict[k] = "STRING"
                   #schema_list.append({"name" : k, "type" : "STRING"})
            #ff.write(json.dumps(temp))
            json.dump(temp, ff, indent = None, sort_keys = True)                # Writes each flattened record as one compact JSON object per line
           ff.write('\n')                     # Adds a new line after each object dump to file
       #json.dump(schema_dict, sf, indent = None, sort_keys = True)
       #json.dump(schema_list, sf, indent = None, sort_keys = True)

   # Removing source file
   if os.path.isfile(file_name):
       sfr = os.remove(file_name)
       if sfr == None:
           print 'File ' +file_name+ ' removed from local storage'
       else:
           print 'File ' +file_name+ ' remove failed from local storage'
   # Uploading file to Google bucket
   client = storage.Client()
   bucket = client.get_bucket('yathin-sample-bucket')
   blob = bucket.blob(file_formatter)
   status = blob.upload_from_filename(file_formatter)
   if status == None:
       print 'File ' +file_formatter+ ' upload success. Removing local copy.'
       fr = os.remove(file_formatter)
       if fr == None:
           print 'File ' +file_formatter+ ' removed from local storage'
       else:
           print 'File ' +file_formatter+ ' remove failed from local storage'
   # Loading file to BigQuery
   client = bigquery.Client()
   dataset_id = 'sample_dataset'
   dataset_ref = client.dataset(dataset_id)
   job_config = bigquery.LoadJobConfig()
   job_config.schema = [ bigquery.SchemaField(k,v) for k,v in schema_dict.items() ]
   #job_config.autodetect = True
    #job_config.create_disposition = 'CREATE_IF_NEEDED'
   #job_config.write_disposition = 'WRITE_APPEND'
   job_config.source_format = 'NEWLINE_DELIMITED_JSON'
   uri = 'gs://yathin-sample-bucket/'+file_formatter
   load_job = client.load_table_from_uri(
              uri,
              dataset_ref.table('test_json'),
              job_config=job_config)  # API request
   #assert load_job.job_type == 'load'
   #load_job.result()  # Waits for table load to complete.

This code isn't returning any error, but the table isn't getting created.

Can someone please help by pointing out what is wrong?

  • Try bq ls -j --all to list current and past jobs. It looks like you are initiating the job, but then not waiting for it to complete or checking the result. Commented May 28, 2018 at 13:18
  • after running load_job.result(), is there any error message in load_job.errors? Commented May 28, 2018 at 18:24
  • @ElliottBrossard Thanks, there were errors in parsing the JSON file. I was able to fix them and now it's working fine :) Commented May 30, 2018 at 6:13
  • That's good. If you changed your code to be able to detect the error, consider posting it as an answer for the benefit of future readers. Commented May 30, 2018 at 12:41
  • @ElliottBrossard I will post the code once it's totally complete. A few more things: how do I check whether a table is already present, and how do I use a job id to submit the above load job and track whether it succeeded or failed? Any Python code sample would be of great help. Commented May 31, 2018 at 5:58

1 Answer

It's possible that there is an error, but you're not returning the result in your script. I'm not sure why you commented out load_job.result(), but that call is probably necessary to make sure the job actually completes.
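For example, here's a minimal sketch of what could follow the load_table_from_uri call at the end of your script (reusing your load_job variable) to wait for the job and surface any errors:

try:
    load_job.result()  # blocks until the load finishes; raises if it failed
    print('Loaded {} rows into test_json'.format(load_job.output_rows))
except Exception:
    # The job's error list explains why the load was rejected
    print('Load failed:', load_job.errors)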

If there still isn't an error, this script will give you a list of your most recent jobs and their results, including any error codes. Just change the max_results kwarg as needed.

from google.cloud import bigquery

client = bigquery.Client()
for job in client.list_jobs(max_results=1, all_users=False):
    jobid = job.job_id
    job = client.get_job(jobid)
    print("------BIG QUERY JOB ERROR REASON", job.errors)

Also, per your question in the comments about how to check whether a table exists...

from google.cloud import bigquery
from google.cloud.exceptions import NotFound

client = bigquery.Client()
try:
    dataset = client.dataset('DatasetName')
    table_ref = dataset.table('TableName')
    client.get_table(table_ref)
except NotFound:
    print('Table Not Found')
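Note that for a load job you usually don't need this check just to decide whether to create the table: the dispositions you commented out handle that, and CREATE_IF_NEEDED is already the default for load jobs.

job_config.create_disposition = 'CREATE_IF_NEEDED'   # create the table if it doesn't exist
job_config.write_disposition = 'WRITE_APPEND'        # append to it if it does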