Merged
Changes from 19 commits
36 changes: 36 additions & 0 deletions .github/workflows/python_test.yaml
@@ -0,0 +1,36 @@
name: Python Unit Tests

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
workflow_dispatch:

jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.10', '3.11', '3.12', '3.13']
fail-fast: false

steps:
- uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -r requirements-pyspark.txt

- name: Run tests
run: |
python -m pytest . -v
env:
# pyspark needs this to find the test files
PYTHONPATH: test
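
To reproduce these CI steps locally, a rough equivalent (assuming a Python version from the matrix above is active) is:

```bash
pip install -r requirements.txt -r requirements-pyspark.txt
PYTHONPATH=test python -m pytest . -v
```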
49 changes: 36 additions & 13 deletions README.md
@@ -28,18 +28,32 @@ Extending the [CCSparkJob](./sparkcc.py) isn't difficult and for many use cases

## Setup

To develop and test locally, you will need to install
* Spark, see the [detailed instructions](https://spark.apache.org/docs/latest/), and
* all required Python modules by running
```
pip install -r requirements.txt
```
* (optionally, and only if you want to query the columnar index) [install S3 support libraries](#installation-of-s3-support-libraries) so that Spark can load the columnar index from S3
To develop and test locally, you'll need Python>=3.9 and Spark.

#### If Spark is already installed:
(or if you want full control over your Spark cluster configuration), install only the Python dependencies:

```bash
pip install -r requirements.txt
```

Ensure that `spark-submit` and `pyspark` are on your `$PATH`, or prefix them with `$SPARK_HOME/bin`, e.g. `$SPARK_HOME/bin/spark-submit`. See the [Spark documentation](https://spark.apache.org/docs/latest/) for more information.
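
For example, a typical shell setup might look like this (a sketch only; `/opt/spark` is a placeholder for wherever Spark is unpacked):

```bash
export SPARK_HOME=/opt/spark
export PATH="$SPARK_HOME/bin:$PATH"
spark-submit --version   # should print the installed Spark version
```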

#### If you just want to get started quickly with local no-cluster development:

```bash
pip install -r requirements.txt
pip install -r requirements-pyspark.txt
```
This will install [the PySpark Python package](https://spark.apache.org/docs/latest/api/python/getting_started/index.html), which includes a local/client-only version of Spark and also adds `spark-submit` and `pyspark` to your `$PATH`.
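
A quick way to verify the installation (assuming the commands above succeeded):

```bash
spark-submit --version
python -c "import pyspark; print(pyspark.__version__)"
```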

#### If you want to query the columnar index:
In addition to the above, [install S3 support libraries](#installation-of-s3-support-libraries) so that Spark can load the columnar index from S3.


## Compatibility and Requirements

Tested with with Spark 3.2.3, 3.3.2, 3.4.1, 3.5.5 in combination with Python 3.8, 3.9, 3.10, 3.12 and 3.13. See the branch [python-2.7](/commoncrawl/cc-pyspark/tree/python-2.7) if you want to run the job on Python 2.7 and older Spark versions.
Tested with Spark 3.2.3, 3.3.2, 3.4.1, 3.5.5 in combination with Python 3.8, 3.9, 3.10, 3.12 and 3.13. See the branch [python-2.7](/commoncrawl/cc-pyspark/tree/python-2.7) if you want to run the job on Python 2.7 and older Spark versions.


## Get Sample Data
@@ -62,11 +76,10 @@ CC-PySpark reads the list of input files from a manifest file. Typically, these

### Running locally

First, point the environment variable `SPARK_HOME` to your Spark installation.
Then submit a job via
Spark jobs can be started using `spark-submit` (see [Setup](#setup) above if you have a manual installation of Spark):

```
$SPARK_HOME/bin/spark-submit ./server_count.py \
spark-submit ./server_count.py \
--num_output_partitions 1 --log_level WARN \
./input/test_warc.txt servernames
```
@@ -76,7 +89,7 @@ This will count web server names sent in HTTP response headers for the sample WA
The output table can be accessed via SparkSQL, e.g.,

```
$SPARK_HOME/bin/pyspark
$ pyspark
>>> df = sqlContext.read.parquet("spark-warehouse/servernames")
>>> for row in df.sort(df.val.desc()).take(10): print(row)
...
@@ -92,12 +105,22 @@ Row(key=u'Apache/2.2.15 (CentOS)', val=827)
Row(key=u'Apache-Coyote/1.1', val=790)
```

But it's also possible to configure a different output format, for example CSV or JSON, see the command-line options.
But it's also possible to configure a different output format, for example CSV or JSON; pass `--help` on the command line for more details.
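
For instance, a run that writes JSON instead of the default Parquet might look like this (a sketch; `--output_format` is assumed here, check `--help` for the exact option names):

```bash
spark-submit ./server_count.py \
    --num_output_partitions 1 --log_level WARN \
    --output_format json \
    ./input/test_warc.txt servernames
```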

See also
* [running the Spark shell and submitting Spark jobs](https://spark.apache.org/docs/latest/#running-the-examples-and-shell)
* [PySpark SQL API](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

#### Debugging in an IDE

If the `.py` file for the job you want to debug is runnable (i.e. if it has an `if __name__ == "__main__":` block), you can invoke it directly as a Python script without going through `spark-submit`:

```bash
python server_count.py --num_output_partitions 1 ./input/test_warc.txt servernames
```

Spark will complain if the output directory already exists. You may want to add a preprocessing step that deletes the appropriate subdirectory under `spark-warehouse` before each run, e.g. `rm -rf spark-warehouse/servernames`.
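
A minimal wrapper for repeated debug runs could look like this (sketch only, reusing the `servernames` example from above):

```bash
rm -rf spark-warehouse/servernames
python server_count.py --num_output_partitions 1 ./input/test_warc.txt servernames
```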


### Running in Spark cluster over large amounts of data

6 changes: 6 additions & 0 deletions get-data.sh
@@ -45,3 +45,9 @@ for data_type in warc wat wet; do

done

echo "Downloading sample robots.txt file..."
ccfile='crawl-data/CC-MAIN-2017-30/segments/1500549423183.57/robotstxt/CC-MAIN-20170720121902-20170720141902-00000.warc.gz'
mkdir -p `dirname "$ccfile"`
wget --no-clobber https://data.commoncrawl.org/$ccfile -O $ccfile


1 change: 1 addition & 0 deletions requirements-pyspark.txt
@@ -0,0 +1 @@
pyspark==3.5.7
7 changes: 7 additions & 0 deletions requirements.txt
@@ -5,6 +5,9 @@ ujson
orjson
warcio

# for validating URLs in robots.txt:
Contributor: Not required anymore.
Contributor Author: Fixed

validators

# for link extraction and webgraph construction also:
idna

@@ -27,3 +30,7 @@ lxml
#Resiliparse
# (tested with)
#Resiliparse==0.15.2

# testing
pytest
pytest-mock
117 changes: 117 additions & 0 deletions sitemaps_from_robotstxt.py
@@ -0,0 +1,117 @@
import json
import re
from typing import Optional
from urllib.parse import urlparse, urljoin

import validators
Contributor: Not required anymore.
Contributor Author: Fixed

from py4j.protocol import Py4JError
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from warcio.recordloader import ArcWarcRecord

from sparkcc import CCSparkJob

class SitemapExtractorJob(CCSparkJob):
"""Extract sitemap URLs (http://www.sitemaps.org/) from robots.txt WARC files."""

name = "SitemapExtractor"

output_schema = StructType([
StructField('sitemap_url', StringType(), True),
StructField('hosts', ArrayType(elementType=StringType()), True)
])

# rb: match on raw bytes so we can defer utf-8 decoding to the `sitemap:` line
sitemap_pattern = re.compile(rb'^sitemap:\s*(\S+)', re.I)
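    # e.g. b'Sitemap: https://example.com/sitemap.xml' matches (case-insensitively),
    # capturing b'https://example.com/sitemap.xml' in group 1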

robots_txt_processed = None
sitemap_urls_found = None
sitemap_url_invalid_encoding = None
robots_txt_announcing_sitemap = None
robots_txt_with_more_than_50_sitemaps = None


def init_accumulators(self, session):
Contributor: log_accumulators also needs to be overridden, otherwise the class-specific accumulators are never shown or preserved once the job has finished. See cc_index_word_count.py or wat_extract_links.py.
Contributor Author: done

super(SitemapExtractorJob, self).init_accumulators(session)

sc = session.sparkContext
self.robots_txt_processed = sc.accumulator(0)
self.sitemap_urls_found = sc.accumulator(0)
self.sitemap_url_invalid_encoding = sc.accumulator(0)
self.robots_txt_announcing_sitemap = sc.accumulator(0)
self.robots_txt_with_more_than_50_sitemaps = sc.accumulator(0)


def process_record(self, record: ArcWarcRecord):
""" emit: sitemap_url => [host] """
if not self.is_response_record(record):
# we're only interested in the HTTP responses
return

self.robots_txt_processed.add(1)
# robots_txt url/host are lazily computed when we encounter the first valid sitemap URL
robots_txt_url = None
robots_txt_host = None
n_sitemaps = 0

data = self.get_payload_stream(record).read()
for raw_line in data.splitlines():
raw_line = raw_line.strip()

match = SitemapExtractorJob.sitemap_pattern.match(raw_line)
if match:
sitemap_url = match.group(1).strip()
self.sitemap_urls_found.add(1)
try:
sitemap_url = sitemap_url.decode("utf-8", "strict")
except UnicodeDecodeError as e:
self.get_logger().warn(f'Invalid encoding of sitemap URL {sitemap_url}: {repr(e)}')
self.sitemap_url_invalid_encoding.add(1)
continue

if robots_txt_url is None:
# first sitemap found: set base URL and get host from URL
robots_txt_url = record.rec_headers['WARC-Target-URI']
Contributor: This is not compatible with FastWARC; it should be self.get_warc_header(record, 'WARC-Target-URI').
Contributor Author: fixed

try:
robots_txt_host = urlparse(robots_txt_url).netloc.lower().lstrip('.')
except Exception as e1:
try:
self.get_logger().warn(f'Invalid robots.txt URL: {robots_txt_url} - {repr(e1)}')
except Exception as e2:
self.get_logger().warn(f'Invalid robots.txt URL - {repr(e1)} (cannot display: {repr(e2)})')
# skip this entire robots.txt record
return

if not (sitemap_url.startswith('http:') or sitemap_url.startswith('https:')):
# sitemap_url is relative; pass straight to urljoin which knows how to handle it correctly
try:
sitemap_url = urljoin(robots_txt_url, sitemap_url)
except Exception as e:
try:
self.get_logger().warn(f'Error joining sitemap URL {sitemap_url} with base {robots_txt_url}: {repr(e)}')
except Exception as log_e:
self.get_logger().warn(f'Error joining sitemap URL with base - {repr(e)} (cannot display: {repr(log_e)})')
continue

yield sitemap_url, [robots_txt_host]
n_sitemaps += 1

if n_sitemaps > 0:
self.robots_txt_announcing_sitemap.add(1)
if n_sitemaps > 50:
self.robots_txt_with_more_than_50_sitemaps.add(1)


    def _try_parse_host(self, url: str, label_for_log: str) -> Optional[str]:
try:
return urlparse(url).netloc.lower().lstrip('.')
except Exception as e:
try:
self.get_logger().warn(f'Invalid {label_for_log} URL: {url} - {repr(e)}')
except Exception as log_e:
self.get_logger().warn(f'Invalid {label_for_log} URL - {repr(e)} (cannot display: {repr(log_e)})')
return None


if __name__ == '__main__':
job = SitemapExtractorJob()
job.run()
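
To try the job on the sample robots.txt WARC fetched by `get-data.sh`, one possible invocation is the sketch below (the manifest name `input/test_robotstxt_warc.txt` is made up here, and the `file:` path prefix is an assumption; check the format of the `input/test_*.txt` manifests that `get-data.sh` creates):

```bash
# Create a one-line input manifest pointing at the downloaded robots.txt WARC.
echo "file:$PWD/crawl-data/CC-MAIN-2017-30/segments/1500549423183.57/robotstxt/CC-MAIN-20170720121902-20170720141902-00000.warc.gz" \
    > input/test_robotstxt_warc.txt

# Run the extractor locally; the output table lands under spark-warehouse/sitemaps.
spark-submit ./sitemaps_from_robotstxt.py \
    --num_output_partitions 1 --log_level WARN \
    input/test_robotstxt_warc.txt sitemaps
```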
16 changes: 16 additions & 0 deletions sitemaps_from_robotstxt_fastwarc.py
@@ -0,0 +1,16 @@
from fastwarc.warc import WarcRecordType

from sparkcc_fastwarc import CCFastWarcSparkJob
from sitemaps_from_robotstxt import SitemapExtractorJob


class SitemapExtractorFastWarcJob(SitemapExtractorJob, CCFastWarcSparkJob):
"""Extract sitemap URLs (http://www.sitemaps.org/) from robots.txt WARC files
using FastWARC to parse WARC files."""

name = "SitemapExtractorFastWarc"

# process only WARC response and metadata (including WAT) records
fastwarc_record_filter = WarcRecordType.response

# process_record is implemented by SitemapExtractorJob
Contributor: The "main" block is required in order to run the job.
Contributor: Please also run the job to verify that there are no errors.
Contributor Author: fixed