那些年,我爬过的北科(三)——爬虫进阶之多进程的使用
2018-12-13 10:31:30

爬取多个页面

在爬虫基础之环境搭建与入门中,介绍了如何用Requests下载(爬取)了一个页面,并用BeautifulSoup这个HTML解析库来解析页面里面我们想要的内容。

显然,爬虫肯定不是只让我们爬取一个网页的,这样的工作,人也可以做。下面我们来看:http://nladuo.cn/scce_site/这个页面。这个页面一共有10页,点击下一页之后可以看到在网页的url中多了个字段“2.html”,也就是当前页面时第二页的意思。

也就是我们如果要爬取下所有的新闻,只要爬取形如”http://nladuo.cn/scce_site/{page}.html“的页面就好了。

这里使用一个for循环就可以完成全部页面的爬取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import requests
from bs4 import BeautifulSoup
import time


def crawl_one_page(page_num):
resp = requests.get("http://nladuo.cn/scce_site/{page}.html".
format(page=page_num))
soup = BeautifulSoup(resp.content)
items = soup.find_all("div", {"class": "every_list"})

for item in items:
title_div = item.find("div", {"class": "list_title"})
title = title_div.a.get_text()
url = title_div.a["href"]
date = item.find("div", {"class": "list_time"}).get_text()
print(date, title, url)

if __name__ == '__main__':
t0 = time.time()
for i in range(1, 11):
print("crawling page %d ......." % i)
crawl_one_page(i)
print("used:", (time.time() - t0))

CPU密集型和IO密集型业务

通过上面的代码,我们完成了一个顺序结构的爬虫。下面我们来讨论如何爬取的速度瓶颈在哪里,从而提升爬取速率。

这里介绍一下CPU密集型业务I/O密集型业务

  • CPU密集型业务(CPU-bound):也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。
  • I/O密集型业务(I/O-bound):指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。

