(五)scrapy-redis分布式爬虫项目实战

scrapy-redis分布式组件

由多台机器协同完成一个任务,从而缩短任务的执行时间
优点:

  • 提升了项目的整体速度
  • 单个节点不稳定不会影响整个任务执行

Scrapy 和 scrapy-redis的区别

Scrapy 是一个通用的爬虫框架,但是不支持分布式,Scrapy-redis是为了更方便地实现Scrapy分布式爬取,而提供了一些以redis为基础的组件(仅有组件)。

pip install scrapy-redis

Scrapy-redis提供了下面四种组件(components):(四种组件意味着这四个模块都要做相应的修改)

  • Scheduler
  • Duplication Filter
  • Item Pipeline
  • Base Spider

scrapy-redis架构


如上图所⽰示,scrapy-redis在scrapy的架构上增加了redis,基于redis的特性拓展了如下组件:

Scheduler:

Scrapy改造了python本来的collection.deque(双向队列)形成了自己的Scrapy queue(https://github.com/scrapy/queuelib/blob/master/queuelib/queue.py)),但是Scrapy多个spider不能共享待爬取队列Scrapy queue, 即Scrapy本身不支持爬虫分布式,scrapy-redis 的解决是把这个Scrapy queue换成redis数据库(也是指redis队列),从同一个redis-server存放要爬取的request,便能让多个spider去同一个数据库里读取。

Scrapy中跟“待爬队列”直接相关的就是调度器Scheduler,它负责对新的request进行入列操作(加入Scrapy queue),取出下一个要爬取的request(从Scrapy queue中取出)等操作。它把待爬队列按照优先级建立了一个字典结构,比如:

 {
        优先级0 : 队列0
        优先级1 : 队列1
        优先级2 : 队列2
    }

然后根据request中的优先级,来决定该入哪个队列,出列时则按优先级较小的优先出列。为了管理这个比较高级的队列字典,Scheduler需要提供一系列的方法。但是原来的Scheduler已经无法使用,所以使用Scrapy-redis的scheduler组件。

Duplication Filter

Scrapy中用集合实现这个request去重功能,Scrapy中把已经发送的request指纹放入到一个集合中,把下一个request的指纹拿到集合中比对,如果该指纹存在于集合中,说明这个request发送过了,如果没有则继续操作。这个核心的判重功能是这样实现的:

def request_seen(self, request):
        # 把请求转化为指纹  
        fp = self.request_fingerprint(request)
        # 这就是判重的核心操作  ,self.fingerprints就是指纹集合
        if fp in self.fingerprints:
            return True  #直接返回
        self.fingerprints.add(fp) #如果不在,就添加进去指纹集合
        if self.file:
            self.file.write(fp + os.linesep)

在scrapy-redis中去重是由Duplication Filter组件来实现的,它通过redis的set 不重复的特性,巧妙的实现了Duplication Filter去重。scrapy-redis调度器从引擎接受request,将request的指纹存⼊redis的set检查是否重复,并将不重复的request push写⼊redis的 request queue。

引擎请求request(Spider发出的)时,调度器从redis的request queue队列⾥里根据优先级pop 出⼀个request 返回给引擎,引擎将此request发给spider处理。

Item Pipeline

引擎将(Spider返回的)爬取到的Item给Item Pipeline,scrapy-redis 的Item Pipeline将爬取到的 Item 存⼊redis的 items queue。

修改过Item Pipeline可以很方便的根据 key 从 items queue 提取item,从⽽实现 items processes集群。

Base Spider

不在使用scrapy原有的Spider类,重写的RedisSpider继承了Spider和RedisMixin这两个类,RedisMixin是用来从redis读取url的类。

当我们生成一个Spider继承RedisSpider时,调用setup_redis函数,这个函数会去连接redis数据库,然后会设置signals(信号):

  • 一个是当spider空闲时候的signal,会调用spider_idle函数,这个函数调用schedule_next_request函数,保证spider是一直活着的状态,并且抛出DontCloseSpider异常。

  • 一个是当抓到一个item时的signal,会调用item_scraped函数,这个函数会调用schedule_next_request函数,获取下一个request。

Scrapy-Redis分布式策略:

假设有四台电脑:Windows 10、Mac OS X、Ubuntu 16.04、CentOS 7.2,任意一台电脑都可以作为 Master端 或 Slaver端,比如:

  • Master端(核心服务器) :使用 Windows 10,搭建一个Redis数据库,不负责爬取,只负责url指纹判重、Request的分配,以及数据的存储

  • Slaver端(爬虫程序执行端) :使用 Mac OS X 、Ubuntu 16.04、CentOS 7.2,负责执行爬虫程序,运行过程中提交新的Request给Master

  • 首先Slaver端从Master端拿任务(Request、url)进行数据抓取,Slaver抓取数据的同时,产生新任务的Request便提交给 Master 处理;

  • Master端只有一个Redis数据库,负责将未处理的Request去重和任务分配,将处理后的Request加入待爬队列,并且存储爬取的数据。
    Scrapy-Redis默认使用的就是这种策略,我们实现起来很简单,因为任务调度等工作Scrapy-Redis都已经帮我们做好了,我们只需要继承RedisSpider、指定redis_key就行了。

缺点是,Scrapy-Redis调度的任务是Request对象,里面信息量比较大(不仅包含url,还有callback函数、headers等信息),可能导致的结果就是会降低爬虫速度、而且会占用Redis大量的存储空间,所以如果要保证效率,那么就需要一定硬件水平

官方案例

克隆案例到本地

 clone github scrapy-redis源码文件
git clone https://github.com/rolando/scrapy-redis.git
# 直接拿官方的项目范例,改名为自己的项目用(针对懒癌患者)
mv scrapy-redis/example-project ~/scrapyredis-project


我们clone到的 scrapy-redis 源码中有自带一个example-project项目,这个项目包含3个spider,分别是dmoz, myspider_redis,mycrawler_redis。

一、dmoz (class DmozSpider(CrawlSpider))

这个爬虫继承的是CrawlSpider,它是用来说明Redis的持续性,当我们第一次运行dmoz爬虫,然后Ctrl + C停掉之后,再运行dmoz爬虫,之前的爬取记录是保留在Redis里的。

分析起来,其实这就是一个 scrapy-redis 版 CrawlSpider 类,需要设置Rule规则,以及callback不能写parse()方法。
执行方式:

scrapy crawl dmoz

代码如下:


from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
class DmozSpider(CrawlSpider):
    """Follow categories and extract links."""
    name = 'dmoz'
    allowed_domains = ['dmoztools.net/']
    start_urls = ['http://dmoztools.net/']
    rules = [
        Rule(LinkExtractor(
            restrict_css=('.top-cat', '.sub-cat', '.cat-item')
        ), callback='parse_directory', follow=True),
    ]
    def parse_directory(self, response):
        for div in response.css('.title-and-desc'):
            yield {
                'name': div.css('.site-title::text').extract_first(),
                'description': div.css('.site-descr::text').extract_first().strip(),
                'link': div.css('a::attr(href)').extract_first(),
            }

二、myspider_redis (class MySpider(RedisSpider))

这个爬虫继承了RedisSpider, 它能够支持分布式的抓取,采用的是basic spider,需要写parse函数。

其次就是不再有start_urls了,取而代之的是redis_key,scrapy-redis将key从Redis里pop出来,成为请求的url地址。

from scrapy_redis.spiders import RedisSpider
class MySpider(RedisSpider):
    """Spider that reads urls from redis queue (myspider:start_urls)."""
    name = 'myspider_redis'
    # 注意redis-key的格式:
    redis_key = 'myspider:start_urls'
    # 可选:等效于allowd_domains(),__init__方法按规定格式写,使用时只需要修改super()里的类名参数即可
    def __init__(self, *args, **kwargs):
        # Dynamically define the allowed domains list.
        domain = kwargs.pop('domain', '')
        self.allowed_domains = filter(None, domain.split(','))
        # 修改这里的类名为当前类名
        super(MySpider, self).__init__(*args, **kwargs)
    def parse(self, response):
        return {
            'name': response.css('title::text').extract_first(),
            'url': response.url,
        }

注意:
RedisSpider类 不需要写allowd_domains和start_urls:

  • scrapy-redis将从在构造方法init()里动态定义爬虫爬取域范围,也可以选择直接写allowd_domains。

  • 必须指定redis_key,即启动爬虫的命令,参考格式:redis_key = ‘myspider:start_urls’

  • 根据指定的格式,start_urls将在 Master端的 redis-cli 里 lpush 到 Redis数据库里,RedisSpider 将在数据库里获取start_urls。

执行方式:

  1. 通过runspider方法执行爬虫的py文件(也可以分次执行多条),爬虫(们)将处于等待准备状态:
scrapy runspider myspider_redis.py
  1. 在Master端的redis-cli输入push指令,参考格式:
$redis > lpush myspider:start_urls http://dmoztools.net/
  1. Slaver端爬虫获取到请求,开始爬取。

三、mycrawler_redis (class MyCrawler(RedisCrawlSpider))

这个RedisCrawlSpider类爬虫继承了RedisCrawlSpider,能够支持分布式的抓取。因为采用的是crawlSpider,所以需要遵守Rule规则,以及callback不能写parse()方法。

同样也不再有start_urls了,取而代之的是redis_key,scrapy-redis将key从Redis里pop出来,成为请求的url地址

from scrapy.spiders import Rule
from scrapy.linkextractors import LinkExtractor
from scrapy_redis.spiders import RedisCrawlSpider
class MyCrawler(RedisCrawlSpider):
    """Spider that reads urls from redis queue (myspider:start_urls)."""
    name = 'mycrawler_redis'
    redis_key = 'mycrawler:start_urls'
    rules = (
        # follow all links
        Rule(LinkExtractor(), callback='parse_page', follow=True),
    )
    # __init__方法必须按规定写,使用时只需要修改super()里的类名参数即可
    def __init__(self, *args, **kwargs):
        # Dynamically define the allowed domains list.
        domain = kwargs.pop('domain', '')
        self.allowed_domains = filter(None, domain.split(','))
        # 修改这里的类名为当前类名
        super(MyCrawler, self).__init__(*args, **kwargs)
    def parse_page(self, response):
        return {
            'name': response.css('title::text').extract_first(),
            'url': response.url,
        }

注意:
同样的,RedisCrawlSpider类不需要写allowd_domains和start_urls:

  • scrapy-redis将从在构造方法init()里动态定义爬虫爬取域范围,也可以选择直接写allowd_domains。

  • 必须指定redis_key,即启动爬虫的命令,参考格式:redis_key = ‘myspider:start_urls’

  • 根据指定的格式,start_urls将在 Master端的 redis-cli 里 lpush 到 Redis数据库里,RedisSpider 将在数据库里获取start_urls。
    执行方式:

  1. 通过runspider方法执行爬虫的py文件(也可以分次执行多条),爬虫(们)将处于等待准备状态:
scrapy runspider mycrawler_redis.py
  1. 在Master端的redis-cli输入push指令,参考格式
$redis > lpush mycrawler:start_urls http://www.dmoz.org/
  1. 爬虫获取url,开始执行。

总结:

  • 如果只是用到Redis的去重和保存功能,就选第一种;

  • 如果要写分布式,则根据情况,选择第二种、第三种;

  • 通常情况下,会选择用第三种方式编写深度聚焦爬虫。

项目实战:京东图书爬虫

需求:抓取京东图书信息 目标字段: 书名,大分类,大分类页面url,小分类,小分类页面url,封面图片链接,详情页面url,作者,出版社,出版时间,价格 url: https://book.jd.com/booksort.html

分布式爬虫构建的思路:

  • 先完成普通爬虫
  • 再修改为分布式爬虫

京东图书普通爬虫

新建项目

scrapy startproject JD

然后执行

scrapy genspider book jd.com

生成如下目录结构


修改JD/items.py文件

# -*- coding: utf-8 -*-
# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html
import scrapy
class JdItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    # 书名,大分类,大分类页面url,小分类,小分类页面url,封面图片链接,详情页面url,作者,出版社,出版时间,价格
    name = scrapy.Field()
    big_category = scrapy.Field()
    big_category_url = scrapy.Field()
    small_category = scrapy.Field()
    small_category_url = scrapy.Field()
    cover_url = scrapy.Field()
    detail_url = scrapy.Field()
    author = scrapy.Field()
    publisher = scrapy.Field()
    pub_date = scrapy.Field()
    price = scrapy.Field()

编写 JD/spiders/book.py文件

# -*- coding: utf-8 -*-
import scrapy
import json
class BookSpider(scrapy.Spider):
    name = 'book'
    # 'p.3.cn' 为解析图书列表允许的列名
    allowed_domains = ['jd.com', 'p.3.cn']
    start_urls = ['https://book.jd.com/booksort.html']
    def parse(self, response):
        big_list = response.xpath('//*[@id="booksort"]/div[2]/dl/dt/a')
        print('大节点',len(big_list))
        for big in big_list:
            # 获取到大分类的节点链接、节点名称
            big_list_url = 'https:' + big.xpath('./@href').extract_first()
            big_category = big.xpath('./text()').extract_first()
            # 小分类的节点列表
            small_list = big.xpath('../following-sibling::dd[1]/em/a')
            # 遍历小分类的节点列表,获取到小分类名称、url
            for small in small_list:
                temp = {}
                temp['big_list_url'] = big_list_url
                temp['big_category'] = big_category
                temp['small_category'] = small.xpath('./text()').extract_first()
                temp['small_category_url'] = 'https:' + small.xpath('./@href').extract_first()
                # print(temp)
            # 构造请求,返回小分类的url
            yield scrapy.Request(
                temp['small_category_url'],
                callback=self.parse_book_list,
                meta={'meta1': temp}
             )
    # 解析图片列表信息
    def parse_book_list(self,response):
        # 接受parse方法返回的meta数据
        temp = response.meta['meta1']
        # 获取图片列表节点
        book_list = response.xpath('//*[@id="plist"]/ul/li/div')
        # 遍历图书列表
        for book in book_list:
            # 实例化item
            item = JdItem()
            # 书名信息、分类信息
            item['name'] = book.xpath('./div[3]/a/em/text()').extract_first().strip()
            item['big_category'] = temp['big_category']
            item['big_category_url'] = temp['big_list_url']
            item['small_category'] = temp['small_category']
            item['small_category_url'] = temp['small_category_url']
            # /div[1]/a/img/@src
            try:
                item['cover_url'] = 'https:' + book.xpath('./div[1]/a/img/@src').extract_first()
            except:
                item['cover_url'] = None
            try:
                item['detail_url'] = 'https:' + book.xpath('./div[3]/a/@href').extract_first()
            except:
                item['detail_url'] = None
            item['author'] = book.xpath('./div[@class="p-bookdetails"]/span[@class="p-bi-name"]/span[@class="author_type_1"]/a/text()').extract_first()
            item['publisher'] = book.xpath('./div[@class="p-bookdetails"]/span[2]/a/text()').extract_first()
            item['pub_date'] = book.xpath('./div[@class="p-bookdetails"]/span[3]/text()').extract_first().strip()
            # 获取价格的url
            # https://p.3.cn/prices/mgets?skuIds=J_11757834%2CJ_10367073%2CJ_11711801%2CJ_12090377%2CJ_10199768%2CJ_11711801%2CJ_12018031%2CJ_10019917%2CJ_11711801%2CJ_10162899%2CJ_11081695%2CJ_12114139%2CJ_12010088%2CJ_12161302%2CJ_11779454%2CJ_11939717%2CJ_12026957%2CJ_12184621%2CJ_12115244%2CJ_11930113%2CJ_10937943%2CJ_12192773%2CJ_12073030%2CJ_12098764%2CJ_11138599%2CJ_11165561%2CJ_11920855%2CJ_11682924%2CJ_11682923%2CJ_11892139&pduid=1523432585886562677791
            skuid = book.xpath('./@data-sku').extract_first()
            # print(skuid)
            pduid = '&pduid=1523432585886562677791'
            print(item)
            # 再次发送请求,获取价格信息
            if skuid is not None:
                url = 'https://p.3.cn/prices/mgets?skuIds=J_' + skuid + pduid
                yield scrapy.Request(
                    url,
                    callback=self.parse_price,
                    meta={'meta2':item}
                )
    # 解析价格
    def parse_price(self,response):
        item = response.meta['meta2']
        data = json.loads(response.body)
        print(data)
        item['price'] = data[0]['op']
        # print (item)
        yield item

执行命令

scrapy crawl book --nolog

可以看到我们成功的爬取了如下的信息

修改成分布式

修改book.py文件

# -*- coding: utf-8 -*-
import scrapy
# 导入item
from JD.items import JdItem
import json
# 改成分布式
# 1------导入类
from scrapy_redis.spiders import RedisSpider
# 2------修改继承
class BookSpider(RedisSpider):
# class BookSpider(scrapy.Spider):
    name = 'book'
    # 'p.3.cn' 为解析图书列表允许的列名
    allowed_domains = ['jd.com', 'p.3.cn']
    # start_urls = ['https://book.jd.com/booksort.html']
    # 3------定义redis_key
    redis_key = 'books'
    def parse(self, response):
        big_list = response.xpath('//*[@id="booksort"]/div[2]/dl/dt/a')
        print('大节点',len(big_list))
        for big in big_list:
            # 获取到大分类的节点链接、节点名称
            big_list_url = 'https:' + big.xpath('./@href').extract_first()
            big_category = big.xpath('./text()').extract_first()
            # 小分类的节点列表
            small_list = big.xpath('../following-sibling::dd[1]/em/a')
            # 遍历小分类的节点列表,获取到小分类名称、url
            for small in small_list:
                temp = {}
                temp['big_list_url'] = big_list_url
                temp['big_category'] = big_category
                temp['small_category'] = small.xpath('./text()').extract_first()
                temp['small_category_url'] = 'https:' + small.xpath('./@href').extract_first()
                # print(temp)
            # 构造请求,返回小分类的url
            yield scrapy.Request(
                temp['small_category_url'],
                callback=self.parse_book_list,
                meta={'meta1': temp}
             )
    # 解析图片列表信息
    def parse_book_list(self,response):
        # 接受parse方法返回的meta数据
        temp = response.meta['meta1']
        # 获取图片列表节点
        book_list = response.xpath('//*[@id="plist"]/ul/li/div')
        # 遍历图书列表
        for book in book_list:
            # 实例化item
            item = JdItem()
            # 书名信息、分类信息
            item['name'] = book.xpath('./div[3]/a/em/text()').extract_first().strip()
            item['big_category'] = temp['big_category']
            item['big_category_url'] = temp['big_list_url']
            item['small_category'] = temp['small_category']
            item['small_category_url'] = temp['small_category_url']
            # /div[1]/a/img/@src
            try:
                item['cover_url'] = 'https:' + book.xpath('./div[1]/a/img/@src').extract_first()
            except:
                item['cover_url'] = None
            try:
                item['detail_url'] = 'https:' + book.xpath('./div[3]/a/@href').extract_first()
            except:
                item['detail_url'] = None
            item['author'] = book.xpath('./div[@class="p-bookdetails"]/span[@class="p-bi-name"]/span[@class="author_type_1"]/a/text()').extract_first()
            item['publisher'] = book.xpath('./div[@class="p-bookdetails"]/span[2]/a/text()').extract_first()
            item['pub_date'] = book.xpath('./div[@class="p-bookdetails"]/span[3]/text()').extract_first().strip()
            # 获取价格的url
            # https://p.3.cn/prices/mgets?skuIds=J_11757834%2CJ_10367073%2CJ_11711801%2CJ_12090377%2CJ_10199768%2CJ_11711801%2CJ_12018031%2CJ_10019917%2CJ_11711801%2CJ_10162899%2CJ_11081695%2CJ_12114139%2CJ_12010088%2CJ_12161302%2CJ_11779454%2CJ_11939717%2CJ_12026957%2CJ_12184621%2CJ_12115244%2CJ_11930113%2CJ_10937943%2CJ_12192773%2CJ_12073030%2CJ_12098764%2CJ_11138599%2CJ_11165561%2CJ_11920855%2CJ_11682924%2CJ_11682923%2CJ_11892139&pduid=1523432585886562677791
            skuid = book.xpath('./@data-sku').extract_first()
            # print(skuid)
            pduid = '&pduid=1523432585886562677791'
            print(item)
            # 再次发送请求,获取价格信息
            if skuid is not None:
                url = 'https://p.3.cn/prices/mgets?skuIds=J_' + skuid + pduid
                yield scrapy.Request(
                    url,
                    callback=self.parse_price,
                    meta={'meta2':item}
                )
    # 解析价格
    def parse_price(self,response):
        item = response.meta['meta2']
        data = json.loads(response.body)
        print(data)
        item['price'] = data[0]['op']
        # print (item)
        yield item

注释原来的JD/settings.py文件,更改如下

# -*- coding: utf-8 -*-
import scrapy
# 导入item
from JD.items import JdItem
import json
# 改成分布式
# 1------导入类
from scrapy_redis.spiders import RedisSpider
# 2------修改继承
class BookSpider(RedisSpider):
# class BookSpider(scrapy.Spider):
    name = 'book'
    # 'p.3.cn' 为解析图书列表允许的列名
    allowed_domains = ['jd.com', 'p.3.cn']
    # start_urls = ['https://book.jd.com/booksort.html']
    # 3------定义redis_key
    redis_key = 'books'
    def parse(self, response):
        big_list = response.xpath('//*[@id="booksort"]/div[2]/dl/dt/a')
        print('大节点',len(big_list))
        for big in big_list:
            # 获取到大分类的节点链接、节点名称
            big_list_url = 'https:' + big.xpath('./@href').extract_first()
            big_category = big.xpath('./text()').extract_first()
            # 小分类的节点列表
            small_list = big.xpath('../following-sibling::dd[1]/em/a')
            # 遍历小分类的节点列表,获取到小分类名称、url
            for small in small_list:
                temp = {}
                temp['big_list_url'] = big_list_url
                temp['big_category'] = big_category
                temp['small_category'] = small.xpath('./text()').extract_first()
                temp['small_category_url'] = 'https:' + small.xpath('./@href').extract_first()
                # print(temp)
            # 构造请求,返回小分类的url
            yield scrapy.Request(
                temp['small_category_url'],
                callback=self.parse_book_list,
                meta={'meta1': temp}
             )
    # 解析图片列表信息
    def parse_book_list(self,response):
        # 接受parse方法返回的meta数据
        temp = response.meta['meta1']
        # 获取图片列表节点
        book_list = response.xpath('//*[@id="plist"]/ul/li/div')
        # 遍历图书列表
        for book in book_list:
            # 实例化item
            item = JdItem()
            # 书名信息、分类信息
            item['name'] = book.xpath('./div[3]/a/em/text()').extract_first().strip()
            item['big_category'] = temp['big_category']
            item['big_category_url'] = temp['big_list_url']
            item['small_category'] = temp['small_category']
            item['small_category_url'] = temp['small_category_url']
            # /div[1]/a/img/@src
            try:
                item['cover_url'] = 'https:' + book.xpath('./div[1]/a/img/@src').extract_first()
            except:
                item['cover_url'] = None
            try:
                item['detail_url'] = 'https:' + book.xpath('./div[3]/a/@href').extract_first()
            except:
                item['detail_url'] = None
            item['author'] = book.xpath('./div[@class="p-bookdetails"]/span[@class="p-bi-name"]/span[@class="author_type_1"]/a/text()').extract_first()
            item['publisher'] = book.xpath('./div[@class="p-bookdetails"]/span[2]/a/text()').extract_first()
            item['pub_date'] = book.xpath('./div[@class="p-bookdetails"]/span[3]/text()').extract_first().strip()
            # 获取价格的url
            # https://p.3.cn/prices/mgets?skuIds=J_11757834%2CJ_10367073%2CJ_11711801%2CJ_12090377%2CJ_10199768%2CJ_11711801%2CJ_12018031%2CJ_10019917%2CJ_11711801%2CJ_10162899%2CJ_11081695%2CJ_12114139%2CJ_12010088%2CJ_12161302%2CJ_11779454%2CJ_11939717%2CJ_12026957%2CJ_12184621%2CJ_12115244%2CJ_11930113%2CJ_10937943%2CJ_12192773%2CJ_12073030%2CJ_12098764%2CJ_11138599%2CJ_11165561%2CJ_11920855%2CJ_11682924%2CJ_11682923%2CJ_11892139&pduid=1523432585886562677791
            skuid = book.xpath('./@data-sku').extract_first()
            # print(skuid)
            pduid = '&pduid=1523432585886562677791'
            print(item)
            # 再次发送请求,获取价格信息
            if skuid is not None:
                url = 'https://p.3.cn/prices/mgets?skuIds=J_' + skuid + pduid
                yield scrapy.Request(
                    url,
                    callback=self.parse_price,
                    meta={'meta2':item}
                )
    # 解析价格
    def parse_price(self,response):
        item = response.meta['meta2']
        data = json.loads(response.body)
        print(data)
        item['price'] = data[0]['op']
        # print (item)
        yield item

完成以上代买后我们开启两个终端:
执行命令:

 scrapy runspider book.py

在redis安装目录启动:

redis-cli.exe

我们可以看到分布式爬虫运行起来了


使用 Redis Desktop Manager查看数据

数据持久化 保存至MongoDB中

  • 什么是数据持久化 : 所谓数据持久化就是将redis中存储的item数据存储到其他数据库或介质中
  • 为什么要做数据持久化处理 1)redis是内存型数据库,容量有限 2)内存在断电时会丢失所有数据,不安全 3)数据的使用一般不使用redis

