读书笔记 - python爬虫开发与项目实战 - 1.4 进程和线程

1.4 进程和线程

1.4.1 多进程

2. 使用multiprocessing模块创建多进程

import os
from multiprocessing import Process
# Code executed by each child process
def run_proc(name):
    """Print a one-line status message identifying this child process.

    Args:
        name: Label for the child, interpolated into the message.
    """
    pid = os.getpid()
    message = "Children process {} ({}) Running".format(name, pid)
    print(message)

if __name__ == "__main__":
    # Demo driver: start five child processes and wait for all of them.
    print("Parent process {}.".format(os.getpid()))

    # Keep a handle on every child; joining only the last one created would
    # let "Process end." print while earlier children are still running.
    processes = []
    for i in range(5):
        p = Process(target=run_proc, kwargs={"name": str(i)})
        print("Process will start.")
        p.start()
        processes.append(p)

    for p in processes:
        p.join()
    print("Process end.")
Parent process 8072.
Process will start.
Process will start.
Process will start.
Process will start.
Process will start.
Process end.
%xmode Verbose
%pdb on
%debug
ERROR:root:No traceback has been produced, nothing to debug.


Automatic pdb calling has been turned ON

3. multiprocessing模块提供了一个Pool类来代表进程池对象

import os
import time
import random
from multiprocessing import Pool

def run_task(name, max_delay=3):
    """Simulate one unit of work: announce start, sleep briefly, announce end.

    Args:
        name: Task label shown in both the start and the end message.
        max_delay: Upper bound, in seconds, of the random sleep that stands
            in for real work. Defaults to 3.
    """
    print("Task {} (pid = {}) is running...".format(name, os.getpid()))
    time.sleep(random.random() * max_delay)
    # Report the task label here (not the pid) so the end line can be matched
    # with the corresponding start line.
    print("Task {} end.".format(name))
    
if __name__ == "__main__":
    # Demo driver: run five tasks on a pool of three worker processes.
    # NOTE(review): the multiprocessing docs state that Pool examples do not
    # work from an interactive interpreter, because workers must be able to
    # import the task function; run this as a script if it appears to hang.
    print("Current process {}.".format(os.getpid()))
    p = Pool(processes=3)
    # Keep the AsyncResult handles so that failures inside workers can be
    # surfaced below instead of being silently dropped.
    results = [p.apply_async(func=run_task, kwds={"name": i}) for i in range(5)]
    print("Waiting for all subprocesses done...")
    p.close()  # no further tasks may be submitted after this point
    p.join()
    for result in results:
        # get() re-raises any exception that the task raised in its worker.
        result.get()
    print("All subprocesses done.")
Current process 4040.
Waiting for all subprocesses done...

4. 进程间通信

  • Queue
import os
import time
import random

from multiprocessing import Process
from multiprocessing import Queue

# Code executed by a writer process
def proc_write(q, urls):
    """Put every URL onto the queue, pausing a random moment after each one.

    Args:
        q: Queue-like object providing ``put``.
        urls: Iterable of items to enqueue, in order.
    """
    banner = "Process ({}) is writing ...".format(os.getpid())
    print(banner)
    for item in urls:
        q.put(item)
        print("Put {} to queue ...".format(item))
        pause = random.random()  # sub-second delay between writes
        time.sleep(pause)

# Code executed by the reader process
def proc_read(q):
    """Print every item taken from the queue until a ``None`` sentinel arrives.

    Args:
        q: Queue-like object providing a blocking ``get``.

    The function returns when it receives ``None``; the sentinel itself is
    not printed.
    """
    print("Process ({}) is reading ...".format(os.getpid()))
    while True:
        url = q.get(True)
        if url is None:
            # Sentinel from the parent: nothing more will be written.
            break
        print("Get {} from queue.".format(url))

if __name__ == "__main__":

    # The parent creates the Queue and hands it to every child process
    q = Queue()
    proc_writer1 = Process(target=proc_write, kwargs={"q": q, "urls": ["url1", "url2", "url3"]})
    proc_writer2 = Process(target=proc_write, kwargs={"q": q, "urls": ["url4", "url5", "url6"]})
    proc_reader = Process(target=proc_read, kwargs={"q": q})

    # Start the writer processes
    proc_writer1.start()
    proc_writer2.start()
    # Start the reader process
    proc_reader.start()
    # Wait for both writers to finish
    proc_writer1.join()
    proc_writer2.join()
    # Ask the reader to stop only after both writers are done, then wait for
    # it to exit. Terminating it instead could lose URLs that are still
    # queued, and the multiprocessing docs warn that terminating a process
    # that is using a queue may corrupt that queue.
    q.put(None)
    proc_reader.join()

  • Pipe
import multiprocessing
import os
import random
import time

