36 changes: 36 additions & 0 deletions .github/workflows/python_test.yaml
@@ -0,0 +1,36 @@
name: Python Unit Tests

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:

jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.10', '3.11', '3.12', '3.13']
fail-fast: false

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pyspark==3.5.7

- name: Run tests
run: |
python -m pytest . -v
env:
# PySpark needs this to find the test files
PYTHONPATH: test
87 changes: 74 additions & 13 deletions README.md
@@ -28,18 +28,48 @@ Extending the [CCSparkJob](./sparkcc.py) isn't difficult and for many use cases

## Setup

To develop and test locally, you will need to install
* Spark, see the [detailed instructions](https://spark.apache.org/docs/latest/), and
* all required Python modules by running
To develop and test locally, you'll need **Python >= 3.9** and **Spark**.

### JRE

Spark requires a 64-bit Java JRE (v8, 11, or 17 for Spark 3.5.7). Install this first. If you have an Apple Silicon device, the Azul Zulu JRE is recommended for native architecture support. Ensure that either `java` is on your `$PATH` or the `$JAVA_HOME` env var points to your JRE.
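
If you want to double-check which JRE Spark will pick up, a short Python helper (not part of cc-pyspark, just a convenience check) can locate and print it:

```python
# Convenience check only: locate the JRE that Spark would use, preferring
# JAVA_HOME over the PATH, and print its version (java writes this to stderr).
import os
import shutil
import subprocess

java_home = os.environ.get("JAVA_HOME")
java = os.path.join(java_home, "bin", "java") if java_home else shutil.which("java")
if java is None:
    raise SystemExit("No JRE found: set JAVA_HOME or put `java` on your PATH")
subprocess.run([java, "-version"], check=True)
```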

### Python dependencies

Assuming you already have Python set up and a venv activated, install the `cc-pyspark` dependencies:

```
pip install -r requirements.txt
```
* (optionally, and only if you want to query the columnar index) [install S3 support libraries](#installation-of-s3-support-libraries) so that Spark can load the columnar index from S3

#### If you want to query the columnar index:
In addition to the above, [install S3 support libraries](#installation-of-s3-support-libraries) so that Spark can load the columnar index from S3.

### Spark

There are two ways to obtain Spark:
* manual installation / preinstallation
* as a pip package with `pip install`

#### For simple development or to get started quickly, the `pip install` route is recommended:

```bash
pip install pyspark==3.5.7
```

This will install v3.5.7 of [the PySpark Python package](https://spark.apache.org/docs/latest/api/python/getting_started/index.html), which includes a local/client-only version of Spark and also adds `spark-submit` and `pyspark` to your `$PATH`.

> If you need to interact with a remote Spark cluster, use a version of PySpark that matches the cluster version.
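
To confirm that a pip-installed PySpark works end to end, a quick smoke test from a Python shell is enough (this is only a sanity check, not part of the cc-pyspark jobs):

```python
# Smoke test for a pip-installed PySpark: start a purely local session,
# run a trivial computation, and shut the session down again.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("pyspark-smoke-test").getOrCreate()
assert spark.range(5).count() == 5
spark.stop()
```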

#### If Spark is already installed or if you want full tooling to configure a local Spark cluster:

Install Spark if needed (see the [Spark documentation](https://spark.apache.org/docs/latest/) for guidance). Then ensure that `spark-submit` and `pyspark` are on your `$PATH`, or prepend `$SPARK_HOME/bin` when running them, e.g. `$SPARK_HOME/bin/spark-submit`.

> Note: The PySpark package is required if you want to run the tests in `test/`.

## Compatibility and Requirements

Tested with with Spark 3.2.3, 3.3.2, 3.4.1, 3.5.5 in combination with Python 3.8, 3.9, 3.10, 3.12 and 3.13. See the branch [python-2.7](/commoncrawl/cc-pyspark/tree/python-2.7) if you want to run the job on Python 2.7 and older Spark versions.
Tested with Spark 3.2.3, 3.3.2, 3.4.1, 3.5.5 in combination with Python 3.8, 3.9, 3.10, 3.12 and 3.13. See the branch [python-2.7](/commoncrawl/cc-pyspark/tree/python-2.7) if you want to run the job on Python 2.7 and older Spark versions.


## Get Sample Data
@@ -62,11 +92,10 @@ CC-PySpark reads the list of input files from a manifest file. Typically, these

### Running locally

First, point the environment variable `SPARK_HOME` to your Spark installation.
Then submit a job via
Spark jobs can be started using `spark-submit` (see [Setup](#setup) above if you have a manual installation of Spark):

```
$SPARK_HOME/bin/spark-submit ./server_count.py \
spark-submit ./server_count.py \
--num_output_partitions 1 --log_level WARN \
./input/test_warc.txt servernames
```
@@ -76,7 +105,7 @@ This will count web server names sent in HTTP response headers for the sample WA
The output table can be accessed via SparkSQL, e.g.,

```
$SPARK_HOME/bin/pyspark
$ pyspark
>>> df = sqlContext.read.parquet("spark-warehouse/servernames")
>>> for row in df.sort(df.val.desc()).take(10): print(row)
...
@@ -92,12 +121,29 @@ Row(key=u'Apache/2.2.15 (CentOS)', val=827)
Row(key=u'Apache-Coyote/1.1', val=790)
```

But it's also possible to configure a different output format, for example CSV or JSON, see the command-line options.
But it's also possible to configure a different output format, for example CSV or JSON; pass `--help` on the command line for more details.
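
For example, if the job is configured to write JSON instead of Parquet, the output can be read back with Spark's generic readers; a minimal sketch, assuming the default warehouse location and the `servernames` table from above:

```python
# Minimal sketch: read a JSON-format output table back into a DataFrame.
# Assumes the job wrote JSON output to spark-warehouse/servernames (key/val columns).
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("spark-warehouse/servernames")
df.orderBy(df.val.desc()).show(10)
```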

See also
* [running the Spark shell and submitting Spark jobs](https://spark.apache.org/docs/latest/#running-the-examples-and-shell)
* [PySpark SQL API](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

#### Debugging in an IDE

If the `.py` file for the job you want to debug is runnable (i.e. if it has an `if __name__ == "__main__":` block), you can bypass `spark-submit` and run it directly as a Python script:

```bash
python server_count.py --num_output_partitions 1 ./input/test_warc.txt servernames
```

Spark will complain if the output directory already exists; you may want to add a preprocessing step that deletes the appropriate subdirectory under `spark-warehouse` before each run, e.g. `rm -rf spark-warehouse/servernames`.

> If you have manually installed Spark you'll need to ensure the pyspark package is on your PYTHONPATH:
> ```bash
> PYTHONPATH=$PYTHONPATH:$SPARK_HOME/python python server_count.py --num_output_partitions 1 ./input/test_warc.txt servernames
> ```

Note that the `run_job` code is still invoked by the Spark Java binary behind the scenes, which normally prevents a debugger from attaching. To debug the `run_job` internals, it's recommended to set up a unit test and debug that; see `test/test_sitemaps_from_robotstxt` for examples of single and batched job tests.
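
As an illustration only, a job-level test can set `sys.argv` before calling `run()`, which keeps everything inside a single Python process that a debugger can step through. The manifest path and table name below are placeholders, not the fixtures used by the actual tests in `test/`, and the sketch assumes `run()` parses its arguments from `sys.argv` just like the `__main__` blocks of the example jobs:

```python
# Hypothetical sketch of a debuggable job-level test; paths and names are placeholders.
import sys

from sitemaps_from_robotstxt import SitemapExtractorJob


def test_sitemap_extractor_runs(monkeypatch):
    monkeypatch.setattr(sys, "argv", [
        "sitemaps_from_robotstxt.py",
        "--num_output_partitions", "1",
        "--log_level", "WARN",
        "input/test_robotstxt_warc.txt",  # placeholder manifest listing robots.txt WARC paths
        "sitemaps_test",                  # placeholder output table (must not already exist)
    ])
    job = SitemapExtractorJob()
    job.run()  # breakpoints inside process_record are now reachable
```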


### Running in Spark cluster over large amounts of data

@@ -116,7 +162,7 @@ As the Common Crawl dataset lives in the Amazon Public Datasets program, you can

All examples show the available command-line options if called with the parameter `--help` or `-h`, e.g.
```
$SPARK_HOME/bin/spark-submit ./server_count.py --help
spark-submit ./server_count.py --help
```

#### Overwriting Spark configuration properties
@@ -126,7 +172,7 @@ There are many [Spark configuration properties](https://spark.apache.org/docs/la
It's possible to overwrite Spark properties when [submitting the job](https://spark.apache.org/docs/latest/submitting-applications.html):

```
$SPARK_HOME/bin/spark-submit \
spark-submit \
--conf spark.sql.warehouse.dir=myWareHouseDir \
... (other Spark options, flags, config properties) \
./server_count.py \
@@ -170,7 +216,7 @@ Please also note that:

Below is an example call to count words in 10 WARC records hosted under the `.is` top-level domain, using the `--packages` option:
```
$SPARK_HOME/bin/spark-submit \
spark-submit \
--packages org.apache.hadoop:hadoop-aws:3.3.2 \
./cc_index_word_count.py \
--input_base_url s3://commoncrawl/ \
@@ -210,6 +256,21 @@ Some differences between the warcio and FastWARC APIs are hidden from the user i
However, it's recommended that you carefully verify that your custom job implementation works in combination with FastWARC. There are subtle differences between the warcio and FastWARC APIs, including the underlying classes (WARC/HTTP headers and stream implementations). In addition, FastWARC does not support legacy ARC files and does not automatically decode HTTP content and transfer encodings (see [Resiliparse HTTP Tools](https://resiliparse.chatnoir.eu/en/latest/man/parse/http.html#read-chunked-http-payloads)). While content and transfer encodings are already decoded in Common Crawl WARC files, this may not be the case for WARC files from other sources. See also [WARC 1.1 specification, http/https response records](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#http-and-https-schemes).


## Running the Tests

To run the tests in `test/`, you will need to add `.` and `test` to `PYTHONPATH`:

```bash
PYTHONPATH=$PYTHONPATH:.:test pytest -v test
```

or if you have a manual installation of Spark:

```bash
PYTHONPATH=$PYTHONPATH:$SPARK_HOME/python:.:test pytest -v test
```


## Credits

Examples are originally ported from Stephen Merity's [cc-mrjob](https://github.com/commoncrawl/cc-mrjob/) with the following changes and upgrades:
6 changes: 6 additions & 0 deletions get-data.sh
@@ -45,3 +45,9 @@ for data_type in warc wat wet; do

done

echo "Downloading sample robots.txt file..."
ccfile='crawl-data/CC-MAIN-2017-30/segments/1500549423183.57/robotstxt/CC-MAIN-20170720121902-20170720141902-00000.warc.gz'
mkdir -p `dirname "$ccfile"`
wget --no-clobber https://data.commoncrawl.org/$ccfile -O $ccfile


4 changes: 4 additions & 0 deletions requirements.txt
@@ -27,3 +27,7 @@ lxml
#Resiliparse
# (tested with)
#Resiliparse==0.15.2

# testing
pytest
pytest-mock
112 changes: 112 additions & 0 deletions sitemaps_from_robotstxt.py
@@ -0,0 +1,112 @@
import re
from urllib.parse import urlparse, urljoin

from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from warcio.recordloader import ArcWarcRecord

from sparkcc import CCSparkJob

class SitemapExtractorJob(CCSparkJob):
"""Extract sitemap URLs (http://www.sitemaps.org/) from robots.txt WARC files."""

name = "SitemapExtractor"

output_schema = StructType([
StructField('sitemap_url', StringType(), True),
StructField('hosts', ArrayType(elementType=StringType()), True)
])

# rb: match on raw bytes so we can defer utf-8 decoding to the `sitemap:` line
sitemap_pattern = re.compile(rb'^sitemap:\s*(\S+)', re.I)

robots_txt_processed = None
sitemap_urls_found = None
sitemap_url_invalid_encoding = None
robots_txt_announcing_sitemap = None
robots_txt_with_more_than_50_sitemaps = None


def log_accumulators(self, session):
super(SitemapExtractorJob, self).log_accumulators(session)

self.log_accumulator(session, self.robots_txt_processed,
'robots.txt successfully parsed = {}')
self.log_accumulator(session, self.sitemap_urls_found,
'sitemap urls found = {}')
self.log_accumulator(session, self.sitemap_url_invalid_encoding,
'sitemap urls with invalid utf-8 encoding = {}')
self.log_accumulator(session, self.robots_txt_announcing_sitemap,
'robots.txt announcing at least 1 sitemap = {}')
self.log_accumulator(session, self.robots_txt_with_more_than_50_sitemaps,
'robots.txt with more than 50 sitemaps = {}')


def init_accumulators(self, session):
Review comment (Contributor): Also log_accumulators needs to be overridden, otherwise the class-specific accumulators are never shown or preserved once the job has finished. See cc_index_word_count.py or wat_extract_links.py.

Reply (Contributor, author): done

super(SitemapExtractorJob, self).init_accumulators(session)

sc = session.sparkContext
self.robots_txt_processed = sc.accumulator(0)
self.sitemap_urls_found = sc.accumulator(0)
self.sitemap_url_invalid_encoding = sc.accumulator(0)
self.robots_txt_announcing_sitemap = sc.accumulator(0)
self.robots_txt_with_more_than_50_sitemaps = sc.accumulator(0)


def process_record(self, record: ArcWarcRecord):
""" emit: sitemap_url => [host] """
if not self.is_response_record(record):
# we're only interested in the HTTP responses
return

self.robots_txt_processed.add(1)
# robots_txt url/host are lazily computed when we encounter the first valid sitemap URL
robots_txt_url = None
robots_txt_host = None
n_sitemaps = 0

data = self.get_payload_stream(record).read()
for raw_line in data.splitlines():
raw_line = raw_line.strip()

match = SitemapExtractorJob.sitemap_pattern.match(raw_line)
if match:
sitemap_url = match.group(1).strip()
self.sitemap_urls_found.add(1)
try:
sitemap_url = sitemap_url.decode("utf-8", "strict")
except UnicodeDecodeError as e:
self.get_logger().warn(f'Invalid encoding of sitemap URL {sitemap_url}: {repr(e)}')
self.sitemap_url_invalid_encoding.add(1)
continue

if robots_txt_url is None:
# first sitemap found: set base URL and get host from URL
robots_txt_url = self.get_warc_header(record, 'WARC-Target-URI')
try:
robots_txt_host = urlparse(robots_txt_url).netloc.lower().lstrip('.')
except Exception as e:
self.get_logger().warn(f'Invalid robots.txt URL: {robots_txt_url}: {repr(e)}')
# skip this entire robots.txt record
return

if not (sitemap_url.startswith('http:') or sitemap_url.startswith('https:')):
# sitemap_url is relative; pass straight to urljoin which knows how to handle it correctly
try:
sitemap_url = urljoin(robots_txt_url, sitemap_url)
except Exception as e:
self.get_logger().warn(f'Error joining sitemap URL {sitemap_url} with base {robots_txt_url}: {repr(e)}')
continue

yield sitemap_url, [robots_txt_host]
n_sitemaps += 1

if n_sitemaps > 0:
self.robots_txt_announcing_sitemap.add(1)
if n_sitemaps > 50:
self.robots_txt_with_more_than_50_sitemaps.add(1)



if __name__ == '__main__':
job = SitemapExtractorJob()
job.run()
20 changes: 20 additions & 0 deletions sitemaps_from_robotstxt_fastwarc.py
@@ -0,0 +1,20 @@
from fastwarc.warc import WarcRecordType

from sparkcc_fastwarc import CCFastWarcSparkJob
from sitemaps_from_robotstxt import SitemapExtractorJob


class SitemapExtractorFastWarcJob(SitemapExtractorJob, CCFastWarcSparkJob):
"""Extract sitemap URLs (http://www.sitemaps.org/) from robots.txt WARC files
using FastWARC to parse WARC files."""

name = "SitemapExtractorFastWarc"

# process only WARC response and metadata (including WAT) records
fastwarc_record_filter = WarcRecordType.response

# process_record is implemented by SitemapExtractorJob
Review comment (Contributor): The "main" block is required in order to run the job.

Review comment (Contributor): Please also run the job to verify that there are no errors.

Reply (Contributor, author): fixed


if __name__ == '__main__':
job = SitemapExtractorFastWarcJob()
job.run()