Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public long deleteMatchingFromQueue(String match, String queue,
DatabaseEntry key = headKey;
DatabaseEntry value = new DatabaseEntry();
Cursor cursor = null;
Thread.interrupted();
try {
cursor = pendingUrisDB.openCursor(null, null);
OperationStatus result = cursor.getSearchKeyRange(headKey,
Expand Down Expand Up @@ -165,6 +166,7 @@ public CompositeData getFrom(

Cursor cursor = null;
OperationStatus result = null;
Thread.interrupted();
try {
cursor = pendingUrisDB.openCursor(null,null);
result = cursor.getSearchKey(key, value, null);
Expand Down Expand Up @@ -217,6 +219,7 @@ public CompositeData getFrom(
protected DatabaseEntry getFirstKey() throws DatabaseException {
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
Thread.interrupted();
Cursor cursor = pendingUrisDB.openCursor(null,null);
OperationStatus status = cursor.getNext(key,value,null);
cursor.close();
Expand Down Expand Up @@ -290,6 +293,7 @@ protected OperationStatus getNextNearestItem(DatabaseEntry headKey,
DatabaseEntry result) throws DatabaseException {
Cursor cursor = null;
OperationStatus status;
Thread.interrupted();
try {
cursor = this.pendingUrisDB.openCursor(null, null);

Expand Down Expand Up @@ -337,6 +341,7 @@ public void put(CrawlURI curi, boolean overwriteIfPresent)
tallyAverageEntrySize(curi, value);
}
OperationStatus status;
Thread.interrupted();
if(overwriteIfPresent) {
status = pendingUrisDB.put(null, insertKey, value);
} else {
Expand Down Expand Up @@ -479,6 +484,7 @@ private static int findFirstZero(byte[] b) {
*/
public void delete(CrawlURI item) throws DatabaseException {
OperationStatus status;
Thread.interrupted();
DatabaseEntry de = (DatabaseEntry)item.getHolderKey();
status = pendingUrisDB.delete(null, de);
if (status != OperationStatus.SUCCESS) {
Expand All @@ -504,6 +510,7 @@ protected void sync() {
if (this.pendingUrisDB == null) {
return;
}
Thread.interrupted();
try {
this.pendingUrisDB.sync();
} catch (DatabaseException e) {
Expand Down Expand Up @@ -533,6 +540,7 @@ public void close() {
* @param origin key at which to insert the cap
*/
public void addCap(byte[] origin) {
Thread.interrupted();
try {
pendingUrisDB.put(null, new DatabaseEntry(origin),
new DatabaseEntry(new byte[0]));
Expand All @@ -549,6 +557,7 @@ public void addCap(byte[] origin) {
protected void forAllPendingDo(Closure c) throws DatabaseException {
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
Thread.interrupted();
Cursor cursor = pendingUrisDB.openCursor(null, null);
while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) {
if (value.getData().length == 0) {
Expand All @@ -573,6 +582,7 @@ public long exportPendingUris(PrintWriter writer) {
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
long uris = 0L;
Thread.interrupted();
Cursor cursor = pendingUrisDB.openCursor(null, null);
while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) {
if (value.getData().length == 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/*
* This file is part of the Heritrix web crawler (crawler.archive.org).
*
* Licensed to the Internet Archive (IA) by one or more individual
* contributors.
* Licensed to the Internet Archive (IA) by one or more individual
* contributors.
*
* The IA licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
Expand All @@ -18,29 +18,98 @@
*/
package org.archive.crawler.frontier;

import java.util.logging.Logger;

import org.archive.url.URIException;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.tree.Key;
import org.apache.commons.io.FileUtils;
import org.archive.bdb.BdbModule;
import org.archive.bdb.StoredQueue;
import org.archive.modules.CrawlURI;
import org.archive.modules.SchedulingConstants;
import org.archive.net.UURI;
import org.archive.net.UURIFactory;

import com.sleepycat.je.tree.Key;

import org.archive.url.URIException;
import org.archive.util.Recorder;
import org.archive.util.bdbje.EnhancedEnvironment;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.logging.Logger;

import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Unit tests for BdbMultipleWorkQueues functionality.
*
* Unit tests for BdbMultipleWorkQueues functionality.
*
* @author gojomo
*/
public class BdbMultipleWorkQueuesTest {
private static Logger logger =
Logger.getLogger(BdbMultipleWorkQueuesTest.class.getName());
@TempDir
Path tempDir;
@TempDir
Path curiTempDir;

private BdbMultipleWorkQueues pendingUris = null;
private EnhancedEnvironment env;
private Database db;
private File envDir;

protected Recorder getRecorder() throws IOException {
if (Recorder.getHttpRecorder() == null) {
Recorder httpRecorder = new Recorder(curiTempDir.toFile(),
getClass().getName(), 16 * 1024, 512 * 1024);
Recorder.setHttpRecorder(httpRecorder);
}

return Recorder.getHttpRecorder();
}

protected CrawlURI makeCrawlURI(String uri) throws URIException,
IOException {
UURI uuri = UURIFactory.getInstance(uri);
CrawlURI curi = new CrawlURI(uuri);
curi.setClassKey("key");
curi.setSeed(true);
curi.setRecorder(getRecorder());
return curi;
}

@BeforeEach
protected void setUp() throws Exception {
this.envDir = new File(tempDir.toFile(),"BdbMultipleWorkQueuesTest");
org.archive.util.FileUtils.ensureWriteableDirectory(this.envDir);
try {
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setTransactional(false);
envConfig.setAllowCreate(true);
env = new EnhancedEnvironment(envDir,envConfig);
BdbModule.BdbConfig dbConfig = StoredQueue.databaseConfig();
db = env.openDatabase(null, "StoredMapTest", dbConfig.toDatabaseConfig());
} catch (DatabaseException e) {
throw new RuntimeException(e);
}
this.pendingUris = new BdbMultipleWorkQueues(db, env.getClassCatalog());

}

@AfterEach
protected void tearDown() throws Exception {
if(this.pendingUris!=null)
this.pendingUris.close();
if (this.envDir.exists()) {
FileUtils.deleteDirectory(this.envDir);
}
}


/**
* Basic sanity checks for calculateInsertKey() -- ensure ordinal, cost,
* and schedulingDirective have the intended effects, for ordinal values
Expand All @@ -53,38 +122,38 @@ public void testCalculateInsertKey() throws URIException {
}

for (long ordinalOrigin = 1; ordinalOrigin < Long.MAX_VALUE / 4; ordinalOrigin <<= 1) {
CrawlURI curi1 =
CrawlURI curi1 =
new CrawlURI(UURIFactory.getInstance("http://archive.org/foo"));
curi1.setOrdinal(ordinalOrigin);
curi1.setClassKey("foo");
byte[] key1 =
byte[] key1 =
BdbMultipleWorkQueues.calculateInsertKey(curi1).getData();
CrawlURI curi2 =
CrawlURI curi2 =
new CrawlURI(UURIFactory.getInstance("http://archive.org/bar"));
curi2.setOrdinal(ordinalOrigin + 1);
curi2.setClassKey("foo");
byte[] key2 =
byte[] key2 =
BdbMultipleWorkQueues.calculateInsertKey(curi2).getData();
CrawlURI curi3 =
CrawlURI curi3 =
new CrawlURI(UURIFactory.getInstance("http://archive.org/baz"));
curi3.setOrdinal(ordinalOrigin + 2);
curi3.setClassKey("foo");
curi3.setSchedulingDirective(SchedulingConstants.HIGH);
byte[] key3 =
byte[] key3 =
BdbMultipleWorkQueues.calculateInsertKey(curi3).getData();
CrawlURI curi4 =
CrawlURI curi4 =
new CrawlURI(UURIFactory.getInstance("http://archive.org/zle"));
curi4.setOrdinal(ordinalOrigin + 3);
curi4.setClassKey("foo");
curi4.setPrecedence(2);
byte[] key4 =
byte[] key4 =
BdbMultipleWorkQueues.calculateInsertKey(curi4).getData();
CrawlURI curi5 =
CrawlURI curi5 =
new CrawlURI(UURIFactory.getInstance("http://archive.org/gru"));
curi5.setOrdinal(ordinalOrigin + 4);
curi5.setClassKey("foo");
curi5.setPrecedence(1);
byte[] key5 =
byte[] key5 =
BdbMultipleWorkQueues.calculateInsertKey(curi5).getData();
// ensure that key1 (with lower ordinal) sorts before key2 (higher
// ordinal)
Expand All @@ -101,4 +170,42 @@ public void testCalculateInsertKey() throws URIException {
"lower cost sorting first (" + ordinalOrigin + ")");
}
}

@Test
public void testThreadInterrupt() throws InterruptedException, IOException {
MockToeThread mockToeThread = new MockToeThread(this.pendingUris, makeCrawlURI("http://www.archive.org"));

mockToeThread.start();

while (mockToeThread.isAlive()) {
Thread.sleep(100);
}
mockToeThread.join();
assertNull(mockToeThread.thrownException);

}
class MockToeThread extends Thread {
BdbMultipleWorkQueues pendingUris;
CrawlURI curi;
Exception thrownException;
public MockToeThread(BdbMultipleWorkQueues pendingUris, CrawlURI curi) {
this.pendingUris = pendingUris;
this.curi = curi;
this.thrownException = null;
}
@Override
public void run() {
this.pendingUris.put(this.curi, true);

Thread.currentThread().interrupt();
try {
this.pendingUris.put(this.curi, true);
}
catch (com.sleepycat.je.EnvironmentFailureException ex) {
this.thrownException = ex;
}

}
}

}
Loading