接口篇 – Python连接Greenplum

上篇文章介绍了Golang连接Greenplum,那么本文章来介绍在这几年火的一塌糊涂的Python语言如何连接Greenplum。

Python连接Greenplum数据库较常用的库有PyGreSQL和Psycopg2两个。Greenplum的很多脚本都是采用PyGreSQL为基础开发的,可见PyGreSQL肯定有其独到之处,但是Psycopg2这几年似乎在Postgres体系中更加流行。本文将会分别介绍这两个库的使用。

PyGreSQL

PyGreSQL是连接PostgreSQL的Python库,目前最新版本为PyGreSQL 5.1,支持PostgreSQL 9.0到11版本,可以对应到Greenplum 6.x的版本,如果要支持Greenplum 4.x和5.x版本,可以选用PyGreSQL 4.x版本。

安装

pip install PyGreSQL

示例

#!/usr/bin/env python

import pg


def operate_postgre_tbl_product():
    try:
        #pgdb_conn = pg.connect(dbname = 'tpc', host = '192.168.103.31', user = 'gpadmin', passwd = '')
        pgdb_conn = pg.connect("host=192.168.103.31 port=5432 dbname=tpc user=gpadmin")

    except Exception, e:
         print e.args[0]
         return


    sql_desc = "select * from call_center limit 5;"
    for row in pgdb_conn.query(sql_desc).dictresult():
        print row


    pgdb_conn.close()


if __name__ == '__main__':
    operate_postgre_tbl_product()

参考文章

http://www.pygresql.org/about.html

Psycopg2

Psycopg2库的底层是由C语言封装PostgreSQL的标准库C接口库libpq实现的,运行速度非常快,它支持大型多线程应用的大量并发Insert和Update操作,另外它完全兼容DB API 2.0。

安装

pip install psycopg2

示例

简单的增加,查询记录

import psycopg2
import psycopg2.extras
import time
 
'''
    连接数据库
    returns:db
'''
def gp_connect():
    try:
        db = psycopg2.connect(dbname="testdb",
                              user="gpadmin",
                              password="gpadmin",
                              host="10.1.208.42",
                              port="5432")
        # connect()也可以使用一个大的字符串参数,
        # 比如”host=localhost port=5432 user=postgres password=postgres dbname=test”
        return db
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server",e)
 
 
if __name__ == '__main__':
    conn = gp_connect()
    print(conn)
    cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
    # 这里创建的是一个字典Cursor, 这样返回的数据, 都是字典的形式, 方便使用
    ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);")
    conn.commit()
    # 提交到数据库中
    print(ret)
    ret = cur.execute("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);",(300, "abc'def"))
 
    conn.commit()
    # 提交到数据库中
    print(cur.rowcount)  # 1
    # 返回数据库中的行的总数已修改,插入或删除最后 execute*().
 
    ret_sql = cur.mogrify("select * from pg_tables where tablename = %s;", ('gp_test',))
    # 返回生成的sql脚本, 用以查看生成的sql是否正确.
    # sql脚本必须以;结尾, 不可以省略.其次, 不管sql中有几个参数, 都需要用 % s代替, 只有 % s, 不管值是字符还是数字, 一律 % s.
    # 最后, 第二个参数中, 一定要传入元组, 哪怕只有一个元素, 像我刚才的例子一样, ('gp_test')这样是不行的.
    print(ret_sql.decode('utf-8'))  # select * from pg_tables where tablename = E'gp_test';
 
    cur.execute("select * from gp_test where num = %s;", (300,))
    pg_obj = cur.fetchone()
    print(pg_obj) # {'id': 1, 'num': 300, 'data': "abc'def"}
 
    conn.close() # 关闭连接

批量插入,查询

conn = gp_connect()
print(conn)
cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
# # 这里创建的是一个字典Cursor, 这样返回的数据, 都是字典的形式, 方便使用
# ret = cur.execute("CREATE TABLE public.gp_test (id serial PRIMARY KEY, num integer, data varchar);")
# conn.commit()
# # 提交到数据库中
# print(ret)
gp_list = []
for i in range(200):
    gp_list.append((i,'abc%s'%i))
# print(gp_list)
# 批量提交数据
ret = cur.executemany("INSERT INTO public.gp_test (num, data) VALUES (%s, %s);", gp_list)
conn.commit()
# 提交到数据库中
print(cur.query)  # 查看上一条执行的脚本
print(cur.rowcount)  # 200
# 返回数据库中的行的总数已修改,插入或删除最后 execute*().
cur.execute("select  count(*) num from gp_test")
pg_obj = cur.fetchone()
print(pg_obj)  # {'num': 200}
 
