我如何使用Psycopg2在Redshift谱中添加分区 -

问题描述:

我们有一个基于S3数据构建的Redshift Spectrum表 - 我们试图在此表中自动添加分区 - 我可以在redshift中运行以下ALTER语句客户端或psql外壳:我如何使用Psycopg2在Redshift谱中添加分区 -

ALTER TABLE analytics_spectrum.page_view ADD PARTITION(date='2017-10-17') LOCATION 's3://data-hub/page_view/2017/10/17/'; 

但是这不能通过psycopg2执行。

sql_query = "ALTER TABLE analytics_spectrum.page_view ADD PARTITION(date='2017-10-17') LOCATION 's3://data-hub/_page_view_v3/2017/10/17/';" 
import config 
import psycopg2 
connection = psycopg2.connect(
      **config.DATABASES['redshift_db']["connection"]) 
cursor = connection.cursor() 
cursor.execute(sql_query) 

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
psycopg2.ProgrammingError: syntax error at or near "(" 
LINE 1: ...ABLE analytics_spectrum.page_view ADD PARTITION(date='201... 

在psycopg2的情况下,它甚至不会发送查询到红移和执行失败的查询分析。

现在我已经实现了使用subprocess.popen来执行alter语句 - 但我想切换回使用psycopg2。

p = subprocess.Popen(['psql', 
         '-h', self.spectrum_connection['host'], 
         '-p', self.spectrum_connection['port'], 
         '-d', self.spectrum_connection['dbname'], 
         '-U', self.spectrum_connection['user'], 
         '-c', sql_stmt], 
        env={ 
    'PGPASSWORD': self.spectrum_connection['password']}, 
    stdout=subprocess.PIPE, 
    stderr=subprocess.PIPE) 
out, err = p.communicate() 

建议/想法?

感谢, 侯赛因Bohra

+0

看一看。 https://*.com/a/47217546/3957916 –

+0

上面的文章谈到如何在Redshift Spectrum表中添加分区 - 但它没有提到有关使用python和psycopg2的任何内容。 –

我有同样的问题。查询执行不使用ISOLATION_LEVEL_AUTOCOMMIT将提高以下错误:

psycopg2.InternalError: ALTER EXTERNAL TABLE cannot run inside a transaction block 

我修改我的代码一点点,它的工作。

import argparse 
import sys, psycopg2 
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT 

input_data = {} 
input_data["db_name"] = <<DB_NAME>> 
input_data["db_host"] = <<HOST_NAME>> 
input_data["db_port"] = 5439 
input_data["db_user"] = <<USER>> 
input_data["db_pass"] = <<PASSWORD>> 
con = psycopg2.connect(dbname=input_data["db_name"], host=input_data["db_host"], port=input_data["db_port"], user=input_data["db_user"], password=input_data["db_pass"]) 
con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) 
cur = con.cursor() 
query = <<ADD_YOUR_QUERY_HERE>> 
cur.execute(query) 
cur.close() 
con.close()