2012-01-18 10 views
7

Definiuję eksportera elementów, który przesuwa elementy do kolejki komunikatów. Poniżej znajduje się kod.Niestandardowy eksporter Scrapy

from scrapy.contrib.exporter import JsonLinesItemExporter 
from scrapy.utils.serialize import ScrapyJSONEncoder 
from scrapy import log 

from scrapy.conf import settings 

from carrot.connection import BrokerConnection, Exchange 
from carrot.messaging import Publisher 

log.start() 


class QueueItemExporter(JsonLinesItemExporter): 

    def __init__(self, **kwargs): 

     log.msg("Initialising queue exporter", level=log.DEBUG) 

     self._configure(kwargs) 

     host_name = settings.get('BROKER_HOST', 'localhost') 
     port = settings.get('BROKER_PORT', 5672) 
     userid = settings.get('BROKER_USERID', "guest") 
     password = settings.get('BROKER_PASSWORD', "guest") 
     virtual_host = settings.get('BROKER_VIRTUAL_HOST', "/") 

     self.encoder = settings.get('MESSAGE_Q_SERIALIZER', ScrapyJSONEncoder)(**kwargs) 

     log.msg("Connecting to broker", level=log.DEBUG) 
     self.q_connection = BrokerConnection(hostname=host_name, port=port, 
         userid=userid, password=password, 
         virtual_host=virtual_host) 
     self.exchange = Exchange("scrapers", type="topic") 
     log.msg("Connected", level=log.DEBUG) 

    def start_exporting(self): 
     spider_name = "test" 
     log.msg("Initialising publisher", level=log.DEBUG) 
     self.publisher = Publisher(connection=self.q_connection, 
         exchange=self.exchange, routing_key="scrapy.spider.%s" % spider_name) 
     log.msg("done", level=log.DEBUG) 

    def finish_exporting(self): 
     self.publisher.close() 

    def export_item(self, item): 
     log.msg("In export item", level=log.DEBUG) 
     itemdict = dict(self._get_serialized_fields(item)) 
     self.publisher.send({"scraped_data": self.encoder.encode(itemdict)}) 
     log.msg("sent to queue - scrapy.spider.naukri", level=log.DEBUG) 

Mam kilka problemów. Elementy nie są przesyłane do kolejki. Ive dodaje następujące do moich ustawień:

FEED_EXPORTERS = { 
    "queue": 'scrapers.exporters.QueueItemExporter' 
} 

FEED_FORMAT = "queue" 

LOG_STDOUT = True 

kodu nie wzbudza żadnych błędów, a nie widzę żadnej z logu. Im na mój rozum koniec, jak to debugować.

Każda pomoc będzie mile widziana.

+0

