关于分布式爬虫问题的求解
使用分布式实现抓取网页的邮箱和url
代码实现平台:win10
服务器:Sever
参考资料:清华大牛尹成的爬虫视频(实现分布式交互保存)
客户端:Client
代码实现作用:从天涯上爬取邮箱和url,并一直爬取下去(2018/11/20)
Sever
import multiprocessing
import multiprocessing.managers
import random
import time
import queue
import os
import requests
import threading
#创建3个队列
url1_queue = queue.Queue()
url2_queue = queue.Queue()
email_queue = queue.Queue()
def return_url1():
return url1_queue
def return_url2():
return url2_queue
def return_email():
return email_queue
class QueueManager(multiprocessing.managers.BaseManager):
pass
def getmailfromclient():
global email
with open("email.txt", 'w', encoding="utf-8") as f:
while True:
mail = email.get(timeout=100)
f.write(mail+'\r\n')
f.flush() #实时生效
if __name__ == '__main__':
multiprocessing.freeze_support()
QueueManager.register("put_url",callable=return_url1)
QueueManager.register("get_email",callable=return_email)
QueueManager.register("get_url",callable=return_url2)
manager = QueueManager(address=("10.2.4.43",51971),authkey=b"123456")
manager.start()
task1 = manager.put_url()
email = manager.get_email()
task2 = manager.get_url()
#创建这个列表的目的是为了去除重复的Url
visitlist=["http://bbs.tianya.cn/m/post-140-393974-1.shtml",
"http://bbs.tianya.cn/m/post-140-393974-2.shtml",
"http://bbs.tianya.cn/m/post-140-393974-3.shtml",]
# task1.put("http://bbs.tianya.cn/m/post-140-393974-1.shtml")
for url in ["http://bbs.tianya.cn/m/post-140-393974-1.shtml",
"http://bbs.tianya.cn/m/post-140-393974-2.shtml",
"http://bbs.tianya.cn/m/post-140-393974-3.shtml",]:
task1.put(url)
print("task1-->正在放入:",url)
print("等待中".center(40,"="))
# saveemail = threading.Timer(5,getmailfromclient)
# saveemail.start()
for i in range(100000):
url = task2.get(timeout=100)
if url in visitlist:
pass
else:
task1.put(url)
print("task1--->正在放入:",url)
visitlist.append(url)
manager.shutdown()
Client
import multiprocessing
import multiprocessing.managers
import random
import time
import queue
import os
import requests
import re
class QueueManager(multiprocessing.managers.BaseManager):
pass
def get_every_url(data):
all_list = []
mylist1 = []
mylist2 = []
mylist1 = getallhttp(data)
if len(mylist1) > 0:
mylist2 = get_abs_url(mylist1[0], data)
all_list.extend(mylist1)
all_list.extend(mylist2)
return all_list
def get_abs_url(url, data):
try:
regex = re.compile("href=\"(.*?)\"", re.I)
httplist = regex.findall(data)
newhttplist = httplist.copy() # 深拷贝
for data in newhttplist:
if data.find("http://") != -1:
httplist.remove(data)
if data.find("javascript") != -1:
httplist.remove(data)
hostname = gethostname(url)
if hostname is not None:
for i in range(len(httplist)):
httplist[i] = hostname + httplist[i]
return httplist
except:
return []
def gethostname(httpstr):
try:
mailregax = re.compile(r"(http://\S.*?)/", re.I)
mylist = mailregax.findall(httpstr)
if len(mylist) == 0:
return None
else:
return mylist[0]
except:
return None
def getallhttp(data):
try:
mailregex = re.compile(r"(http://\S*?)[\"|>|)]", re.I)
mylist = mailregex.findall(data)
return mylist
except:
return []
def getallemail(data):
try:
mailregex = re.compile(r"([A-Z0-9._%+-][email protected][A-Z0-9.-]+\.[A-Z]{2,4})", re.I)
mylist = mailregex.findall(data)
return mylist
except:
return []
def get_data(url):
try:
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.26 Safari/537.36 Core/1.63.6735.400 QQBrowser/10.2.2614.400'}
data = requests.get(url, timeout=3,headers=headers)
return data.text
except:
return ""
if __name__ == '__main__':
QueueManager.register("put_url")
QueueManager.register("get_email")
QueueManager.register("get_url")
manager = QueueManager(address=("10.2.4.43",51971),authkey=b"123456")
manager.connect()
task1 = manager.put_url()
email = manager.get_email()
task2 = manager.get_url()
for i in range(1000):
time.sleep(1)
url = task1.get() #从服务器抓取一个链接
pagedata = get_data(url)
emaillist = getallemail(pagedata)
print('正在抓取:',url)
if len(emaillist) != 0 :
for mail in emaillist:
print(mail)
email.put(mail) #返回结果到邮件队列
urllist = get_every_url(pagedata) # 提取页面的链接,压入url队列
if len(urllist) != 0:
for myurl in urllist: # 取出url,压入队列
task2.put(myurl)
如上的代码的模型如下
- 先给Sever3个url,放入task1对应的队列(后面就直接称为task1队列)
- 随后Client从task1中取出url,并请求这些url,得到网页上的url和email
- 随后Client将email放入email队列,url放入task2队列
- 由于可能会有重复的url,所以在客户端创建一个列表visitlist,Sever从task2中取出一个Url,如果列表中有了这个url,就pass;如果没有,就将url压入task1队列(这样子就实现了task1队列与task2队列的交互),并将这个url加入列表中,方便之后的去重
还有一点,我们的代码只是实现了对Url的操作,因为这是单线程的代码,如果我们在代码中直接进行对email的存储,会造成阻塞,所以我们需要再次开启一个线程,来保存email
注意:
代码中去除重复的操作是放在Sever的,因为当有许多的Client执行时,提取的url都要交给Sever,所以在Sever中处理,是最有效,也是最正确的
我犯的错误:
- 创建队列在服务器端,客户端不需要创建
- Sever的启动使用manager.start(),Client()使用manager.connect()连接到Sever
上面的错误主要是我直接将Sever的代码复制到了Client,以至于没有发现这些错误浪费了许多的时间
我的困惑:
- 对于Sever和Client的for循环操作,表示不能理解,它们是一种固定格式还是有一定原理,我表示很困惑
- 对于Client:它的for循环是让一个客户端循环执行1000次,还是让最多1000个客户端执行代码
- 对于Sever:不懂
- 在一开始,我给Sever的第二个for循环设置的次数是1000,结果很快Sever就停止了,后来设置到了100000,才能持续执行下去。对此,我的理解:
- 可能是task1队列已经满了(这样子的话,1000就代表了队列的最大长度),以至于Sever强行关闭队列