使用Psycopg2高效更新数据(二)

python: 3.7

pscopg2: 2.7

参考文档Server side cursors

当执行一个数据库查询时,Pscopg cursor通常将查询到的所有数据返回给客户端,如果返回的数据过大,则将占用客户端大量的内存。因此,psycopg提供了一种成为server side curosr机制,每次返回可控制数量的数据。

Server side cursor是使用PostgreSQL的DECLARE命令创建,并经过MOVEFETCHCLOSE命令处理的。

Psycopg通过命名的cursors装饰server side cursor的,而命名cursor是通过对cursor()方法指定name参数而创建的。server side cursor允许用户在数据集中使用scroll()移动游标,并通过fetchone()fetchmany()方法获取数据。

  • scrollable:控制游标是否可以向后移动
  • itersize:控制每次可以获取多少条数据,默认是2000

当命名cursor尝试在commit()方法执行后获取数据或者由一个autocommit模式的connection创建命名cursor时都将导致一个错误。

使用示例:fetchone

import time
import datetime
from dateutil import tz

from psycopg2 import connect
from psycopg2.extras import RealDictCursor, execute_values


DB_HOST = 'localhost'
DB_PORT = 5432
DB_USERNAME = 'root'
DB_PASSWORD = '123456'
DB_NAME = 'test'


def date_range():
    now = datetime.datetime.utcnow()
    start_at = datetime.datetime(2018, 1, 1, 0, 0, 0, tzinfo=tz.gettz("utc"))
    end_at = now - datetime.timedelta(days=180)

    data = {
            0: [start_at, end_at]
            }
    return data


def connection():
    with connect(database=DB_NAME,
                 host=DB_HOST,
                 port=DB_PORT,
                 user=DB_USERNAME,
                 password=DB_PASSWORD) as conn:
        with conn.cursor(name='server_cursor', cursor_factory=RealDictCursor) as cur:
            data = date_range()
            for k, v in data.items():
                sql = "SELECT id, score FROM review " \
                      "WHERE create_date >= %s AND create_date < %s AND score IS NULL;"
                cur.execute(sql, (v[0], v[1]))

                for c in cur:
                    print(c)


if __name__ == '__main__':
    now = time.time()
    connection()
    print('time is %d' % (time.time() - now))

使用示例:fetchmany

import time
import datetime
from dateutil import tz

from psycopg2 import connect
from psycopg2.extras import RealDictCursor, execute_values


DB_HOST = 'localhost'
DB_PORT = 5432
DB_USERNAME = 'root'
DB_PASSWORD = '123456'
DB_NAME = 'test'


def date_range():
    now = datetime.datetime.utcnow()
    start_at = datetime.datetime(2018, 1, 1, 0, 0, 0, tzinfo=tz.gettz("utc"))
    end_at = now - datetime.timedelta(days=180)

    data = {
            0: [start_at, end_at]
            }
    return data


def connection():
    with connect(database=DB_NAME,
                 host=DB_HOST,
                 port=DB_PORT,
                 user=DB_USERNAME,
                 password=DB_PASSWORD) as conn:
        with conn.cursor(name='server_cursor', cursor_factory=RealDictCursor) as cur:
            data = date_range()
            for k, v in data.items():
                sql = "SELECT id, score FROM review " \
                      "WHERE create_date >= %s AND create_date < %s AND score IS NULL;"
                cur.execute(sql, (v[0], v[1]))

                while True:
                    rows = cur.fetchmany(2000)
                    if len(rows) > 0:
                        values = []
                        for row in rows:
                            score = 5
                            values.append((row['id'], score))
                        sql = "UPDATE review SET score=data.score FROM (VALUES %s) AS data (id, score) " \
                              "WHERE review.id = data.id;"
                        execute_values(cur, sql, values, page_size=100)
                    else:
                        break


if __name__ == '__main__':
    now = time.time()
    connection()
    print('time is %d' % (time.time() - now))

上一篇
Blaze(一):前言 Blaze(一):前言
Blaze生态系统为python用户对大数据提供了高效计算的高层接口。主要由Anaconda赞助。 应用领域 Blaze整合了包括Python的Pandas、NumPy及SQL、Mongo、Spark在内的多种技术,使用Blaze能够非常容
2018-12-04
下一篇
使用Psycopg2高效更新数据(一) 使用Psycopg2高效更新数据(一)
Python: 3.7 Psycopg: 2.7 最近要对Postgresql数据库某表中的几百万条数据进行计算并更新某字段的值,在此期间使用过协程+aiopg,7分钟更新2000条数据,速度太慢;后来查看Psycopg2文档发现了一
2018-11-30
目录