多线程S3下载不会终止
问题描述:
我正在使用python boto和线程从S3快速下载许多文件。我在我的程序中使用了这几次,它很好用。但是,有一次它不起作用。在这一步中,我尝试在32核心机器上下载3,000个文件(Amazon EC2 cc2.8xlarge)。多线程S3下载不会终止
下面的代码实际上可以成功地下载每个文件(除了有时会出现一个httplib.IncompleteRead错误,它不会被重试所修复)。但是,32个线程中只有10个实际终止,程序只是挂起。不知道这是为什么。所有的文件已经下载完毕,所有的线程都退出了。当我下载较少的文件时,他们会采取其他步骤。我已经减少到使用单个线程下载所有这些文件(它工作但超级慢)。任何见解将不胜感激!
from boto.ec2.connection import EC2Connection
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.exception import BotoClientError
from socket import error as socket_error
from httplib import IncompleteRead
import multiprocessing
from time import sleep
import os
import Queue
import threading
def download_to_dir(keys, dir):
"""
Given a list of S3 keys and a local directory filepath,
downloads the files corresponding to the keys to the local directory.
Returns a list of filenames.
"""
filenames = [None for k in keys]
class DownloadThread(threading.Thread):
def __init__(self, queue, dir):
# call to the parent constructor
threading.Thread.__init__(self)
# create a connection to S3
connection = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
self.conn = connection
self.dir = dir
self.__queue = queue
def run(self):
while True:
key_dict = self.__queue.get()
print self, key_dict
if key_dict is None:
print "DOWNLOAD THREAD FINISHED"
break
elif key_dict == 'DONE': #last job for last worker
print "DOWNLOADING DONE"
break
else: #still work to do!
index = key_dict.get('idx')
key = key_dict.get('key')
bucket_name = key.bucket.name
bucket = self.conn.get_bucket(bucket_name)
k = Key(bucket) #clone key to use new connection
k.key = key.key
filename = os.path.join(dir, k.key)
#make dirs if don't exist yet
try:
f_dirname = os.path.dirname(filename)
if not os.path.exists(f_dirname):
os.makedirs(f_dirname)
except OSError: #already written to
pass
#inspired by: http://code.google.com/p/s3funnel/source/browse/trunk/scripts/s3funnel?r=10
RETRIES = 5 #attempt at most 5 times
wait = 1
for i in xrange(RETRIES):
try:
k.get_contents_to_filename(filename)
break
except (IncompleteRead, socket_error, BotoClientError), e:
if i == RETRIES-1: #failed final attempt
raise Exception('FAILED TO DOWNLOAD %s, %s' % (k, e))
break
wait *= 2
sleep(wait)
#put filename in right spot!
filenames[index] = filename
num_cores = multiprocessing.cpu_count()
q = Queue.Queue(0)
for i, k in enumerate(keys):
q.put({'idx': i, 'key':k})
for i in range(num_cores-1):
q.put(None) # add end-of-queue markers
q.put('DONE') #to signal absolute end of job
#Spin up all the workers
workers = [DownloadThread(q, dir) for i in range(num_cores)]
for worker in workers:
worker.start()
#Block main thread until completion
for worker in workers:
worker.join()
return filenames
答
升级到AWS SDK版本1.4.4.0或更高版本,或者坚持到2个线程。较旧的版本具有最多2个同时连接的limit。这意味着如果你启动2个线程,你的代码就能正常工作;如果您启动3个或更多,您肯定会看到不完整的读取和用尽超时。
你会看到,虽然2线程可以大大提高你的吞吐量,但2以上并没有太大的改变,因为你的网卡总是无时无刻不忙。
答
S3Connection使用httplib.py,并且该库不是线程安全的,因此确保每个线程都拥有自己的连接至关重要。看起来你正在这样做。
Boto已经拥有了自己的重试机制,但是您在其上层叠了一个来处理某些其他错误。我想知道是否建议在except块中创建一个新的S3Connection对象。它似乎就像底层的http连接在那个时候处于一种不寻常的状态,最好从一个新的连接开始。
只是一个想法。
谢谢@Jirka Hanika - 改为两个线程似乎解决了这个问题。虽然我认为亚马逊机器如此狂野,实际上拥有大量的下载线程确实使效率更高。我试图找到AWS SDK> 1.4.4,但亚马逊上的最新下载是1.3.13 ... – Max 2012-07-12 13:59:03
@Max - 对于大多数语言,1.4会在[这里]找到(https://github.com/amazonwebservices/ ),并希望Python会出现在[有一天](http://aws.typepad.com/aws/2012/01/big-news-regarding-python-boto-andawaws.html)。在此之前,你可能会走运,我不确定。 – 2012-07-12 15:02:38