def proc_send(pipe, urls):
    """Send each URL through the connection, logging it first.

    Args:
        pipe: Connection object providing ``send``.
        urls: Iterable of items to send, in order.
    """
    template = "Process ({}) send: {}."
    for item in urls:
        line = template.format(os.getpid(), item)
        print(line)
        pipe.send(item)
        pause = random.random()  # sub-second delay between sends
        time.sleep(pause)
        
def proc_recv(pipe):
    """Print every item received on the connection until ``None`` arrives.

    Args:
        pipe: Connection object providing a blocking ``recv``.

    The function returns when it receives ``None``; the sentinel itself is
    not printed.
    """
    while True:
        url = pipe.recv()
        if url is None:
            # Sentinel from the parent: the sender has finished.
            break
        print("Process ({}) rev: {}".format(os.getpid(), url))
        time.sleep(random.random())

if __name__ == "__main__":
    # Demo driver: one process sends ten URLs through a pipe, another
    # receives and prints them.
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc_send,
                                 kwargs={"pipe": pipe[0],
                                         "urls": ["url_" + str(i) for i in range(10)]})
    p2 = multiprocessing.Process(target=proc_recv,
                                 kwargs={"pipe": pipe[1]})
    p1.start()
    p2.start()
    p1.join()
    # The sender is done; tell the receiver to stop. Without this the
    # receiver blocks in recv() forever and the join below never returns.
    pipe[0].send(None)
    p2.join()
    

1.4.2 多线程

1. 用threading模块创建多线程

import random
import time
import threading

# Code executed by each new thread
def thread_run(urls):
    """Print each URL tagged with the current thread's name, with random pauses.

    Args:
        urls: Iterable of items to print, in order.
    """
    worker = threading.current_thread()
    print("Current {} is running ...".format(worker.name))
    for item in urls:
        line = "{} ---->>> {}".format(worker.name, item)
        print(line)
        pause = random.random()  # sub-second delay between items
        time.sleep(pause)
    print("{} ended.".format(worker.name))
    
if __name__ == "__main__":
    # Demo driver: two threads each walk their own batch of three URLs.
    print("Current {} is running".format(threading.current_thread().name))
    batches = (
        ["url_" + str(i) for i in range(1, 4)],
        ["url_" + str(i) for i in range(4, 7)],
    )
    t1, t2 = (
        threading.Thread(target=thread_run, kwargs={"urls": batch})
        for batch in batches
    )
    for worker in (t1, t2):
        worker.start()
    # Wait for both threads before announcing that the main thread is done.
    for worker in (t1, t2):
        worker.join()
    print("{} ended.".format(threading.current_thread().name))
Current MainThread is running
Current Thread-8 is running ...
Thread-8 ---->>> url_1
Current Thread-9 is running ...
Thread-9 ---->>> url_4
Thread-9 ---->>> url_5
Thread-9 ---->>> url_6
Thread-8 ---->>> url_2
Thread-9 ended.
Thread-8 ---->>> url_3
Thread-8 ended.
MainThread ended.

2. 线程同步

import threading
# Lock guarding every access to the shared counter below
mylock = threading.RLock()
# Counter shared by all MyThread instances
num = 0

class MyThread(threading.Thread):
    """Thread that increments the shared counter ``num`` until it reaches 4."""

    def __init__(self, name):
        threading.Thread.__init__(self, name=name)

    def run(self):
        """Increment ``num`` under ``mylock``; exit once it has reached 4."""
        global num
        while True:
            # Decide and update while holding the lock. Checking the limit
            # after releasing it would let another thread increment in
            # between, pushing the counter past 4.
            with mylock:
                print("{} locked, Number: {}".format(threading.current_thread().name, num))
                finished = num >= 4
                if not finished:
                    num += 1
                # Snapshot taken under the lock so the message printed after
                # release reflects this thread's own update.
                current = num
            print("{} released, Number: {}".format(threading.current_thread().name, current))
            if finished:
                break

if __name__ == "__main__":
    # Demo driver: create both counter threads first, then start them in order.
    thread1, thread2 = (MyThread(label) for label in ("Thread_1", "Thread_2"))
    for worker in (thread1, thread2):
        worker.start()
Thread_1 locked, Number: 0
Thread_1 released, Number: 1
Thread_1 locked, Number: 1
Thread_1 released, Number: 2
Thread_2 locked, Number: 2
Thread_2 released, Number: 3
Thread_1 locked, Number: 3
Thread_1 released, Number: 4
Thread_2 locked, Number: 4
Thread_2 released, Number: 5

3. 全局解释器锁(GIL)

读书笔记 - python爬虫开发与项目实战 - 1.4 进程和线程

读书笔记 - python爬虫开发与项目实战 - 1.4 进程和线程

1.4.3 协程

读书笔记 - python爬虫开发与项目实战 - 1.4 进程和线程