diff --git a/engine/src/main/java/org/archive/crawler/frontier/BdbMultipleWorkQueues.java b/engine/src/main/java/org/archive/crawler/frontier/BdbMultipleWorkQueues.java index a45712dc3..48d619704 100644 --- a/engine/src/main/java/org/archive/crawler/frontier/BdbMultipleWorkQueues.java +++ b/engine/src/main/java/org/archive/crawler/frontier/BdbMultipleWorkQueues.java @@ -109,6 +109,7 @@ public long deleteMatchingFromQueue(String match, String queue, DatabaseEntry key = headKey; DatabaseEntry value = new DatabaseEntry(); Cursor cursor = null; + Thread.interrupted(); try { cursor = pendingUrisDB.openCursor(null, null); OperationStatus result = cursor.getSearchKeyRange(headKey, @@ -165,6 +166,7 @@ public CompositeData getFrom( Cursor cursor = null; OperationStatus result = null; + Thread.interrupted(); try { cursor = pendingUrisDB.openCursor(null,null); result = cursor.getSearchKey(key, value, null); @@ -217,6 +219,7 @@ public CompositeData getFrom( protected DatabaseEntry getFirstKey() throws DatabaseException { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); + Thread.interrupted(); Cursor cursor = pendingUrisDB.openCursor(null,null); OperationStatus status = cursor.getNext(key,value,null); cursor.close(); @@ -290,6 +293,7 @@ protected OperationStatus getNextNearestItem(DatabaseEntry headKey, DatabaseEntry result) throws DatabaseException { Cursor cursor = null; OperationStatus status; + Thread.interrupted(); try { cursor = this.pendingUrisDB.openCursor(null, null); @@ -337,6 +341,7 @@ public void put(CrawlURI curi, boolean overwriteIfPresent) tallyAverageEntrySize(curi, value); } OperationStatus status; + Thread.interrupted(); if(overwriteIfPresent) { status = pendingUrisDB.put(null, insertKey, value); } else { @@ -479,6 +484,7 @@ private static int findFirstZero(byte[] b) { */ public void delete(CrawlURI item) throws DatabaseException { OperationStatus status; + Thread.interrupted(); DatabaseEntry de = (DatabaseEntry)item.getHolderKey(); status = pendingUrisDB.delete(null, de); if (status != OperationStatus.SUCCESS) { @@ -504,6 +510,7 @@ protected void sync() { if (this.pendingUrisDB == null) { return; } + Thread.interrupted(); try { this.pendingUrisDB.sync(); } catch (DatabaseException e) { @@ -533,6 +540,7 @@ public void close() { * @param origin key at which to insert the cap */ public void addCap(byte[] origin) { + Thread.interrupted(); try { pendingUrisDB.put(null, new DatabaseEntry(origin), new DatabaseEntry(new byte[0])); @@ -549,6 +557,7 @@ public void addCap(byte[] origin) { protected void forAllPendingDo(Closure c) throws DatabaseException { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); + Thread.interrupted(); Cursor cursor = pendingUrisDB.openCursor(null, null); while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) { if (value.getData().length == 0) { @@ -573,6 +582,7 @@ public long exportPendingUris(PrintWriter writer) { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry value = new DatabaseEntry(); long uris = 0L; + Thread.interrupted(); Cursor cursor = pendingUrisDB.openCursor(null, null); while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) { if (value.getData().length == 0) { diff --git a/engine/src/test/java/org/archive/crawler/frontier/BdbMultipleWorkQueuesTest.java b/engine/src/test/java/org/archive/crawler/frontier/BdbMultipleWorkQueuesTest.java index e4ed72415..0229f89ae 100644 --- a/engine/src/test/java/org/archive/crawler/frontier/BdbMultipleWorkQueuesTest.java +++ b/engine/src/test/java/org/archive/crawler/frontier/BdbMultipleWorkQueuesTest.java @@ -1,8 +1,8 @@ /* * This file is part of the Heritrix web crawler (crawler.archive.org). * - * Licensed to the Internet Archive (IA) by one or more individual - * contributors. + * Licensed to the Internet Archive (IA) by one or more individual + * contributors. * * The IA licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with @@ -18,29 +18,98 @@ */ package org.archive.crawler.frontier; -import java.util.logging.Logger; - -import org.archive.url.URIException; +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.tree.Key; +import org.apache.commons.io.FileUtils; +import org.archive.bdb.BdbModule; +import org.archive.bdb.StoredQueue; import org.archive.modules.CrawlURI; import org.archive.modules.SchedulingConstants; +import org.archive.net.UURI; import org.archive.net.UURIFactory; - -import com.sleepycat.je.tree.Key; - +import org.archive.url.URIException; +import org.archive.util.Recorder; +import org.archive.util.bdbje.EnhancedEnvironment; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.logging.Logger; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** - * Unit tests for BdbMultipleWorkQueues functionality. - * + * Unit tests for BdbMultipleWorkQueues functionality. + * * @author gojomo */ public class BdbMultipleWorkQueuesTest { private static Logger logger = Logger.getLogger(BdbMultipleWorkQueuesTest.class.getName()); + @TempDir + Path tempDir; + @TempDir + Path curiTempDir; + + private BdbMultipleWorkQueues pendingUris = null; + private EnhancedEnvironment env; + private Database db; + private File envDir; + + protected Recorder getRecorder() throws IOException { + if (Recorder.getHttpRecorder() == null) { + Recorder httpRecorder = new Recorder(curiTempDir.toFile(), + getClass().getName(), 16 * 1024, 512 * 1024); + Recorder.setHttpRecorder(httpRecorder); + } + + return Recorder.getHttpRecorder(); + } + + protected CrawlURI makeCrawlURI(String uri) throws URIException, + IOException { + UURI uuri = UURIFactory.getInstance(uri); + CrawlURI curi = new CrawlURI(uuri); + curi.setClassKey("key"); + curi.setSeed(true); + curi.setRecorder(getRecorder()); + return curi; + } + + @BeforeEach + protected void setUp() throws Exception { + this.envDir = new File(tempDir.toFile(),"BdbMultipleWorkQueuesTest"); + org.archive.util.FileUtils.ensureWriteableDirectory(this.envDir); + try { + EnvironmentConfig envConfig = new EnvironmentConfig(); + envConfig.setTransactional(false); + envConfig.setAllowCreate(true); + env = new EnhancedEnvironment(envDir,envConfig); + BdbModule.BdbConfig dbConfig = StoredQueue.databaseConfig(); + db = env.openDatabase(null, "StoredMapTest", dbConfig.toDatabaseConfig()); + } catch (DatabaseException e) { + throw new RuntimeException(e); + } + this.pendingUris = new BdbMultipleWorkQueues(db, env.getClassCatalog()); + + } + + @AfterEach + protected void tearDown() throws Exception { + if(this.pendingUris!=null) + this.pendingUris.close(); + if (this.envDir.exists()) { + FileUtils.deleteDirectory(this.envDir); + } + } - /** * Basic sanity checks for calculateInsertKey() -- ensure ordinal, cost, * and schedulingDirective have the intended effects, for ordinal values @@ -53,38 +122,38 @@ public void testCalculateInsertKey() throws URIException { } for (long ordinalOrigin = 1; ordinalOrigin < Long.MAX_VALUE / 4; ordinalOrigin <<= 1) { - CrawlURI curi1 = + CrawlURI curi1 = new CrawlURI(UURIFactory.getInstance("http://archive.org/foo")); curi1.setOrdinal(ordinalOrigin); curi1.setClassKey("foo"); - byte[] key1 = + byte[] key1 = BdbMultipleWorkQueues.calculateInsertKey(curi1).getData(); - CrawlURI curi2 = + CrawlURI curi2 = new CrawlURI(UURIFactory.getInstance("http://archive.org/bar")); curi2.setOrdinal(ordinalOrigin + 1); curi2.setClassKey("foo"); - byte[] key2 = + byte[] key2 = BdbMultipleWorkQueues.calculateInsertKey(curi2).getData(); - CrawlURI curi3 = + CrawlURI curi3 = new CrawlURI(UURIFactory.getInstance("http://archive.org/baz")); curi3.setOrdinal(ordinalOrigin + 2); curi3.setClassKey("foo"); curi3.setSchedulingDirective(SchedulingConstants.HIGH); - byte[] key3 = + byte[] key3 = BdbMultipleWorkQueues.calculateInsertKey(curi3).getData(); - CrawlURI curi4 = + CrawlURI curi4 = new CrawlURI(UURIFactory.getInstance("http://archive.org/zle")); curi4.setOrdinal(ordinalOrigin + 3); curi4.setClassKey("foo"); curi4.setPrecedence(2); - byte[] key4 = + byte[] key4 = BdbMultipleWorkQueues.calculateInsertKey(curi4).getData(); - CrawlURI curi5 = + CrawlURI curi5 = new CrawlURI(UURIFactory.getInstance("http://archive.org/gru")); curi5.setOrdinal(ordinalOrigin + 4); curi5.setClassKey("foo"); curi5.setPrecedence(1); - byte[] key5 = + byte[] key5 = BdbMultipleWorkQueues.calculateInsertKey(curi5).getData(); // ensure that key1 (with lower ordinal) sorts before key2 (higher // ordinal) @@ -101,4 +170,42 @@ public void testCalculateInsertKey() throws URIException { "lower cost sorting first (" + ordinalOrigin + ")"); } } + + @Test + public void testThreadInterrupt() throws InterruptedException, IOException { + MockToeThread mockToeThread = new MockToeThread(this.pendingUris, makeCrawlURI("http://www.archive.org")); + + mockToeThread.start(); + + while (mockToeThread.isAlive()) { + Thread.sleep(100); + } + mockToeThread.join(); + assertNull(mockToeThread.thrownException); + + } + class MockToeThread extends Thread { + BdbMultipleWorkQueues pendingUris; + CrawlURI curi; + Exception thrownException; + public MockToeThread(BdbMultipleWorkQueues pendingUris, CrawlURI curi) { + this.pendingUris = pendingUris; + this.curi = curi; + this.thrownException = null; + } + @Override + public void run() { + this.pendingUris.put(this.curi, true); + + Thread.currentThread().interrupt(); + try { + this.pendingUris.put(this.curi, true); + } + catch (com.sleepycat.je.EnvironmentFailureException ex) { + this.thrownException = ex; + } + + } + } + }