diff --git a/.github/workflows/python_test.yaml b/.github/workflows/python_test.yaml new file mode 100644 index 0000000..4921533 --- /dev/null +++ b/.github/workflows/python_test.yaml @@ -0,0 +1,36 @@ +name: Python Unit Tests + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + workflow_dispatch: + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ['3.10', '3.11', '3.12', '3.13'] + fail-fast: false + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + pip install -r requirements.txt + pip install pyspark==3.5.7 + + - name: Run tests + run: | + python -m pytest . -v + env: + # PySpark needs this to find the test files + PYTHONPATH: test diff --git a/README.md b/README.md index caeea65..2abacd8 100644 --- a/README.md +++ b/README.md @@ -28,18 +28,48 @@ Extending the [CCSparkJob](./sparkcc.py) isn't difficult and for many use cases ## Setup -To develop and test locally, you will need to install -* Spark, see the [detailed instructions](https://spark.apache.org/docs/latest/), and -* all required Python modules by running +To develop and test locally, you'll need **Python>=3.9** and **Spark**. + +### JRE + +Spark requires a 64-bit Java JRE (v8, 11, or 17 for Spark 3.5.7). Install this first. If you have an Apple Silicon device, the Azul Zulu JRE is recommended for native architecture support. Ensure that either `java` is on your `$PATH` or the `$JAVA_HOME` env var points to your JRE. + +### Python dependencies + +Assuming you already have Python set up and a venv activated, install the `cc-pyspark` dependencies: + ``` pip install -r requirements.txt ``` -* (optionally, and only if you want to query the columnar index) [install S3 support libraries](#installation-of-s3-support-libraries) so that Spark can load the columnar index from S3 +#### If you want to query the columnar index: +In addition to the above, [install S3 support libraries](#installation-of-s3-support-libraries) so that Spark can load the columnar index from S3. + +### Spark + +There are two ways to obtain Spark: +* manual installation / preinstallation +* as a pip package with `pip install` + +#### For simple development or to get started quickly, the `pip install` route is recommended: + +```bash +pip install pyspark==3.5.7 +``` + +This will install v3.5.7 of [the PySpark python package](https://spark.apache.org/docs/latest/api/python/getting_started/index.html), which includes a local/client-only version of Spark and also adds `spark-submit` and `pyspark` to your `$PATH`. + +> If you need to interact with a remote Spark cluster, use a version of PySpark that matches the cluster version. + +#### If Spark is already installed or if you want full tooling to configure a local Spark cluster: + +Install Spark if it isn't already installed (see the [Spark documentation](https://spark.apache.org/docs/latest/) for guidance). Then, ensure that `spark-submit` and `pyspark` are on your `$PATH`, or prefix the commands with `$SPARK_HOME/bin`, e.g. `$SPARK_HOME/bin/spark-submit`. + +> Note: The PySpark package is required if you want to run the tests in `test/`. ## Compatibility and Requirements -Tested with with Spark 3.2.3, 3.3.2, 3.4.1, 3.5.5 in combination with Python 3.8, 3.9, 3.10, 3.12 and 3.13. See the branch [python-2.7](/commoncrawl/cc-pyspark/tree/python-2.7) if you want to run the job on Python 2.7 and older Spark versions.
+Tested with Spark 3.2.3, 3.3.2, 3.4.1, 3.5.5 in combination with Python 3.8, 3.9, 3.10, 3.12 and 3.13. See the branch [python-2.7](/commoncrawl/cc-pyspark/tree/python-2.7) if you want to run the job on Python 2.7 and older Spark versions. ## Get Sample Data @@ -62,11 +92,10 @@ CC-PySpark reads the list of input files from a manifest file. Typically, these ### Running locally -First, point the environment variable `SPARK_HOME` to your Spark installation. -Then submit a job via +Spark jobs can be started using `spark-submit` (see [Setup](#setup) above if you have a manual installation of Spark): ``` -$SPARK_HOME/bin/spark-submit ./server_count.py \ +spark-submit ./server_count.py \ --num_output_partitions 1 --log_level WARN \ ./input/test_warc.txt servernames ``` @@ -76,7 +105,7 @@ This will count web server names sent in HTTP response headers for the sample WA The output table can be accessed via SparkSQL, e.g., ``` -$SPARK_HOME/bin/pyspark +$ pyspark >>> df = sqlContext.read.parquet("spark-warehouse/servernames") >>> for row in df.sort(df.val.desc()).take(10): print(row) ... @@ -92,12 +121,29 @@ Row(key=u'Apache/2.2.15 (CentOS)', val=827) Row(key=u'Apache-Coyote/1.1', val=790) ``` -But it's also possible to configure a different output format, for example CSV or JSON, see the command-line options. +But it's also possible to configure a different output format, for example CSV or JSON; pass `--help` on the command line for more details. See also * [running the Spark shell and submitting Spark jobs](https://spark.apache.org/docs/latest/#running-the-examples-and-shell) * [PySpark SQL API](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html) +#### Debugging in an IDE + +If the `.py` file for the job you want to debug is runnable (i.e. if it has an `if __name__ == "__main__":` line), you can bypass `spark-submit` and run it directly as a Python script: + +```bash +python server_count.py --num_output_partitions 1 ./input/test_warc.txt servernames +``` + +Spark will complain if the output directory already exists; you may want to add a preprocessing step that deletes the appropriate subdirectory under `spark-warehouse` before each run, e.g. `rm -rf spark-warehouse/servernames`. + +> If you have manually installed Spark, you'll need to ensure the pyspark package is on your PYTHONPATH: +> ```bash +> PYTHONPATH=$PYTHONPATH:$SPARK_HOME/python python server_count.py --num_output_partitions 1 ./input/test_warc.txt servernames +> ``` + +Note that the `run_job` code is still invoked by the Spark Java binary behind the scenes, which normally prevents a debugger from attaching. To debug the `run_job` internals, it's recommended to set up a unit test and debug that; see `test/test_sitemaps_from_robotstxt.py` for examples of single and batched job tests. + ### Running in Spark cluster over large amounts of data @@ -116,7 +162,7 @@ As the Common Crawl dataset lives in the Amazon Public Datasets program, you can All examples show the available command-line options if called with the parameter `--help` or `-h`, e.g. ``` -$SPARK_HOME/bin/spark-submit ./server_count.py --help +spark-submit ./server_count.py --help ``` #### Overwriting Spark configuration properties @@ -126,7 +172,7 @@ There are many [Spark configuration properties](https://spark.apache.org/docs/la It's possible to overwrite Spark properties when [submitting the job](https://spark.apache.org/docs/latest/submitting-applications.html): ``` -$SPARK_HOME/bin/spark-submit \ +spark-submit \ --conf spark.sql.warehouse.dir=myWareHouseDir \ 
(other Spark options, flags, config properties) \ ./server_count.py \ @@ -170,7 +216,7 @@ Please also note that: Below an example call to count words in 10 WARC records host under the `.is` top-level domain using the `--packages` option: ``` -$SPARK_HOME/bin/spark-submit \ +spark-submit \ --packages org.apache.hadoop:hadoop-aws:3.3.2 \ ./cc_index_word_count.py \ --input_base_url s3://commoncrawl/ \ @@ -210,6 +256,21 @@ Some differences between the warcio and FastWARC APIs are hidden from the user i However, it's recommended that you carefully verify that your custom job implementation works in combination with FastWARC. There are subtle differences between the warcio and FastWARC APIs, including the underlying classes (WARC/HTTP headers and stream implementations). In addition, FastWARC does not support for legacy ARC files and does not automatically decode HTTP content and transfer encodings (see [Resiliparse HTTP Tools](https://resiliparse.chatnoir.eu/en/latest/man/parse/http.html#read-chunked-http-payloads)). While content and transfer encodings are already decoded in Common Crawl WARC files, this may not be the case for WARC files from other sources. See also [WARC 1.1 specification, http/https response records](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#http-and-https-schemes). +## Running the Tests + +To run the tests in `test/` you will need to add `.` and `test` to the PYTHONPATH: + +```bash +PYTHONPATH=$PYTHONPATH:.:test pytest -v test +``` + +or if you have a manual installation of Spark: + +```bash +PYTHONPATH=$PYTHONPATH:$SPARK_HOME/python:.:test pytest -v test +``` + + ## Credits Examples are originally ported from Stephen Merity's [cc-mrjob](https://github.com/commoncrawl/cc-mrjob/) with the following changes and upgrades: diff --git a/get-data.sh b/get-data.sh index e8e71b6..de776a5 100755 --- a/get-data.sh +++ b/get-data.sh @@ -45,3 +45,9 @@ for data_type in warc wat wet; do done +echo "Downloading sample robots.txt file..." 
+ccfile='crawl-data/CC-MAIN-2017-30/segments/1500549423183.57/robotstxt/CC-MAIN-20170720121902-20170720141902-00000.warc.gz' +mkdir -p `dirname "$ccfile"` +wget --no-clobber https://data.commoncrawl.org/$ccfile -O $ccfile + + diff --git a/requirements.txt b/requirements.txt index 8aca807..9c2bbb9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,3 +27,7 @@ lxml #Resiliparse # (tested with) #Resiliparse==0.15.2 + +# testing +pytest +pytest-mock diff --git a/sitemaps_from_robotstxt.py b/sitemaps_from_robotstxt.py new file mode 100644 index 0000000..a939122 --- /dev/null +++ b/sitemaps_from_robotstxt.py @@ -0,0 +1,112 @@ +import re +from urllib.parse import urlparse, urljoin + +from pyspark.sql.types import StructType, StructField, StringType, ArrayType +from warcio.recordloader import ArcWarcRecord + +from sparkcc import CCSparkJob + +class SitemapExtractorJob(CCSparkJob): + """Extract sitemap URLs (http://www.sitemaps.org/) from robots.txt WARC files.""" + + name = "SitemapExtractor" + + output_schema = StructType([ + StructField('sitemap_url', StringType(), True), + StructField('hosts', ArrayType(elementType=StringType()), True) + ]) + + # rb: match on raw bytes so we can defer utf-8 decoding to the `sitemap:` line + sitemap_pattern = re.compile(rb'^sitemap:\s*(\S+)', re.I) + + robots_txt_processed = None + sitemap_urls_found = None + sitemap_url_invalid_encoding = None + robots_txt_announcing_sitemap = None + robots_txt_with_more_than_50_sitemaps = None + + + def log_accumulators(self, session): + super(SitemapExtractorJob, self).log_accumulators(session) + + self.log_accumulator(session, self.robots_txt_processed, + 'robots.txt successfully parsed = {}') + self.log_accumulator(session, self.sitemap_urls_found, + 'sitemap urls found = {}') + self.log_accumulator(session, self.sitemap_url_invalid_encoding, + 'sitemap urls with invalid utf-8 encoding = {}') + self.log_accumulator(session, self.robots_txt_announcing_sitemap, + 'robots.txt announcing at least 1 sitemap = {}') + self.log_accumulator(session, self.robots_txt_with_more_than_50_sitemaps, + 'robots.txt with more than 50 sitemaps = {}') + + + def init_accumulators(self, session): + super(SitemapExtractorJob, self).init_accumulators(session) + + sc = session.sparkContext + self.robots_txt_processed = sc.accumulator(0) + self.sitemap_urls_found = sc.accumulator(0) + self.sitemap_url_invalid_encoding = sc.accumulator(0) + self.robots_txt_announcing_sitemap = sc.accumulator(0) + self.robots_txt_with_more_than_50_sitemaps = sc.accumulator(0) + + + def process_record(self, record: ArcWarcRecord): + """ emit: sitemap_url => [host] """ + if not self.is_response_record(record): + # we're only interested in the HTTP responses + return + + self.robots_txt_processed.add(1) + # robots_txt url/host are lazily computed when we encounter the first valid sitemap URL + robots_txt_url = None + robots_txt_host = None + n_sitemaps = 0 + + data = self.get_payload_stream(record).read() + for raw_line in data.splitlines(): + raw_line = raw_line.strip() + + match = SitemapExtractorJob.sitemap_pattern.match(raw_line) + if match: + sitemap_url = match.group(1).strip() + self.sitemap_urls_found.add(1) + try: + sitemap_url = sitemap_url.decode("utf-8", "strict") + except UnicodeDecodeError as e: + self.get_logger().warn(f'Invalid encoding of sitemap URL {sitemap_url}: {repr(e)}') + self.sitemap_url_invalid_encoding.add(1) + continue + + if robots_txt_url is None: + # first sitemap found: set base URL and get host from URL + robots_txt_url = 
self.get_warc_header(record, 'WARC-Target-URI') + try: + robots_txt_host = urlparse(robots_txt_url).netloc.lower().lstrip('.') + except Exception as e: + self.get_logger().warn(f'Invalid robots.txt URL: {robots_txt_url}: {repr(e)}') + # skip this entire robots.txt record + return + + if not (sitemap_url.startswith('http:') or sitemap_url.startswith('https:')): + # sitemap_url is relative; pass straight to urljoin which knows how to handle it correctly + try: + sitemap_url = urljoin(robots_txt_url, sitemap_url) + except Exception as e: + self.get_logger().warn(f'Error joining sitemap URL {sitemap_url} with base {robots_txt_url}: {repr(e)}') + continue + + yield sitemap_url, [robots_txt_host] + n_sitemaps += 1 + + if n_sitemaps > 0: + self.robots_txt_announcing_sitemap.add(1) + if n_sitemaps > 50: + self.robots_txt_with_more_than_50_sitemaps.add(1) + + + +if __name__ == '__main__': + job = SitemapExtractorJob() + job.run() diff --git a/sitemaps_from_robotstxt_fastwarc.py b/sitemaps_from_robotstxt_fastwarc.py new file mode 100644 index 0000000..ce1d3a7 --- /dev/null +++ b/sitemaps_from_robotstxt_fastwarc.py @@ -0,0 +1,20 @@ +from fastwarc.warc import WarcRecordType + +from sparkcc_fastwarc import CCFastWarcSparkJob +from sitemaps_from_robotstxt import SitemapExtractorJob + + +class SitemapExtractorFastWarcJob(SitemapExtractorJob, CCFastWarcSparkJob): + """Extract sitemap URLs (http://www.sitemaps.org/) from robots.txt WARC files + using FastWARC to parse WARC files.""" + + name = "SitemapExtractorFastWarc" + + # process only WARC response records + fastwarc_record_filter = WarcRecordType.response + + # process_record is implemented by SitemapExtractorJob + +if __name__ == '__main__': + job = SitemapExtractorFastWarcJob() + job.run() diff --git a/test/test_sitemaps_from_robotstxt.py b/test/test_sitemaps_from_robotstxt.py new file mode 100644 index 0000000..ba32527 --- /dev/null +++ b/test/test_sitemaps_from_robotstxt.py @@ -0,0 +1,554 @@ +import json +from io import BytesIO + +import pytest +from unittest.mock import MagicMock, Mock + +from pyspark.sql import SparkSession +from warcio.recordloader import ArcWarcRecord + +from sitemaps_from_robotstxt import SitemapExtractorJob +from sparkcc import CCSparkJob +from utils import _process_jobs + + +@pytest.fixture(scope='session') +def spark(): + return SparkSession.builder.appName('test_session').getOrCreate() + + +def make_robots_txt_record(warc_target_uri, + response_bytes) -> Mock: + """ + Create a mock robots.txt WARC record for testing.
+ """ + record = MagicMock() + record.rec_type = 'response' + # mock rec_headers.get_header('WARC-Target-URI') + record.rec_headers.get_header = lambda key, default: warc_target_uri if key == 'WARC-Target-URI' else default + record.content_stream = lambda: BytesIO(response_bytes) + + return record + + +def test_well_formed_record(spark): + record = make_robots_txt_record("http://ajedrezhoygol.blogspot.com.ar/robots.txt", + """User-agent: Mediapartners-Google +Disallow: + +User-agent: * +Disallow: /search +Allow: / + +Sitemap: http://ajedrezhoygol.blogspot.com/sitemap.xml +""".encode('utf-8')) + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + results = list(job.process_record(record)) + assert len(results) == 1 + assert results[0][0] == 'http://ajedrezhoygol.blogspot.com/sitemap.xml' + assert results[0][1] == ["ajedrezhoygol.blogspot.com.ar"] + assert job.sitemap_urls_found.value == 1 + assert job.robots_txt_announcing_sitemap.value == 1 + assert job.robots_txt_with_more_than_50_sitemaps.value == 0 + assert job.robots_txt_processed.value == 1 + + +def test_different_host_record(spark): + record = make_robots_txt_record( + 'http://177.52.3535.ru/robots.txt', +''' +User-agent: Yandex +Disallow: /bitrix/ +Disallow: /upload/ +Disallow: /detsad/ +Disallow: /videouroki/ +Disallow: /humor/ +Disallow: /radio/ +Disallow: /recepts/ +Disallow: /school_life/ +Disallow: /workgroups/ +Disallow: /institutions/ +Disallow: /kindergarten/ +Disallow: /unified-state-exam/ +Disallow: /ideas/ +Disallow: /documents/ +Disallow: /videosearch/ +Disallow: /auth/ +Disallow: /demotivators/ +Disallow: /additional-education/ +Disallow: /admission/ +Disallow: /random/ +Disallow: /horoscope/ +Disallow: /monitoring-in-the-sphere-of-education/ +Disallow: /votes/ +Disallow: /news/ +Disallow: /clips/ +Disallow: /preschool-education/ +Disallow: /movies/ +Disallow: /TV/ +Disallow: /dreambook/ +Disallow: /about/ +Disallow: /company/ +Disallow: /edit/ +Disallow: /com-docs/ +Disallow: /professional-education/ +Disallow: /vs.php +Disallow: /index-old.php +Disallow: /404.php +Disallow: /suz/ +Disallow: /school-education/ +Disallow: /municipal-education-authorities/ +Disallow: /com-about/ +Disallow: /parents/ +Disallow: /view/ +Disallow: /stat/ +Disallow: /quotes/ +Disallow: /region/ +Disallow: /students/ +Disallow: /graduates/ +Disallow: /job/ +Disallow: /auth.php +Disallow: /search/ +Disallow: /search/ +Disallow: /auth/ +Disallow: /auth.php +Disallow: /personal/ +Disallow: /*?print= +Disallow: /*&print= +Disallow: /*register=yes +Disallow: /*forgot_password=yes +Disallow: /*change_password=yes +Disallow: /*login=yes +Disallow: /*logout=yes +Disallow: /*auth=yes +Disallow: /*action=ADD_TO_COMPARE_LIST +Disallow: /*action=DELETE_FROM_COMPARE_LIST +Disallow: /*action=ADD2BASKET +Disallow: /*action=BUY +Disallow: /*bitrix_*= +Disallow: /*backurl=* +Disallow: /*BACKURL=* +Disallow: /*back_url=* +Disallow: /*BACK_URL=* +Disallow: /*back_url_admin=* +Disallow: /*index.php$ +Disallow: /*?* + + +User-agent: * +Disallow: /bitrix/ +Disallow: /upload/ +Disallow: /detsad/ +Disallow: /humor/ +Disallow: /videouroki/ +Disallow: /radio/ +Disallow: /recepts/ +Disallow: /school_life/ +Disallow: /workgroups/ +Disallow: /institutions/ +Disallow: /kindergarten/ +Disallow: /unified-state-exam/ +Disallow: /ideas/ +Disallow: /documents/ +Disallow: /videosearch/ +Disallow: /auth/ +Disallow: /demotivators/ +Disallow: /additional-education/ +Disallow: /admission/ +Disallow: /random/ +Disallow: /horoscope/ +Disallow: 
/monitoring-in-the-sphere-of-education/ +Disallow: /votes/ +Disallow: /news/ +Disallow: /clips/ +Disallow: /preschool-education/ +Disallow: /movies/ +Disallow: /TV/ +Disallow: /dreambook/ +Disallow: /about/ +Disallow: /company/ +Disallow: /edit/ +Disallow: /com-docs/ +Disallow: /professional-education/ +Disallow: /vs.php +Disallow: /index-old.php +Disallow: /404.php +Disallow: /suz/ +Disallow: /school-education/ +Disallow: /municipal-education-authorities/ +Disallow: /com-about/ +Disallow: /parents/ +Disallow: /view/ +Disallow: /stat/ +Disallow: /quotes/ +Disallow: /region/ +Disallow: /students/ +Disallow: /graduates/ +Disallow: /job/ +Disallow: /auth.php +Disallow: /search/ +Disallow: /search/ +Disallow: /auth/ +Disallow: /auth.php +Disallow: /personal/ +Disallow: /*?print= +Disallow: /*&print= +Disallow: /*register=yes +Disallow: /*forgot_password=yes +Disallow: /*change_password=yes +Disallow: /*login=yes +Disallow: /*logout=yes +Disallow: /*auth=yes +Disallow: /*action=ADD_TO_COMPARE_LIST +Disallow: /*action=DELETE_FROM_COMPARE_LIST +Disallow: /*action=ADD2BASKET +Disallow: /*action=BUY +Disallow: /*bitrix_*= +Disallow: /*backurl=* +Disallow: /*BACKURL=* +Disallow: /*back_url=* +Disallow: /*BACK_URL=* +Disallow: /*back_url_admin=* +Disallow: /*index.php$ +Disallow: /*?* +Host: 3535.ru + + +Sitemap: http://3535.ru/sitemap_000.xml +'''.encode('utf-8') + ) + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + results = list(job.process_record(record)) + assert len(results) == 1 + assert results[0][0] == 'http://3535.ru/sitemap_000.xml' + assert results[0][1] == ['177.52.3535.ru'] + + + + +def test_host_accumulation_same_host(spark): + """ + Test accumulation of hosts when sitemap url host and robots.txt url host match + Requires test/ on PYTHONPATH so utils._process_jobs can be imported + """ + + record = make_robots_txt_record("http://agencasinosbobet5.weebly.com/robots.txt", +"""Sitemap: http://agencasinosbobet5.weebly.com/sitemap.xml + +User-agent: NerdyBot +Disallow: / + +User-agent: * +Disallow: /ajax/ +Disallow: /apps/ +""".encode('utf-8')) + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + + records = [record] + rdd = spark.sparkContext.parallelize(records) + _process_jobs_partial = lambda partition_index, records: _process_jobs(partition_index, records, job=job) + output = rdd.mapPartitionsWithIndex(_process_jobs_partial) + output = output.reduceByKey(CCSparkJob.reduce_by_key_func).collect() + + assert len(output) == 1 + assert output[0][0] == 'http://agencasinosbobet5.weebly.com/sitemap.xml' + assert output[0][1] == ['agencasinosbobet5.weebly.com'] + assert job.sitemap_urls_found.value == 1 + assert job.sitemap_url_invalid_encoding.value == 0 + assert job.robots_txt_announcing_sitemap.value == 1 + assert job.robots_txt_with_more_than_50_sitemaps.value == 0 + assert job.robots_txt_processed.value == 1 + + +def test_host_accumulation_multi(spark): + """ + Test accumulation of multiple hosts for same sitemap URL from different robots.txt records + Requires test/ on PYTHONPATH so utils._process_jobs can be imported + """ + + multi_robots_txt_data = [ + ( + "http://the-mayflower-hotel-autograph-collection-washington.ibooked.com.br/robots.txt", + """User-Agent: * +Allow: / +Disallow: /reviewpage/ +Disallow: /ajax/ +Disallow: /?page=stat +Disallow: /?page=hotel_ajax +Disallow: /?page=hotellist_json +Disallow: /reviewpage2/ + + +User-agent: Yandex +Host: nochi.com +Allow: / +Disallow: /reviewpage/ + +Disallow: /ajax/ +Disallow: /?page=stat +Disallow: 
/?page=hotel_ajax +Disallow: /?page=hotellist_json +Sitemap: http://nochi.com/data/sitemaps/ru_index.xml +""".encode('utf-8') + ), + ( + "http://the-rockies-condominiums-steamboat-springs.booked.net/robots.txt", + """User-Agent: * +Allow: / +Disallow: /reviewpage/ +Disallow: /ajax/ +Disallow: /?page=stat +Disallow: /?page=hotel_ajax +Disallow: /?page=hotellist_json +Disallow: /reviewpage2/ + + +User-agent: Yandex +Host: nochi.com +Allow: / +Disallow: /reviewpage/ + +Disallow: /ajax/ +Disallow: /?page=stat +Disallow: /?page=hotel_ajax +Disallow: /?page=hotellist_json +Sitemap: http://nochi.com/data/sitemaps/ru_index.xml +""".encode('utf-8'), + ), + ( + "http://hotel-flora-venice.booked.kr/robots.txt", + """User-Agent: * +Allow: / +Disallow: /reviewpage/ +Disallow: /ajax/ +Disallow: /?page=stat +Disallow: /?page=hotel_ajax +Disallow: /?page=hotellist_json +Disallow: /reviewpage2/ + + +User-agent: Yandex +Host: nochi.com +Allow: / +Disallow: /reviewpage/ + +Disallow: /ajax/ +Disallow: /?page=stat +Disallow: /?page=hotel_ajax +Disallow: /?page=hotellist_json +Sitemap: http://nochi.com/data/sitemaps/ru_index.xml +""".encode('utf-8') + ) + ] + + records = [make_robots_txt_record(robots_txt_url, robots_txt_content) + for robots_txt_url, robots_txt_content in multi_robots_txt_data] + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + rdd = spark.sparkContext.parallelize(records) + _process_jobs_partial = lambda partition_index, records: _process_jobs(partition_index, records, job=job) + output = rdd.mapPartitionsWithIndex(_process_jobs_partial) + output = output.reduceByKey(CCSparkJob.reduce_by_key_func).collect() + assert len(output) == 1 + assert output[0][0] == 'http://nochi.com/data/sitemaps/ru_index.xml' + assert sorted(output[0][1]) == sorted(["the-mayflower-hotel-autograph-collection-washington.ibooked.com.br","the-rockies-condominiums-steamboat-springs.booked.net","hotel-flora-venice.booked.kr"]) + assert job.sitemap_urls_found.value == 3 + assert job.sitemap_url_invalid_encoding.value == 0 + assert job.robots_txt_announcing_sitemap.value == 3 + assert job.robots_txt_with_more_than_50_sitemaps.value == 0 + assert job.robots_txt_processed.value == 3 + + +def test_wrong_encoding_utf16_record(spark): + record = make_robots_txt_record("http://ajedrezhoygol.blogspot.com.ar/robots.txt", + """User-agent: Mediapartners-Google +Disallow: + +User-agent: * +Disallow: /search +Allow: / + +Sitemap: http://ajedrezhoygol.blogspot.com/sitemap.xml +""".encode('utf-16')) + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + results = list(job.process_record(record)) + assert len(results) == 0 + assert job.sitemap_urls_found.value == 0 + assert job.sitemap_url_invalid_encoding.value == 0 + assert job.robots_txt_announcing_sitemap.value == 0 + assert job.robots_txt_with_more_than_50_sitemaps.value == 0 + assert job.robots_txt_processed.value == 1 + + +def test_robots_txt_invalid_url_unparseable_netloc(spark): + """ Test malformed WARC-Target-URI """ + record = make_robots_txt_record( + warc_target_uri='http://[malformed::url]/robots.txt', + response_bytes="""User-agent: * +Disallow: /admin/ + +Sitemap: http://example.com/sitemap.xml +Sitemap: http://example.com/sitemap2.xml +""".encode('utf-8') + ) + + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + results = list(job.process_record(record)) + assert len(results) == 0 + assert job.sitemap_urls_found.value == 1 + assert job.sitemap_url_invalid_encoding.value == 0 + assert 
job.robots_txt_announcing_sitemap.value == 0 + assert job.robots_txt_with_more_than_50_sitemaps.value == 0 + assert job.robots_txt_processed.value == 1 + + +def test_robots_txt_invalid_punycode_url(spark): + """ Test invalid punycode in WARC-Target-URI """ + record = make_robots_txt_record("http://xn--foo/robots.txt", + """User-agent: * +Disallow: / + +Sitemap: http://example.com/sitemap.xml +""".encode('utf-8')) + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + results = list(job.process_record(record)) + assert len(results) == 1 + assert job.sitemap_urls_found.value == 1 + assert job.sitemap_url_invalid_encoding.value == 0 + assert job.robots_txt_announcing_sitemap.value == 1 + assert job.robots_txt_with_more_than_50_sitemaps.value == 0 + assert job.robots_txt_processed.value == 1 + + +def test_sitemap_url_invalid_encoding_latin1(spark): + """ Test incorrectly encoded sitemap URL - latin-1 bytes """ + # The byte sequence \xe9 is é in Latin-1 but invalid in UTF-8 when standalone + record = make_robots_txt_record( + warc_target_uri='http://example.com/robots.txt', + response_bytes=b"""User-agent: * +Disallow: / + +Sitemap: http://example.com/sitemap_caf\xe9.xml +""" + ) + + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + results = list(job.process_record(record)) + assert len(results) == 0 + assert job.sitemap_urls_found.value == 1 + assert job.sitemap_url_invalid_encoding.value == 1 + assert job.robots_txt_announcing_sitemap.value == 0 + assert job.robots_txt_with_more_than_50_sitemaps.value == 0 + assert job.robots_txt_processed.value == 1 + + +def test_sitemap_url_invalid_encoding_mixed_bytes(spark): + """ Test incorrectly encoded sitemap URL - mixed UTF-8 invalid bytes """ + # The byte sequence \xff\xfe is not valid UTF-8 + record = make_robots_txt_record( + warc_target_uri='http://example.com/robots.txt', + # improperly encoded UTF-8 byte sequence in second sitemap URL + response_bytes=b"""User-agent: * +Disallow: /search + +Sitemap: http://example.com/good_sitemap.xml +Sitemap: http://example.com/bad\xff\xfe_sitemap.xml +Sitemap: http://example2.com/another_good.xml +""" + ) + + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + results = list(job.process_record(record)) + assert len(results) == 2 + assert results == [ + ('http://example.com/good_sitemap.xml', ['example.com']), + ('http://example2.com/another_good.xml', ['example.com']) + ] + assert job.sitemap_url_invalid_encoding.value == 1 + assert job.sitemap_urls_found.value == 3 # All 3 matched the pattern + assert job.robots_txt_announcing_sitemap.value == 1 + assert job.robots_txt_with_more_than_50_sitemaps.value == 0 + assert job.robots_txt_processed.value == 1 + + +def test_sitemap_url_invalid_malformed_url(spark): + """ Test invalid malformed sitemap URL """ + record = make_robots_txt_record( + warc_target_uri='http://example.com/robots.txt', + response_bytes=b"""User-agent: * +Disallow: / + +Sitemap: ht!tp://[malformed::url]/sitemap.xml +""" + ) + + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + results = list(job.process_record(record)) + assert len(results) == 1 + assert job.sitemap_urls_found.value == 1 + assert job.robots_txt_announcing_sitemap.value == 1 + assert job.robots_txt_with_more_than_50_sitemaps.value == 0 + assert job.robots_txt_processed.value == 1 + + +def test_sitemap_url_invalid_malformed_url_multi(spark): + """ Test multiple sitemap URLs, one invalid """ + # http://xn--invalid is malformed punycode + record = 
make_robots_txt_record( + warc_target_uri='http://example.com/robots.txt', + response_bytes=b"""User-agent: * +Disallow: /admin/ + +Sitemap: http://valid-site.com/sitemap1.xml +Sitemap: http://xn--invalid/sitemap.xml +Sitemap: http://another-valid-site.com/sitemap2.xml +""" + ) + + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + results = list(job.process_record(record)) + assert len(results) == 3 + assert results == [ + ('http://valid-site.com/sitemap1.xml', ['example.com']), + ('http://xn--invalid/sitemap.xml', ['example.com']), + ('http://another-valid-site.com/sitemap2.xml', ['example.com']) + ] + assert job.sitemap_urls_found.value == 3 + assert job.robots_txt_announcing_sitemap.value == 1 + assert job.robots_txt_with_more_than_50_sitemaps.value == 0 + assert job.robots_txt_processed.value == 1 + + +def test_50_sitemap_urls(spark): + """ Test a robots.txt announcing more than 50 sitemap URLs """ + # 60 sitemap URLs: exceeds the 50-sitemap threshold + record = make_robots_txt_record( + warc_target_uri='http://example.com/robots.txt', + response_bytes=("""User-agent: * +Disallow: /admin/ + +""" + "\n".join(f"Sitemap: http://valid-site.com/sitemap{i}.xml" for i in range(60)) + ).encode('utf-8')) + + job = SitemapExtractorJob() + job.init_accumulators(session=spark) + results = list(job.process_record(record)) + assert len(results) == 60 + for sitemap_url, host in results: + assert sitemap_url.startswith("http://valid-site.com/sitemap") + assert host == ["example.com"] + assert job.sitemap_urls_found.value == 60 + assert job.robots_txt_announcing_sitemap.value == 1 + assert job.robots_txt_with_more_than_50_sitemaps.value == 1 + assert job.robots_txt_processed.value == 1 + diff --git a/test/utils.py b/test/utils.py new file mode 100644 index 0000000..3c16f02 --- /dev/null +++ b/test/utils.py @@ -0,0 +1,5 @@ + +def _process_jobs(partition_index, records, job): + for record in records: + for result in job.process_record(record): + yield result
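Usage note (not part of the patch): the `_process_jobs` helper in `test/utils.py` mirrors how the job maps `process_record` over WARC records, which is what the batched tests exploit to run a job end-to-end on mocked records via a local RDD. The sketch below restates the pattern already used in `test_host_accumulation_multi`; `run_batch` is a hypothetical helper name, and `reduce_by_key_func` is assumed (based on its use in the tests) to merge the per-record `[host]` lists for the same sitemap URL.

```python
# Minimal sketch: running SitemapExtractorJob over mocked robots.txt records.
# Assumes a SparkSession (e.g. the `spark` pytest fixture) and mock records built
# with make_robots_txt_record() from test/test_sitemaps_from_robotstxt.py;
# requires `.` and `test` on PYTHONPATH, as described in the README changes.
from sitemaps_from_robotstxt import SitemapExtractorJob
from sparkcc import CCSparkJob
from utils import _process_jobs


def run_batch(spark, records):
    job = SitemapExtractorJob()
    job.init_accumulators(session=spark)
    rdd = spark.sparkContext.parallelize(records)
    # bind the job instance into the partition-processing helper
    process = lambda idx, recs: _process_jobs(idx, recs, job=job)
    # each record yields (sitemap_url, [robots_txt_host]); reduceByKey merges
    # the host lists so every sitemap URL maps to all hosts announcing it
    return (rdd.mapPartitionsWithIndex(process)
               .reduceByKey(CCSparkJob.reduce_by_key_func)
               .collect())
```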