
Commit cc70f85

Port SitemapExtractor from CC-MRJob to CC-PySpark (#54)
- implement cc-pyspark equivalent of [cc-mrjob/sitemaps_from_robotstxt.py](https://github.com/commoncrawl/cc-mrjob/blob/fce4855958e1fd665cee37b975ff13030927b083/sitemaps_from_robotstxt.py): two job definitions, using warcio resp. fastwarc
- implement unit tests to test sitemap URL extraction
- run tests in Github workflow
- update README: extend description about running jobs, including using the [pyspark Python module](https://pypi.org/project/pyspark/)
1 parent 8a01cf4 commit cc70f85

8 files changed: +811, −13 lines changed

.github/workflows/python_test.yaml

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
name: Python Unit Tests

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  workflow_dispatch:

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.10', '3.11', '3.12', '3.13']
      fail-fast: false

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pyspark==3.5.7

      - name: Run tests
        run: |
          python -m pytest . -v
        env:
          # PySpark needs this to find the test files
          PYTHONPATH: test

README.md

Lines changed: 74 additions & 13 deletions
@@ -28,18 +28,48 @@ Extending the [CCSparkJob](./sparkcc.py) isn't difficult and for many use cases
 
 ## Setup
 
-To develop and test locally, you will need to install
-* Spark, see the [detailed instructions](https://spark.apache.org/docs/latest/), and
-* all required Python modules by running
+To develop and test locally, you'll need **Python>=3.9** and **Spark**.
+
+### JRE
+
+Spark requires a 64-bit Java JRE (v8, 11, or 17 for Spark 3.5.7). Install this first. If you have an Apple Silicon device, Azul Zulu JRE is recommended for native architecture support. Ensure that either `java` is on your `$PATH` or the `$JAVA_HOME` env var points to your JRE.
+
+### Python dependencies
+
+Assuming you have Python already set up and a venv activated, install the `cc-pyspark` dependencies:
+
 ```
 pip install -r requirements.txt
 ```
-* (optionally, and only if you want to query the columnar index) [install S3 support libraries](#installation-of-s3-support-libraries) so that Spark can load the columnar index from S3
 
+#### If you want to query the columnar index:
+In addition to the above, [install S3 support libraries](#installation-of-s3-support-libraries) so that Spark can load the columnar index from S3.
+
+### Spark
+
+There are two ways to obtain Spark:
+* manual installation / preinstallation
+* as a pip package with `pip install`
+
+#### For simple development or to get started quickly, the `pip install` route is recommended:
+
+```bash
+pip install pyspark==3.5.7
+```
+
+This will install v3.5.7 of [the PySpark Python package](https://spark.apache.org/docs/latest/api/python/getting_started/index.html), which includes a local/client-only version of Spark and also adds `spark-submit` and `pyspark` to your `$PATH`.
+
+> If you need to interact with a remote Spark cluster, use a version of PySpark that matches the cluster version.
+
+#### If Spark is already installed or if you want full tooling to configure a local Spark cluster:
+
+Install Spark (see the [Spark documentation](https://spark.apache.org/docs/latest/) for guidance). Then, ensure that `spark-submit` and `pyspark` are on your `$PATH`, or prepend `$SPARK_HOME/bin` when running, e.g. `$SPARK_HOME/bin/spark-submit`.
+
+> Note: The PySpark package is required if you want to run the tests in `test/`.
 
 ## Compatibility and Requirements
 
-Tested with with Spark 3.2.3, 3.3.2, 3.4.1, 3.5.5 in combination with Python 3.8, 3.9, 3.10, 3.12 and 3.13. See the branch [python-2.7](/commoncrawl/cc-pyspark/tree/python-2.7) if you want to run the job on Python 2.7 and older Spark versions.
+Tested with Spark 3.2.3, 3.3.2, 3.4.1, 3.5.5 in combination with Python 3.8, 3.9, 3.10, 3.12 and 3.13. See the branch [python-2.7](/commoncrawl/cc-pyspark/tree/python-2.7) if you want to run the job on Python 2.7 and older Spark versions.
 
 
 ## Get Sample Data
@@ -62,11 +92,10 @@ CC-PySpark reads the list of input files from a manifest file. Typically, these
 
 ### Running locally
 
-First, point the environment variable `SPARK_HOME` to your Spark installation.
-Then submit a job via
+Spark jobs can be started using `spark-submit` (see [Setup](#setup) above if you have a manual installation of Spark):
 
 ```
-$SPARK_HOME/bin/spark-submit ./server_count.py \
+spark-submit ./server_count.py \
     --num_output_partitions 1 --log_level WARN \
     ./input/test_warc.txt servernames
 ```
@@ -76,7 +105,7 @@ This will count web server names sent in HTTP response headers for the sample WA
 The output table can be accessed via SparkSQL, e.g.,
 
 ```
-$SPARK_HOME/bin/pyspark
+$ pyspark
 >>> df = sqlContext.read.parquet("spark-warehouse/servernames")
 >>> for row in df.sort(df.val.desc()).take(10): print(row)
 ...
@@ -92,12 +121,29 @@ Row(key=u'Apache/2.2.15 (CentOS)', val=827)
 Row(key=u'Apache-Coyote/1.1', val=790)
 ```
 
-But it's also possible to configure a different output format, for example CSV or JSON, see the command-line options.
+But it's also possible to configure a different output format, for example CSV or JSON; pass `--help` on the command line for more details.
 
 See also
 * [running the Spark shell and submitting Spark jobs](https://spark.apache.org/docs/latest/#running-the-examples-and-shell)
 * [PySpark SQL API](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)
 
+#### Debugging in an IDE
+
+If the `.py` file for the job you want to debug is runnable (i.e. if it has an `if __name__ == "__main__":` line), you can bypass `spark-submit` and run it directly as a Python script:
+
+```bash
+python server_count.py --num_output_partitions 1 ./input/test_warc.txt servernames
+```
+
+Spark will complain if the output directory already exists; you may want to add a preprocessing step that deletes the appropriate subdirectory under `spark-warehouse` before each run, e.g. `rm -rf spark-warehouse/servernames`.
+
+> If you have manually installed Spark, you'll need to ensure the pyspark package is on your PYTHONPATH:
+> ```bash
+> PYTHONPATH=$PYTHONPATH:$SPARK_HOME/python python server_count.py --num_output_partitions 1 ./input/test_warc.txt servernames
+> ```
+
+Note that the `run_job` code is still invoked by the Spark Java binary behind the scenes, which normally prevents a debugger from attaching. To debug the `run_job` internals, it's recommended to set up a unit test and debug that; see `test/test_sitemaps_from_robotstxt` for examples of single and batched job tests.
+
 
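
As an illustration of the kind of unit test referenced above, here is a minimal sketch with made-up names and a stubbed accumulator (not the contents of the repository's `test/test_sitemaps_from_robotstxt`). It exercises `SitemapExtractorJob.process_record` without a Spark cluster by stubbing the WARC helper methods the job inherits:

```python
# Hypothetical test sketch; fixture names and the stub accumulator are illustrative.
from io import BytesIO
from unittest.mock import MagicMock

from sitemaps_from_robotstxt import SitemapExtractorJob


class LocalCounter:
    """Stand-in for a Spark accumulator so the job can run outside Spark."""
    def __init__(self):
        self.value = 0

    def add(self, amount):
        self.value += amount


def test_process_record_yields_sitemap_url_and_host():
    job = SitemapExtractorJob()
    # replace the Spark accumulators with local counters
    for counter in ('robots_txt_processed', 'sitemap_urls_found',
                    'sitemap_url_invalid_encoding', 'robots_txt_announcing_sitemap',
                    'robots_txt_with_more_than_50_sitemaps'):
        setattr(job, counter, LocalCounter())

    # stub the WARC access helpers inherited from CCSparkJob
    robots_txt = b"User-agent: *\nDisallow: /private/\nSitemap: https://example.com/sitemap.xml\n"
    job.is_response_record = lambda record: True
    job.get_payload_stream = lambda record: BytesIO(robots_txt)
    job.get_warc_header = lambda record, name: 'https://example.com/robots.txt'

    # process_record is a generator, so consume it
    results = list(job.process_record(MagicMock()))

    assert results == [('https://example.com/sitemap.xml', ['example.com'])]
    assert job.sitemap_urls_found.value == 1
    assert job.robots_txt_announcing_sitemap.value == 1
```

Run such a test with the PYTHONPATH settings described in the new "Running the Tests" section below.
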
 ### Running in Spark cluster over large amounts of data
 
@@ -116,7 +162,7 @@ As the Common Crawl dataset lives in the Amazon Public Datasets program, you can
 
 All examples show the available command-line options if called with the parameter `--help` or `-h`, e.g.
 ```
-$SPARK_HOME/bin/spark-submit ./server_count.py --help
+spark-submit ./server_count.py --help
 ```
 
 #### Overwriting Spark configuration properties
@@ -126,7 +172,7 @@ There are many [Spark configuration properties](https://spark.apache.org/docs/la
 It's possible to overwrite Spark properties when [submitting the job](https://spark.apache.org/docs/latest/submitting-applications.html):
 
 ```
-$SPARK_HOME/bin/spark-submit \
+spark-submit \
     --conf spark.sql.warehouse.dir=myWareHouseDir \
     ... (other Spark options, flags, config properties) \
     ./server_count.py \
@@ -170,7 +216,7 @@ Please also note that:
 
 Below an example call to count words in 10 WARC records host under the `.is` top-level domain using the `--packages` option:
 ```
-$SPARK_HOME/bin/spark-submit \
+spark-submit \
     --packages org.apache.hadoop:hadoop-aws:3.3.2 \
     ./cc_index_word_count.py \
     --input_base_url s3://commoncrawl/ \
@@ -210,6 +256,21 @@ Some differences between the warcio and FastWARC APIs are hidden from the user i
 However, it's recommended that you carefully verify that your custom job implementation works in combination with FastWARC. There are subtle differences between the warcio and FastWARC APIs, including the underlying classes (WARC/HTTP headers and stream implementations). In addition, FastWARC does not support for legacy ARC files and does not automatically decode HTTP content and transfer encodings (see [Resiliparse HTTP Tools](https://resiliparse.chatnoir.eu/en/latest/man/parse/http.html#read-chunked-http-payloads)). While content and transfer encodings are already decoded in Common Crawl WARC files, this may not be the case for WARC files from other sources. See also [WARC 1.1 specification, http/https response records](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#http-and-https-schemes).
 
 
+## Running the Tests
+
+To run the tests in `test/` you will need to add `.` and `test` to the PYTHONPATH:
+
+```bash
+PYTHONPATH=$PYTHONPATH:.:test pytest -v test
+```
+
+or if you have a manual installation of Spark:
+
+```bash
+PYTHONPATH=$PYTHONPATH:$SPARK_HOME/python:.:test pytest -v test
+```
+
+
 ## Credits
 
 Examples are originally ported from Stephen Merity's [cc-mrjob](https://github.com/commoncrawl/cc-mrjob/) with the following changes and upgrades:

get-data.sh

Lines changed: 6 additions & 0 deletions
@@ -45,3 +45,9 @@ for data_type in warc wat wet; do
 
 done
 
+echo "Downloading sample robots.txt file..."
+ccfile='crawl-data/CC-MAIN-2017-30/segments/1500549423183.57/robotstxt/CC-MAIN-20170720121902-20170720141902-00000.warc.gz'
+mkdir -p `dirname "$ccfile"`
+wget --no-clobber https://data.commoncrawl.org/$ccfile -O $ccfile
+
+
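
The file fetched above is a WARC of robots.txt responses, the input expected by the new SitemapExtractor job. As an optional sanity check (a sketch using warcio directly; the path is the `$ccfile` value from the script), you can list the robots.txt URLs in the sample that announce a sitemap:

```python
# Illustrative sanity check of the downloaded sample robots.txt WARC.
from warcio.archiveiterator import ArchiveIterator

path = ('crawl-data/CC-MAIN-2017-30/segments/1500549423183.57/robotstxt/'
        'CC-MAIN-20170720121902-20170720141902-00000.warc.gz')

with open(path, 'rb') as stream:
    for record in ArchiveIterator(stream):
        if record.rec_type != 'response':
            continue
        url = record.rec_headers.get_header('WARC-Target-URI')
        body = record.content_stream().read()
        if b'itemap:' in body:  # crude match for "Sitemap:" / "sitemap:" lines
            print(url)
```
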
requirements.txt

Lines changed: 4 additions & 0 deletions
@@ -27,3 +27,7 @@ lxml
 #Resiliparse
 # (tested with)
 #Resiliparse==0.15.2
+
+# testing
+pytest
+pytest-mock

sitemaps_from_robotstxt.py

Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
import re
from urllib.parse import urlparse, urljoin

from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from warcio.recordloader import ArcWarcRecord

from sparkcc import CCSparkJob


class SitemapExtractorJob(CCSparkJob):
    """Extract sitemap URLs (http://www.sitemaps.org/) from robots.txt WARC files."""

    name = "SitemapExtractor"

    output_schema = StructType([
        StructField('sitemap_url', StringType(), True),
        StructField('hosts', ArrayType(elementType=StringType()), True)
    ])

    # rb: match on raw bytes so we can defer utf-8 decoding to the `sitemap:` line
    sitemap_pattern = re.compile(rb'^sitemap:\s*(\S+)', re.I)

    robots_txt_processed = None
    sitemap_urls_found = None
    sitemap_url_invalid_encoding = None
    robots_txt_announcing_sitemap = None
    robots_txt_with_more_than_50_sitemaps = None


    def log_accumulators(self, session):
        super(SitemapExtractorJob, self).log_accumulators(session)

        self.log_accumulator(session, self.robots_txt_processed,
                             'robots.txt successfully parsed = {}')
        self.log_accumulator(session, self.sitemap_urls_found,
                             'sitemap urls found = {}')
        self.log_accumulator(session, self.sitemap_url_invalid_encoding,
                             'sitemap urls with invalid utf-8 encoding = {}')
        self.log_accumulator(session, self.robots_txt_announcing_sitemap,
                             'robots.txt announcing at least 1 sitemap = {}')
        self.log_accumulator(session, self.robots_txt_with_more_than_50_sitemaps,
                             'robots.txt with more than 50 sitemaps = {}')


    def init_accumulators(self, session):
        super(SitemapExtractorJob, self).init_accumulators(session)

        sc = session.sparkContext
        self.robots_txt_processed = sc.accumulator(0)
        self.sitemap_urls_found = sc.accumulator(0)
        self.sitemap_url_invalid_encoding = sc.accumulator(0)
        self.robots_txt_announcing_sitemap = sc.accumulator(0)
        self.robots_txt_with_more_than_50_sitemaps = sc.accumulator(0)


    def process_record(self, record: ArcWarcRecord):
        """ emit: sitemap_url => [host] """
        if not self.is_response_record(record):
            # we're only interested in the HTTP responses
            return

        self.robots_txt_processed.add(1)
        # robots_txt url/host are lazily computed when we encounter the first valid sitemap URL
        robots_txt_url = None
        robots_txt_host = None
        n_sitemaps = 0

        data = self.get_payload_stream(record).read()
        for raw_line in data.splitlines():
            raw_line = raw_line.strip()

            match = SitemapExtractorJob.sitemap_pattern.match(raw_line)
            if match:
                sitemap_url = match.group(1).strip()
                self.sitemap_urls_found.add(1)
                try:
                    sitemap_url = sitemap_url.decode("utf-8", "strict")
                except UnicodeDecodeError as e:
                    self.get_logger().warn(f'Invalid encoding of sitemap URL {sitemap_url}: {repr(e)}')
                    self.sitemap_url_invalid_encoding.add(1)
                    continue

                if robots_txt_url is None:
                    # first sitemap found: set base URL and get host from URL
                    robots_txt_url = self.get_warc_header(record, 'WARC-Target-URI')
                    try:
                        robots_txt_host = urlparse(robots_txt_url).netloc.lower().lstrip('.')
                    except Exception as e:
                        self.get_logger().warn(f'Invalid robots.txt URL: {robots_txt_url}: {repr(e)}')
                        # skip this entire robots.txt record
                        return

                if not (sitemap_url.startswith('http:') or sitemap_url.startswith('https:')):
                    # sitemap_url is relative; pass straight to urljoin which knows how to handle it correctly
                    try:
                        sitemap_url = urljoin(robots_txt_url, sitemap_url)
                    except Exception as e:
                        self.get_logger().warn(f'Error joining sitemap URL {sitemap_url} with base {robots_txt_url}: {repr(e)}')
                        continue

                yield sitemap_url, [robots_txt_host]
                n_sitemaps += 1

        if n_sitemaps > 0:
            self.robots_txt_announcing_sitemap.add(1)
        if n_sitemaps > 50:
            self.robots_txt_with_more_than_50_sitemaps.add(1)


if __name__ == '__main__':
    job = SitemapExtractorJob()
    job.run()
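
The job's `output_schema` maps each `sitemap_url` to the array of hosts whose robots.txt announced it. As a hedged sketch (assuming the job has already been run and that its output table was named `sitemaps`, which is just an example argument), the resulting Parquet table can be inspected the same way as the `servernames` table in the README:

```python
# Illustrative only: the table name "sitemaps" is whatever output name was
# passed on the command line when the job was submitted.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("inspect-sitemaps").getOrCreate()

df = spark.read.parquet("spark-warehouse/sitemaps")

# sitemap URLs announced by the largest number of robots.txt hosts
(df.select("sitemap_url", F.size("hosts").alias("n_hosts"))
   .orderBy(F.desc("n_hosts"))
   .show(10, truncate=False))
```
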
Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
from fastwarc.warc import WarcRecordType

from sparkcc_fastwarc import CCFastWarcSparkJob
from sitemaps_from_robotstxt import SitemapExtractorJob


class SitemapExtractorFastWarcJob(SitemapExtractorJob, CCFastWarcSparkJob):
    """Extract sitemap URLs (http://www.sitemaps.org/) from robots.txt WARC files
    using FastWARC to parse WARC files."""

    name = "SitemapExtractorFastWarc"

    # process only WARC response records
    fastwarc_record_filter = WarcRecordType.response

    # process_record is implemented by SitemapExtractorJob


if __name__ == '__main__':
    job = SitemapExtractorFastWarcJob()
    job.run()
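
The class above combines the warcio-based extraction logic with FastWARC-based WARC parsing purely through multiple inheritance. A small illustrative check follows; the module name `sitemaps_from_robotstxt_fastwarc` is an assumption (the file name is not visible in this view), as is the assumption that `CCFastWarcSparkJob` overrides the WARC helper methods of `CCSparkJob`:

```python
# Illustrative check; the module name below is an assumption, as the file name
# of the FastWARC job is not visible in this diff view.
from sitemaps_from_robotstxt import SitemapExtractorJob
from sitemaps_from_robotstxt_fastwarc import SitemapExtractorFastWarcJob

# the extraction logic is inherited unchanged from the warcio-based job ...
assert (SitemapExtractorFastWarcJob.process_record
        is SitemapExtractorJob.process_record)

# ... while record parsing helpers resolve ahead of CCSparkJob in the MRO
print([cls.__name__ for cls in SitemapExtractorFastWarcJob.__mro__])
```
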
