Python SQLAlchemy更新Postgres记录

问题描述:

我试图使用multiprocessing模块更新数据库(异步方式)上的一行。我的代码有一个简单的函数create_member,它在表上插入一些数据,然后创建一个可能会改变这些数据的进程。问题是,传递给async_create_member会话关闭数据库连接,以及接下来的征用我得到psycopg的错误:Python SQLAlchemy更新Postgres记录

(Interface Error) connection already closed 

下面的代码:

def create_member(self, data): 
    member = self.entity(**data) 
    self.session.add(member) 
    for name in data: 
     setattr(member, name, data[name]) 
    self.session.commit() 
    self.session.close() 
    if self.index.is_indexable: 
     Process(target=self.async_create_member, 
      args=(data, self.session)).start() 
    return member 

def async_create_member(self, data, session): 
    ok, data = self.index.create(data) 
    if ok: 

     datacopy = data.copy() 
     data.clear() 
     data['document'] = datacopy['document'] 
     data['dt_idx'] = datacopy['dt_idx'] 
     stmt = update(self.entity.__table__).where(
      self.entity.__table__.c.id_doc == datacopy['id_doc'])\ 
      .values(**data) 

     session.begin() 
     session.execute(stmt) 
     session.commit() 
     session.close() 

我可能通过创建一个解决这个问题在async_create_member新引黄,但这留下的Postgres太多idle交易:

engine = create_new_engine() 
conn = engine.connect() 
conn.execute(stmt) 
conn.close() 

什么建议立即进行删除我现在做什么?有没有办法解决第一个代码?或者我应该不断创建与create_new_engine函数的新连接?我应该使用线程还是进程?

+0

我相信如果你删除'self.session.close()'在create_member功能它应该工作 – user2097159 2014-11-14 18:52:14

+0

已经尝试过。没有成功 – user1538560 2014-11-14 19:33:08

+0

嗯,这很奇怪,你应该得到这个异常的唯一原因是因为你关闭了与服务器的连接。我也认为你不需要begin()。我假设你在session.execute(stmt)行中出现这个错误? – user2097159 2014-11-14 19:38:16

您不能在线程或进程之间重用会话。 Sessions aren't thread safe,并且会话底层的连接不会在进程间干净地继承。如果没有信息,您收到的错误消息是准确的:如果尝试在跨进程边界继承数据库后使用它,则数据库连接确实是关闭的。

在大多数情况下,您应该为multiprocessing设置中的每个进程创建一个会话。

如果您的问题符合下列条件:

  • 你正在做大量的CPU密集型处理的每个对象
  • 数据库的写入操作相比相对轻巧
  • 要使用大量的(我在8台核心机器上执行此操作)

创建一个拥有会话的单个写入程序进程并将该对象传递给该程序可能是值得的处理。下面是它通常对我的作品(注:并不意味着是可运行的代码):

import multiprocessing 
from your_database_layer import create_new_session, WhateverType 

work = multiprocessing.JoinableQueue() 

def writer(commit_every = 50): 
    global work 
    session = create_new_session() 
    counter = 0 

    while True: 
     item = work.get() 
     if item is None: 
      break 

     session.add(item) 
     counter += 1 
     if counter % commit_every == 0: 
      session.commit() 

     work.task_done() 

    # Last DB writes 
    session.commit() 

    # Mark the final None in the queue as complete 
    work.task_done() 
    return 


def very_expensive_object_creation(data): 
    global work 
    very_expensive_object = WhateverType(**data) 
    # Perform lots of computation 
    work.put(very_expensive_object) 
    return 


def main(): 
    writer_process = multiprocessing.Process(target=writer) 
    writer_process.start() 

    # Create your pool that will feed the queue here, i.e. 
    workers = multiprocessing.Pool() 
    # Dispatch lots of work to very_expensive_object_creation in parallel here 
    workers.map(very_expensive_object_creation, some_iterable_source_here) 
    # --or-- in whatever other way floats your boat, such as 
    workers.apply_async(very_expensive_object_creation, args=(some_data_1,)) 
    workers.apply_async(very_expensive_object_creation, args=(some_data_2,)) 
    # etc. 

    # Signal that we won't dispatch any more work 
    workers.close() 

    # Wait for the creation work to be done 
    workers.join() 

    # Trigger the exit condition for the writer 
    work.put(None) 

    # Wait for the queue to be emptied 
    work.join() 

    return 
+0

有一个工作者公共变量和模块级别可能导致内存泄漏,并且您正在使用它来标记'task_done()'可能卡住查询。我不知道这是一个可以接受的答案 – 2017-08-16 09:19:28