conn.close()  # 关闭连接

使用连接池,执行高性能的批量插入与查询

import psycopg2
import psycopg2.extras
import psycopg2.pool
from datetime import datetime
 
'''
    连接数据库
    使用数据库连接池
    returns:db
'''
def gp_connect():
    try:
        simple_conn_pool = psycopg2.pool.SimpleConnectionPool(minconn=1, maxconn=5,dbname="testdb",
                              user="gpadmin",
                              password="gpadmin",
                              host="10.1.208.42",
                              port="5432")
        # connect()也可以使用一个大的字符串参数,
        # 比如”host=localhost port=5432 user=postgres password=postgres dbname=test”
        # 从数据库连接池获取连接
        conn = simple_conn_pool.getconn()
        return conn
    except psycopg2.DatabaseError as e:
        print("could not connect to Greenplum server",e)
 
 
if __name__ == '__main__':
    conn = gp_connect()
    print(conn)
    cur = conn.cursor()
    # 批量查询大小
    batch_size = 1000
    gp_list = []
    for i in range(2000, 100000):
        gp_list.append((i,'abc%s'%i))
    # print(gp_list)
 
    # 开始时间
    start_time = datetime.now()
    # 批量提交数据execute_values性能大于executemany
    psycopg2.extras.execute_values(cur, "INSERT INTO public.gp_test (num, data) VALUES %s", gp_list)
    conn.commit()
    # 提交到数据库中
    cur.execute("select  *  from gp_test order by id")
    count = 0
 
    while True:
        count = count + 1
        # 每次获取时会从上次游标的位置开始移动size个位置,返回size条数据
        data = cur.fetchmany(batch_size)
        # 数据为空的时候中断循环
        if not data:
            break
        else:
            print(data[-1])  # 得到最后一条(通过元祖方式返回)
        print('获取%s到%s数据成功' % ((count - 1) * batch_size, count * batch_size))
    print('insert到fetchmany获取全量数据所用时间:', (datetime.now() - start_time).seconds) # 16s
conn.close()  # 关闭连接

执行高性能的批量更新与查询

import psycopg2
import psycopg2.extras
import psycopg2.pool
from datetime import datetime

'''
    连接数据库
    使用数据库连接池
    returns:db
'''
def gp_connect():
    ……略

if __name__ == '__main__':
    conn = gp_connect()
    print(conn)
    cur = conn.cursor()
    # 批量查询大小
    batch_size = 1000
    gp_uplist = [] # 更新列表
    for i in range(2000, 10000):
        gp_uplist.append((i,'def%s'%i))
    print(gp_uplist)

    # 开始时间
    start_time = datetime.now()
    # 批量提交数据execute_values性能大于executemany

    sql = "UPDATE public.gp_test SET data = TEST.data  " \
          "FROM (VALUES %s) AS TEST(num, data) " \
          "WHERE public.gp_test.num = TEST.num"
    # 批量更新语句模版 UPDATE TABLE SET TABLE.COL = XX.col
    # FROM (VALUES %s) AS XX(id_col,col)
    # WHERE TABLE.id_col = XX.id_col 
    # XX为别名
    psycopg2.extras.execute_values(cur, sql, gp_uplist, page_size=100)
    print(cur.query)
    conn.commit()
    # 提交到数据库中
    cur.execute("select  *  from gp_test order by id")
    count = 0

    while True:
        count = count + 1
        # 每次获取时会从上次游标的位置开始移动size个位置,返回size条数据
        data = cur.fetchmany(batch_size)
        # 数据为空的时候中断循环
        if not data:
            break
        else:
            print(data[-1])  # 得到最后一条(通过元祖方式返回)
        print('获取%s到%s数据成功' % ((count - 1) * batch_size, count * batch_size))
    print('update到fetchmany获取全量数据所用时间:', (datetime.now() - start_time).seconds) # 16s
conn.close()  # 关闭连接

使用服务端游标

#逐条处理
with psycopg2.connect(database_connection_string) as conn:
    with conn.cursor(name='name_of_cursor') as cursor:

        cursor.itersize = 20000

        query = "SELECT * FROM ..."
        cursor.execute(query)

        for row in cursor:
         # process row

#2 一次处理多条
while True:
    rows = cursor.fetchmany(100)
    if len(rows) > 0:
        for row in rows:
            # process row
    else:
        break

参考文章

http://initd.org/psycopg/docs/index.html

https://www.cnblogs.com/xiao-apple36/p/10362367.html

END~

发表评论

电子邮件地址不会被公开。 必填项已用*标注