Myślę, że napisanie rurociągu przedmiotu byłoby prostsze do tego celu i wiązałoby się z mniejszym zakresem kodu, który jest potencjalnym źródłem błędów. Tak więc chciałbym zmienić kod, aby działał jak potok, zamiast eksportera niestandardowych przedmiotów. Zobacz [item pip doc] (http://doc.scrapy.org/en/latest/topics/item-pipeline.html) –

+1

Już napisałem potok, ale moje myślenie było, ponieważ w ten sposób chcę, aby moje wyniki z Skrobaczka, eksporter byłby lepszym miejscem do umieszczenia go. – zsquare

+0

@zsquare, jakikolwiek sukces w tym problemie? Wiem, że to stary post, ale co zrobiłeś? – Medeiros

Odpowiedz

5

"Eksporterzy kanałów" to szybkie (i trochę brudne) skróty do wywoływania niektórych "standardowych" eksporterów towarów. Zamiast konfigurowania eksportera podawania z ustawieniami, twardy drut niestandardowa pozycja eksportera do niestandardowego rurociągu, jak wyjaśniono tutaj http://doc.scrapy.org/en/0.14/topics/exporters.html#using-item-exporters:

from scrapy.xlib.pydispatch import dispatcher 
from scrapy import signals 
from scrapy.contrib.exporter import XmlItemExporter 

class MyPipeline(object): 

    def __init__(self): 
     ... 
     dispatcher.connect(self.spider_opened, signals.spider_opened) 
     dispatcher.connect(self.spider_closed, signals.spider_closed) 
     ... 

    def spider_opened(self, spider): 
     self.exporter = QueueItemExporter() 
     self.exporter.start_exporting() 

    def spider_closed(self, spider): 
     self.exporter.finish_exporting() 

    def process_item(self, item, spider): 
     # YOUR STUFF HERE 
     ... 
     self.exporter.export_item(item) 
     return item 
+0

To wydaje się obiecujące. Ktoś to testował? – Medeiros

+3

Co jest w nich "brudne"? Jestem nowy w Scrapy i zadaję sobie to samo pytanie, co w przypadku @ zsquare OP, i muszę powiedzieć, że jego komentarz, "moje myślenie było, ponieważ w ten sposób chcę uzyskać moją produkcję ze skrobaka, eksporter byłby lepszym miejscem do umieszczenia to "odzwierciedla mój własny sposób myślenia. – feuGene

1

Tip: dobry przykład zacząć od jest Writing Items to MongoDb z oficjalnych dokumentów.

Zrobiłem podobną rzecz. Stworzyłem potok, który umieszcza każdy przedmiot w usłudze podobnej do S3 (używam tutaj Minio, ale masz pomysł). Tworzy nowe wiadro dla każdego pająka i umieszcza każdy przedmiot w obiekcie o losowej nazwie. Pełny kod źródłowy można znaleźć w my repo.

Począwszy od prostych cudzysłowów pająk z samouczka:

import scrapy 

class QuotesSpider(scrapy.Spider): 
    name = "quotes" 
    start_urls = ['http://quotes.toscrape.com/page/1/', 
        'http://quotes.toscrape.com/page/1/'] 

    def parse(self, response): 
     for quote in response.css('div.quote'): 
      yield { 
        'text':quote.css('span.text::text').extract_first(), 
        'author':quote.css('span small::text').extract_first(), 
        'tags':quote.css('div.tags a.tag::text').extract() 
        } 
     next_page = response.css('li.next a::attr(href)').extract_first() 
     if next_page is not None: 
      next_page = response.urljoin(next_page) 
      yield scrapy.Request(next_page, callback=self.parse) 

W settings.py:

ITEM_PIPELINES = { 
    'scrapy_quotes.pipelines.ScrapyQuotesPipeline': 300, 
} 

W scrapy_quotes/pipelines.py stworzyć rurociągu i eksportera produktu:

import uuid 
from StringIO import StringIO 
from scrapy.contrib.exporter import BaseItemExporter 
from scrapy.conf import settings 
from scrapy import signals 
from scrapy.xlib.pydispatch import dispatcher 
from scrapy import log 
from scrapy.utils.python import to_bytes 
from scrapy.utils.serialize import ScrapyJSONEncoder 

class S3ItemExporter(BaseItemExporter): 
    def __init__(self, bucket, **kwargs): 
     self._configure(kwargs) 
     self.bucket = bucket 
     kwargs.setdefault('ensure_ascii', not self.encoding) 
     self.encoder = ScrapyJSONEncoder(**kwargs) 

    def start_exporting(self): 
     self.client = connect() 
     create_bucket(self.client, self.bucket) 

    def finish_exporting(self): 
     log.msg("Done from S3 item exporter", level=log.DEBUG) 

    def export_item(self, item): 
     log.msg("S3 item exporter got item: %s" % item, level=log.DEBUG) 
     itemdict = dict(self._get_serialized_fields(item)) 
     data = self.encoder.encode(itemdict) 
     size = len(data) 
     object_data = StringIO(data) 
     name = str(uuid.uuid4()) 
     put_object(self.client, self.bucket, name, object_data, size) 


class ScrapyQuotesPipeline(object): 
    """Export scraped items, to different buckets, 
    one per spider""" 
    @classmethod 
    def from_crawler(cls, crawler): 
     pipeline = cls() 
     crawler.signals.connect(pipeline.spider_opened, signals.spider_opened) 
     crawler.signals.connect(pipeline.spider_closed, signals.spider_closed) 
     return pipeline 

    def spider_opened(self, spider): 
     self.exporter = S3ItemExporter(spider.name) 
     self.exporter.start_exporting() 

    def spider_closed(self, spider): 
     self.exporter.finish_exporting() 

    def process_item(self, item, spider): 
     self.exporter.export_item(item) 
     return item 

# S3 Related 
from minio import Minio 
import os 
from minio.error import ResponseError 
def connect(): 
    return Minio('192.168.1.111:9000', 
      access_key='0M6PYKBBAVQVQGVWVZKQ', 
      secret_key='H6vPxz0aHSMZPgagZ3G0lJ6CbhN8RlTtD78SPsL8', 
      secure=False) 

def create_bucket(client, name): 
    client.make_bucket(name) 

def put_object(client, bucket_name, object_name, object_data, size): 
    client.put_object(bucket_name, object_name, object_data, size) 
0

I hit ten sam problem, chociaż prawdopodobnie niektóre wersje są aktualizowane. Drobnym szczegółem, który rozwiązał to dla mnie, było ustawienie FEED_URI = "something" w settings.py. Bez tego wpis w FEED_EXPORTERS nie był w ogóle przestrzegany.