# scrapy_script.py (forked from jschnurr/scrapyscript)
'''
Run Scrapy spiders from a script.

Blocks and runs all requests in parallel. Accumulated items from all
spiders are returned as a dict keyed by spider name.
'''
import collections.abc

from billiard import Process  # fork of multiprocessing that works with Celery
from billiard.queues import Queue
from pydispatch import dispatcher
from scrapy import signals
from scrapy.crawler import CrawlerProcess
from scrapy.settings import Settings
from scrapy.spiders import Spider


class ScrapyScriptException(Exception):
    pass


class Job(object):
    '''A job is a single request to call a specific spider. *args and **kwargs
    will be passed to the spider constructor.
    '''

    def __init__(self, spider, *args, **kwargs):
        '''Params:
            spider (spidercls): the spider to be run for this job.
        '''
        self.spider = spider
        self.args = args
        self.kwargs = kwargs
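
# Usage sketch (illustrative, not part of the original module): assuming a
# hypothetical spider class PythonSpider whose constructor accepts a ``url``
# keyword argument, a job could be created as
#
#     job = Job(PythonSpider, url='http://www.python.org')
#
# The extra *args and **kwargs are simply stored and forwarded verbatim to the
# spider constructor when the Processor runs the job.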


class Processor(Process):
    '''Start a Twisted reactor and run the provided Scrapy spiders.
    Blocks until all have finished.
    '''

    def __init__(self, settings=None, item_scraped=True):
        '''Params:
            settings (scrapy.settings.Settings) - settings to apply. Defaults
            to the Scrapy default settings.
            item_scraped (bool) - if True, collect the scraped items as well
            as the per-spider counts; if False, only the counts are kept.
        '''
        kwargs = {'ctx': __import__('billiard.synchronize')}
        self.results = Queue(**kwargs)
        self.counts = Queue(**kwargs)
        self.items = {}
        self.items_count = {}
        self.settings = settings or Settings()
        self.item_scraped = item_scraped
        dispatcher.connect(self._item_passed, signals.item_scraped)

    def _item_passed(self, item, response, spider):
        # Collect items and per-spider counts as the item_scraped signal fires.
        if spider.name not in self.items:
            self.items[spider.name] = []
            self.items_count[spider.name] = 0
        if self.item_scraped:
            self.items[spider.name].append(dict(item))
        self.items_count[spider.name] += 1

    def _crawl(self, requests):
        '''Params:
            requests ([Job]) - one or more Jobs. All will be loaded into a
            single invocation of the reactor.
        '''
        self.crawler = CrawlerProcess(self.settings)

        # crawl can be called multiple times to queue several requests
        for req in requests:
            self.crawler.crawl(req.spider, *req.args, **req.kwargs)

        self.crawler.start()
        self.crawler.stop()
        self.results.put(self.items)
        self.counts.put(self.items_count)

    def run(self, jobs):
        '''Start the Scrapy engine and execute all jobs. Blocks until every
        job has finished; the consolidated results can then be fetched with
        data() and count().

        Params:
            jobs ([Job]) - one or more Job objects to be processed.
        '''
        if not isinstance(jobs, collections.abc.Iterable):
            jobs = [jobs]
        self.validate(jobs)

        # Run the reactor in a separate billiard process so it can be started
        # more than once per interpreter, and so this works under Celery.
        p = Process(target=self._crawl, args=[jobs])
        p.start()
        p.join()
        p.terminate()

    def data(self):
        '''Return the scraped items, as a dict keyed by spider name.'''
        return self.results.get()

    def count(self):
        '''Return the number of items scraped, as a dict keyed by spider name.'''
        return self.counts.get()

    def validate(self, jobs):
        if not all(isinstance(x, Job) for x in jobs):
            raise ScrapyScriptException('scrapy-script requires Job objects.')
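

# Usage sketch (illustrative addition, not part of the original module). The
# spider below, its name, start URL and the 'title' field are assumptions made
# for this example; any Scrapy spider class could be used instead.
if __name__ == '__main__':

    class DemoSpider(Spider):
        name = 'demo'
        start_urls = ['https://example.com']

        def parse(self, response):
            # Yield one item containing the page title.
            yield {'title': response.css('title::text').get()}

    processor = Processor(settings=None)
    processor.run([Job(DemoSpider)])

    print(processor.data())   # e.g. {'demo': [{'title': 'Example Domain'}]}
    print(processor.count())  # e.g. {'demo': 1}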