如何将数据持久化

  • 将redis数据库中的数据读出,存放到其他类型的数据库中
  • Python redis库 1.链接: redis.Redis(host,port,db) 2.读取: 以先进先出的形式读取数据 source,data = redis.blpop(keylist) 以先进后出的形式读取数据 source,data = redis.brpop(keylist)

将爬取的京东图书从redis中取出然后保存至MongoDB中

开启mongodb数据库


新建redis_mongo.py文件,执行如下代码

# 数据的持久化操作redis---->MongoDB
import redis
from pymongo import MongoClient
import json
# 实例化redis客户端
redis_client = redis.Redis(host='127.0.0.1',port=6379)
# 实例化MongoDB客户端
mongo_client = MongoClient(host='127.0.0.1',port=27017)
# 指定链接的MongDB数据库、集合
db = mongo_client['CRAWL']
col = db['crawl']
# 使用循环把redis中数据全部写入到MongoDB中
while True:
    # 从redis中取出数据
    key,data = redis_client.blpop(['book:items'])
    print(key)
    print(data)
    # 把数据写入到MongoDB中
    col.insert(json.loads(data.decode()))
# 关闭数据库
mongo_client.close()


使用robo mongo查看数据是否写入数据库中


https://www.jianshu.com/p/80a68cdc20d2

「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!
0 条回复 A 作者 M 管理员
    所有的伟大,都源于一个勇敢的开始!
欢迎您,新朋友,感谢参与互动!欢迎您 {{author}},您在本站有{{commentsCount}}条评论