(上述解释来自http://blog.csdn.net/youanyyou/article/details/78990156

网络爬虫主要有两个部分,一个是下载页面,一个是解析页面。显然,下载是个长时间的I/O密集操作,而解析页面则是需要调用算法来查找页面结构,是个CPU操作。

对于爬虫来说,耗时主要在下载一个网页中,根据网络的连通性,下载一个网页可能要几百毫秒甚至几秒,而解析一个页面可能只需要几十毫秒。所以爬虫其实是属于I/O密集型业务,其瓶颈主要在网络上面。

所以,提升爬虫的爬取速度,不是把CPU都跑满。而是要多开几个下载器,同时进行下载,把网络I/O跑满。

在Python中,使用多线程和多进程都可以实现并发下载。然而在python多线程无法跑多核(参见:GIL),而多进程可以。

这里,我们主要说一下python中多进程的使用。

多进程

python中调用多进程使用multiprocessing这个包就好了。下面创建了两个进程,每隔一秒打印一下进程ID。(这里的time.sleep可以理解为耗时的I/O操作。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import multiprocessing
import time
import os


def process(process_id):
while True:
time.sleep(1)
print('Task %d, pid: %d, doing something' % (process_id, os.getpid()))

if __name__ == "__main__":
# 进程1
p = multiprocessing.Process(target=process, args=(1,))
p.start()

# 进程2
p2 = multiprocessing.Process(target=process, args=(2,))
p2.start()

可以看到基本上是同时打印两句话。而在没用多进程前,我们的代码会像下面的代码的样子。

1
2
3
4
5
while True:
time.sleep(1)
print 'Task 1, doing something'
time.sleep(1)
print 'Task 2, doing something'

此时,我们创建两个进程,一个进程爬取1-5页,一个进程爬取6-10页。再来试试,看看速度有没有提升一倍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import multiprocessing
import requests
from bs4 import BeautifulSoup
import time


def crawl_one_page(page_num):
resp = requests.get("http://nladuo.cn/scce_site/{page}.html".
format(page=page_num))
soup = BeautifulSoup(resp.content)
items = soup.find_all("div", {"class": "every_list"})

for item in items:
title_div = item.find("div", {"class": "list_title"})
title = title_div.a.get_text()
url = title_div.a["href"]
date = item.find("div", {"class": "list_time"}).get_text()
print(date, title, url)


def process(start, end):
for i in range(start, end):
print("crawling page %d ......." % i)
crawl_one_page(i)


if __name__ == '__main__':
t0 = time.time()
p = multiprocessing.Process(target=process, args=(1, 6)) # 任务1, 爬取1-5页
p.start()

p2 = multiprocessing.Process(target=process, args=(6, 11)) # 任务2, 爬取6-10页
p2.start()

p.join()
p2.join()

print("used:", (time.time() - t0))

进程池

像上面的方式,我们创建了两个进程,分别处理两个任务。然而有的时候,并不是那么容易的把一个任务分成两个任务。考虑一下把一个任务想象为爬取并解析一个网页,当我们有两个或者多个进程而任务有成千上万个的时候,代码应该怎么写呢?

这时候,我们需要维护几个进程,然后给每个进程分配一个网页,如何分配,需要我们自己定义。在所有的进程都在运行时,要保证有进程结束时,再加入新的进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import multiprocessing
import requests
from bs4 import BeautifulSoup
import time


def crawl_one_page(page_num):
resp = requests.get("http://nladuo.cn/scce_site/{page}.html".
format(page=page_num))
soup = BeautifulSoup(resp.content, "html.parser")
items = soup.find_all("div", {"class": "every_list"})

for item in items:
title_div = item.find("div", {"class": "list_title"})
title = title_div.a.get_text()
url = title_div.a["href"]
date = item.find("div", {"class": "list_time"}).get_text()
print(date, title, url)


if __name__ == '__main__':
t0 = time.time()

p = None # 进程1
p2 = None # 进程2

for i in range(1, 11):
if i % 2 == 1: # 把偶数任务分配给进程1
p = multiprocessing.Process(target=crawl_one_page, args=(i,))
p.start()
else: # 把奇数任务分配给进程2
p2 = multiprocessing.Process(target=crawl_one_page, args=(i,))
p2.start()

if i % 2 == 0: # 保证只有两个进程, 等待两个进程完成
p.join()
p2.join()

print("used:", (time.time() - t0))

上面的代码实现了一个简单的两进程的任务分配和管理,但其实也存在着一些问题:比如进程2先结束,此时就只有一个进程在运行,但程序还阻塞住,无法产生新的进程。这里只是简单的做个例子,旨在说明进程管理的复杂性。

下面我们说一说进程池,其实就是为了解决这个问题而设计的。

既然叫做进程池,那就是有个池子,里面有一堆公用的进程;当有任务来了,拿一个进程出来;当任务完成了,把进程还回池子里,给别的任务用;当池子里面没有可用进程的时候,那就要等待,等别人把进程归还了再拿去用。

下面我们来看一下代码,让每个进程每秒打印一下pid,一共打印两遍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Pool
import time
import os


def do_something(num):
for i in range(2):
time.sleep(1)
print("doing %d, pid: %d" % (num, os.getpid()))

if __name__ == '__main__':
p = Pool(3)
for page in range(1, 11): # 10个任务
p.apply_async(do_something, args=(page,))

p.close() # 关闭进程池, 不再接受任务
p.join() # 等待子进程结束

运行代码后可以看到,我们可以看到这里是三个三个的打印的,我们成功完成了三并发。同时,进程池一共产生了三个进程:59650、59651、59652,说明后面的所有任务都是使用这三个进程完成的。

下面,修改爬虫代码,用进程池实现并发爬取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import requests
from bs4 import BeautifulSoup
from multiprocessing import Pool
import time


def crawl_one_page(page_num):
resp = requests.get("http://nladuo.cn/scce_site/{page}.html".
format(page=page_num))
soup = BeautifulSoup(resp.content, "html.parser")
items = soup.find_all("div", {"class": "every_list"})

for item in items:
title_div = item.find("div", {"class": "list_title"})
title = title_div.a.get_text()
url = title_div.a["href"]
date = item.find("div", {"class": "list_time"}).get_text()
print(date, title, url)

if __name__ == '__main__':
t0 = time.time()
p = Pool(5)
for page in range(1, 11): # 1-10页
p.apply_async(crawl_one_page, args=(page,))

# 关闭进程池, 等待子进程结束
p.close()
p.join()

print("used:", (time.time() - t0))

到这里,多进程的讲解就结束了。

Prev
2018-12-13 10:31:30
Next