
I am using psycopg2 and pandas to extract data from Postgres.

pandas.read_sql_query supports the Python "generator" pattern when given a chunksize argument. It's not very helpful when working with large datasets, though: the whole result set is initially retrieved from the DB into client-side memory and only later chunked into separate frames based on chunksize. Large datasets will easily run into out-of-memory problems with this approach.
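For illustration, here is what the chunksize behaviour looks like against an in-memory SQLite database (the table and data are made up for the demo; with Postgres the driver may still materialize the full result set client-side first):

```python
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")
conn.execute("create table users (id integer, name text)")
conn.executemany("insert into users values (?, ?)",
                 [(i, f"user{i}") for i in range(10)])

# chunksize makes read_sql_query return an iterator of DataFrames
chunks = list(pd.read_sql_query("select * from users", conn, chunksize=4))
print([len(c) for c in chunks])  # chunk sizes: 4, 4, 2
```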

Postgres and psycopg2 address this problem with server-side cursors, but pandas does not seem to support them.
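The pattern a server-side cursor enables is pulling rows in batches rather than all at once. With psycopg2 you get one by passing name= to conn.cursor() and optionally setting itersize; the client-side loop is the same fetchmany() loop sketched here with stdlib sqlite3 so it runs anywhere (the psycopg2 specifics are noted in comments):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table users (id integer, name text)")
conn.executemany("insert into users values (?, ?)",
                 [(i, f"user{i}") for i in range(10)])

# With psycopg2 this would instead be:
#   curs = conn.cursor(name='cur_name')  # named => server-side cursor
#   curs.itersize = 1000                 # rows per network round trip
curs = conn.cursor()
curs.execute("select * from users")

batches = []
while True:
    batch = curs.fetchmany(4)  # only this many rows in memory at a time
    if not batch:
        break
    batches.append(batch)
print([len(b) for b in batches])  # batch sizes: 4, 4, 2
```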

Instead of doing:

iter = pandas.read_sql_query(sql,
      conn,
      index_col='col1',
      chunksize=chunksize)

I tried reimplementing it like this:

from pandas.io.sql import SQLiteDatabase

curs = conn.cursor(name='cur_name') # server side cursor creation
curs.itersize = chunksize

pandas_sql = SQLiteDatabase(curs, is_cursor=True)
iter = pandas_sql.read_query(
      sql,
      index_col='col1',
      chunksize=chunksize)

but it fails because pandas tries to access cursor.description, which is None for some reason with server-side cursors (any idea why?).

What's the best approach to proceed? Thanks

2 Answers

You would need to rewrite pandas's read_query() so that it does not use cursor.description. Simply pass a list of column names to read_query() and make it use that instead of cursor.description. (With a named, i.e. server-side, cursor psycopg2 only declares the cursor on execute(), so description stays None until the first fetch; that is why pandas fails here.)

import psycopg2
from pandas.io.sql import SQLiteDatabase, _convert_params, _wrap_result

# modify read_query as you need and overwrite it
# added column names as argument
def read_query_modified(
    self,
    sql,
    columns,
    index_col=None,
    coerce_float=True,
    params=None,
    parse_dates=None,
    chunksize=None,
):

    args = _convert_params(sql, params)
    cursor = self.execute(*args)
    # columns = [col_desc[0] for col_desc in cursor.description]

    if chunksize is not None:
        return self._query_iterator(
            cursor,
            chunksize,
            columns,
            index_col=index_col,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
        )
    else:
        data = self._fetchall_as_list(cursor)
        cursor.close()

        frame = _wrap_result(
            data,
            columns,
            index_col=index_col,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
        )
        return frame

# replace read_query with your version
SQLiteDatabase.read_query = read_query_modified

chunksize = 2
conn = psycopg2.connect("dbname=mf port=5959 host=localhost user=mf_usr")
curs = conn.cursor(name='cur_name')
curs.itersize = chunksize

sql = 'select * from users where id = 366196'
columns = ['id', 'firstname', 'lastname', 'birth', 'gender', 'nationality']

pandas_sql = SQLiteDatabase(curs, is_cursor=True)
iter = pandas_sql.read_query(
      sql,
      columns,
      index_col='id',
      chunksize=chunksize)

for x in iter:
    print(x)

Output:

       firstname    lastname birth gender nationality
id
366196   Michael  Kronberger  None   None          at

3 Comments

I actually tried something like this, but I am getting an exception on execute: "cursor already closed". Any idea?
Using the code I posted? It works fine for me fetching 500K records (chunksize = 2).
My mistake - it works! I slightly improved your answer to remove the need for passing columns.

I slightly improved @Maurice Meyer's answer to remove the need to pass columns as an argument:

from pandas.io.sql import SQLiteDatabase, _convert_params, _wrap_result

class CustomSQLiteDatabase(SQLiteDatabase):

    def read_query(
        self,
        sql,
        index_col=None,
        coerce_float=True,
        params=None,
        parse_dates=None,
        chunksize=None,
    ):
        args = _convert_params(sql, params)
        self.con.execute(*args)

        if chunksize is not None:
            # Delegate to a separate generator: a function body containing
            # `yield` always returns a generator, so mixing `yield` and
            # `return frame` here would break the chunksize=None case.
            return self._chunk_iterator(
                chunksize,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
            )

        data = self._fetchall_as_list(self.con)
        columns = [col_desc[0] for col_desc in self.con.description]
        self.con.close()

        return _wrap_result(
            data,
            columns,
            index_col=index_col,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
        )

    def _chunk_iterator(self, chunksize, index_col, coerce_float, parse_dates):
        columns = None

        while True:
            data = self.con.fetchmany(chunksize)

            # On a named (server-side) cursor, description is only
            # populated after the first fetch, so read it here.
            if columns is None:
                columns = [col_desc[0] for col_desc in self.con.description]

            if not data:
                self.con.close()
                break

            yield _wrap_result(
                list(data),
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
            )
