forked from pgh-public-meetings/city-scrapers-pitt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipelines.py
51 lines (42 loc) · 1.93 KB
/
pipelines.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
"""A pipeline for diffing newly scraped meetings with
previously scraped meetings, using the file system
as a backing store
This is based on the DiffPipeline element defined in city_scrapers_core:
https://github.com/City-Bureau/city-scrapers-core/blob/main/city_scrapers_core/pipelines/diff.py
The purpose of the DiffPipeline is to avoid loading duplicate meeting
entries. However, the city_scrapers_core implementations only work
with s3 and azure storage. This implementation works with a local
folder on disk, for testing purposes.
"""
import json
from typing import List, Mapping
from city_scrapers_core.pipelines import DiffPipeline
from pytz import timezone
from scrapy.crawler import Crawler
import utils
class FileSystemDiffPipeline(DiffPipeline):
    """Diff newly scraped meetings against previously scraped ones, using a
    local folder on disk as the backing store (for testing purposes, where
    the s3/azure-backed pipelines from city_scrapers_core are unavailable).
    """

    def __init__(self, crawler: Crawler, output_format: str):
        """Initialize FileSystemDiffPipeline.

        Params:
            crawler: Current Crawler object
            output_format: Currently only "ocd" is supported
        """
        super().__init__(crawler, output_format)
        # Root directory that holds prior scrape outputs, laid out as
        # Year/Month/Day/Time/<spider>.json under this folder.
        self.folder = crawler.settings.get("FEED_OUTPUT_DIRECTORY")
        self.spider = crawler.spider
        # strftime pattern used to prefix feed paths by scrape date.
        self.feed_prefix = crawler.settings.get(
            "CITY_SCRAPERS_DIFF_FEED_PREFIX", "%Y/%m/%d"
        )
        # Index of previously written outputs, presumably mapping spider
        # name -> list of output file paths (built by project helper).
        self.index = utils.build_spider_index(self.folder)

    def load_previous_results(self) -> List[Mapping]:
        """Return the parsed meetings from this spider's most recent output.

        Returns:
            A list of meeting dicts (one per JSON line of the latest output
            file), or an empty list when no previous output exists.
        """
        # Since the file structure is Year/Month/Day/Time/<spider>.json,
        # lexicographic sorting is sufficient to find the most recent result.
        # .get(..., []) keeps "no prior output for this spider" consistent
        # with the empty-list contract instead of raising KeyError.
        spider_outputs = sorted(self.index.get(self.spider.name, []))
        if not spider_outputs:
            return []
        latest = spider_outputs[-1]
        # Output files are JSON Lines: one serialized meeting per line.
        # Iterate the file object directly rather than readlines().
        with open(latest) as f:
            return [json.loads(line) for line in f]