Skip to content

Commit 3a27aeb

Browse files
authored
Merge pull request #671 from internetarchive/adam/handle_more_bdb_shutdown_interrupts
fix: handle more bdb shutdown interrupts
2 parents dd11f87 + 558727a commit 3a27aeb

File tree

2 files changed

+138
-21
lines changed

2 files changed

+138
-21
lines changed

engine/src/main/java/org/archive/crawler/frontier/BdbMultipleWorkQueues.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ public long deleteMatchingFromQueue(String match, String queue,
109109
DatabaseEntry key = headKey;
110110
DatabaseEntry value = new DatabaseEntry();
111111
Cursor cursor = null;
112+
Thread.interrupted();
112113
try {
113114
cursor = pendingUrisDB.openCursor(null, null);
114115
OperationStatus result = cursor.getSearchKeyRange(headKey,
@@ -165,6 +166,7 @@ public CompositeData getFrom(
165166

166167
Cursor cursor = null;
167168
OperationStatus result = null;
169+
Thread.interrupted();
168170
try {
169171
cursor = pendingUrisDB.openCursor(null,null);
170172
result = cursor.getSearchKey(key, value, null);
@@ -217,6 +219,7 @@ public CompositeData getFrom(
217219
protected DatabaseEntry getFirstKey() throws DatabaseException {
218220
DatabaseEntry key = new DatabaseEntry();
219221
DatabaseEntry value = new DatabaseEntry();
222+
Thread.interrupted();
220223
Cursor cursor = pendingUrisDB.openCursor(null,null);
221224
OperationStatus status = cursor.getNext(key,value,null);
222225
cursor.close();
@@ -290,6 +293,7 @@ protected OperationStatus getNextNearestItem(DatabaseEntry headKey,
290293
DatabaseEntry result) throws DatabaseException {
291294
Cursor cursor = null;
292295
OperationStatus status;
296+
Thread.interrupted();
293297
try {
294298
cursor = this.pendingUrisDB.openCursor(null, null);
295299

@@ -337,6 +341,7 @@ public void put(CrawlURI curi, boolean overwriteIfPresent)
337341
tallyAverageEntrySize(curi, value);
338342
}
339343
OperationStatus status;
344+
Thread.interrupted();
340345
if(overwriteIfPresent) {
341346
status = pendingUrisDB.put(null, insertKey, value);
342347
} else {
@@ -479,6 +484,7 @@ private static int findFirstZero(byte[] b) {
479484
*/
480485
public void delete(CrawlURI item) throws DatabaseException {
481486
OperationStatus status;
487+
Thread.interrupted();
482488
DatabaseEntry de = (DatabaseEntry)item.getHolderKey();
483489
status = pendingUrisDB.delete(null, de);
484490
if (status != OperationStatus.SUCCESS) {
@@ -504,6 +510,7 @@ protected void sync() {
504510
if (this.pendingUrisDB == null) {
505511
return;
506512
}
513+
Thread.interrupted();
507514
try {
508515
this.pendingUrisDB.sync();
509516
} catch (DatabaseException e) {
@@ -533,6 +540,7 @@ public void close() {
533540
* @param origin key at which to insert the cap
534541
*/
535542
public void addCap(byte[] origin) {
543+
Thread.interrupted();
536544
try {
537545
pendingUrisDB.put(null, new DatabaseEntry(origin),
538546
new DatabaseEntry(new byte[0]));
@@ -549,6 +557,7 @@ public void addCap(byte[] origin) {
549557
protected void forAllPendingDo(Closure c) throws DatabaseException {
550558
DatabaseEntry key = new DatabaseEntry();
551559
DatabaseEntry value = new DatabaseEntry();
560+
Thread.interrupted();
552561
Cursor cursor = pendingUrisDB.openCursor(null, null);
553562
while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) {
554563
if (value.getData().length == 0) {
@@ -573,6 +582,7 @@ public long exportPendingUris(PrintWriter writer) {
573582
DatabaseEntry key = new DatabaseEntry();
574583
DatabaseEntry value = new DatabaseEntry();
575584
long uris = 0L;
585+
Thread.interrupted();
576586
Cursor cursor = pendingUrisDB.openCursor(null, null);
577587
while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) {
578588
if (value.getData().length == 0) {

engine/src/test/java/org/archive/crawler/frontier/BdbMultipleWorkQueuesTest.java

Lines changed: 128 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
/*
22
* This file is part of the Heritrix web crawler (crawler.archive.org).
33
*
4-
* Licensed to the Internet Archive (IA) by one or more individual
5-
* contributors.
4+
* Licensed to the Internet Archive (IA) by one or more individual
5+
* contributors.
66
*
77
* The IA licenses this file to You under the Apache License, Version 2.0
88
* (the "License"); you may not use this file except in compliance with
@@ -18,29 +18,98 @@
1818
*/
1919
package org.archive.crawler.frontier;
2020

21-
import java.util.logging.Logger;
22-
23-
import org.archive.url.URIException;
21+
import com.sleepycat.je.Database;
22+
import com.sleepycat.je.DatabaseException;
23+
import com.sleepycat.je.EnvironmentConfig;
24+
import com.sleepycat.je.tree.Key;
25+
import org.apache.commons.io.FileUtils;
26+
import org.archive.bdb.BdbModule;
27+
import org.archive.bdb.StoredQueue;
2428
import org.archive.modules.CrawlURI;
2529
import org.archive.modules.SchedulingConstants;
30+
import org.archive.net.UURI;
2631
import org.archive.net.UURIFactory;
27-
28-
import com.sleepycat.je.tree.Key;
29-
32+
import org.archive.url.URIException;
33+
import org.archive.util.Recorder;
34+
import org.archive.util.bdbje.EnhancedEnvironment;
35+
import org.junit.jupiter.api.AfterEach;
36+
import org.junit.jupiter.api.BeforeEach;
3037
import org.junit.jupiter.api.Test;
38+
import org.junit.jupiter.api.io.TempDir;
39+
40+
import java.io.File;
41+
import java.io.IOException;
42+
import java.nio.file.Path;
43+
import java.util.logging.Logger;
3144

45+
import static org.junit.jupiter.api.Assertions.assertNull;
3246
import static org.junit.jupiter.api.Assertions.assertTrue;
3347

3448
/**
35-
* Unit tests for BdbMultipleWorkQueues functionality.
36-
*
49+
* Unit tests for BdbMultipleWorkQueues functionality.
50+
*
3751
* @author gojomo
3852
*/
3953
public class BdbMultipleWorkQueuesTest {
4054
private static Logger logger =
4155
Logger.getLogger(BdbMultipleWorkQueuesTest.class.getName());
56+
@TempDir
57+
Path tempDir;
58+
@TempDir
59+
Path curiTempDir;
60+
61+
private BdbMultipleWorkQueues pendingUris = null;
62+
private EnhancedEnvironment env;
63+
private Database db;
64+
private File envDir;
65+
66+
protected Recorder getRecorder() throws IOException {
67+
if (Recorder.getHttpRecorder() == null) {
68+
Recorder httpRecorder = new Recorder(curiTempDir.toFile(),
69+
getClass().getName(), 16 * 1024, 512 * 1024);
70+
Recorder.setHttpRecorder(httpRecorder);
71+
}
72+
73+
return Recorder.getHttpRecorder();
74+
}
75+
76+
protected CrawlURI makeCrawlURI(String uri) throws URIException,
77+
IOException {
78+
UURI uuri = UURIFactory.getInstance(uri);
79+
CrawlURI curi = new CrawlURI(uuri);
80+
curi.setClassKey("key");
81+
curi.setSeed(true);
82+
curi.setRecorder(getRecorder());
83+
return curi;
84+
}
85+
86+
@BeforeEach
87+
protected void setUp() throws Exception {
88+
this.envDir = new File(tempDir.toFile(),"BdbMultipleWorkQueuesTest");
89+
org.archive.util.FileUtils.ensureWriteableDirectory(this.envDir);
90+
try {
91+
EnvironmentConfig envConfig = new EnvironmentConfig();
92+
envConfig.setTransactional(false);
93+
envConfig.setAllowCreate(true);
94+
env = new EnhancedEnvironment(envDir,envConfig);
95+
BdbModule.BdbConfig dbConfig = StoredQueue.databaseConfig();
96+
db = env.openDatabase(null, "StoredMapTest", dbConfig.toDatabaseConfig());
97+
} catch (DatabaseException e) {
98+
throw new RuntimeException(e);
99+
}
100+
this.pendingUris = new BdbMultipleWorkQueues(db, env.getClassCatalog());
101+
102+
}
103+
104+
@AfterEach
105+
protected void tearDown() throws Exception {
106+
if(this.pendingUris!=null)
107+
this.pendingUris.close();
108+
if (this.envDir.exists()) {
109+
FileUtils.deleteDirectory(this.envDir);
110+
}
111+
}
42112

43-
44113
/**
45114
* Basic sanity checks for calculateInsertKey() -- ensure ordinal, cost,
46115
* and schedulingDirective have the intended effects, for ordinal values
@@ -53,38 +122,38 @@ public void testCalculateInsertKey() throws URIException {
53122
}
54123

55124
for (long ordinalOrigin = 1; ordinalOrigin < Long.MAX_VALUE / 4; ordinalOrigin <<= 1) {
56-
CrawlURI curi1 =
125+
CrawlURI curi1 =
57126
new CrawlURI(UURIFactory.getInstance("http://archive.org/foo"));
58127
curi1.setOrdinal(ordinalOrigin);
59128
curi1.setClassKey("foo");
60-
byte[] key1 =
129+
byte[] key1 =
61130
BdbMultipleWorkQueues.calculateInsertKey(curi1).getData();
62-
CrawlURI curi2 =
131+
CrawlURI curi2 =
63132
new CrawlURI(UURIFactory.getInstance("http://archive.org/bar"));
64133
curi2.setOrdinal(ordinalOrigin + 1);
65134
curi2.setClassKey("foo");
66-
byte[] key2 =
135+
byte[] key2 =
67136
BdbMultipleWorkQueues.calculateInsertKey(curi2).getData();
68-
CrawlURI curi3 =
137+
CrawlURI curi3 =
69138
new CrawlURI(UURIFactory.getInstance("http://archive.org/baz"));
70139
curi3.setOrdinal(ordinalOrigin + 2);
71140
curi3.setClassKey("foo");
72141
curi3.setSchedulingDirective(SchedulingConstants.HIGH);
73-
byte[] key3 =
142+
byte[] key3 =
74143
BdbMultipleWorkQueues.calculateInsertKey(curi3).getData();
75-
CrawlURI curi4 =
144+
CrawlURI curi4 =
76145
new CrawlURI(UURIFactory.getInstance("http://archive.org/zle"));
77146
curi4.setOrdinal(ordinalOrigin + 3);
78147
curi4.setClassKey("foo");
79148
curi4.setPrecedence(2);
80-
byte[] key4 =
149+
byte[] key4 =
81150
BdbMultipleWorkQueues.calculateInsertKey(curi4).getData();
82-
CrawlURI curi5 =
151+
CrawlURI curi5 =
83152
new CrawlURI(UURIFactory.getInstance("http://archive.org/gru"));
84153
curi5.setOrdinal(ordinalOrigin + 4);
85154
curi5.setClassKey("foo");
86155
curi5.setPrecedence(1);
87-
byte[] key5 =
156+
byte[] key5 =
88157
BdbMultipleWorkQueues.calculateInsertKey(curi5).getData();
89158
// ensure that key1 (with lower ordinal) sorts before key2 (higher
90159
// ordinal)
@@ -101,4 +170,42 @@ public void testCalculateInsertKey() throws URIException {
101170
"lower cost sorting first (" + ordinalOrigin + ")");
102171
}
103172
}
173+
174+
@Test
175+
public void testThreadInterrupt() throws InterruptedException, IOException {
176+
MockToeThread mockToeThread = new MockToeThread(this.pendingUris, makeCrawlURI("http://www.archive.org"));
177+
178+
mockToeThread.start();
179+
180+
while (mockToeThread.isAlive()) {
181+
Thread.sleep(100);
182+
}
183+
mockToeThread.join();
184+
assertNull(mockToeThread.thrownException);
185+
186+
}
187+
class MockToeThread extends Thread {
188+
BdbMultipleWorkQueues pendingUris;
189+
CrawlURI curi;
190+
Exception thrownException;
191+
public MockToeThread(BdbMultipleWorkQueues pendingUris, CrawlURI curi) {
192+
this.pendingUris = pendingUris;
193+
this.curi = curi;
194+
this.thrownException = null;
195+
}
196+
@Override
197+
public void run() {
198+
this.pendingUris.put(this.curi, true);
199+
200+
Thread.currentThread().interrupt();
201+
try {
202+
this.pendingUris.put(this.curi, true);
203+
}
204+
catch (com.sleepycat.je.EnvironmentFailureException ex) {
205+
this.thrownException = ex;
206+
}
207+
208+
}
209+
}
210+
104211
}

0 commit comments

Comments
 (0)