diff --git a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
index 9bb3b6fda..5717e0ab0 100644
--- a/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
+++ b/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
@@ -14,24 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nutch.indexer;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class IndexerOutputFormat
-    extends FileOutputFormat<Text, NutchIndexAction> {
+public class IndexerOutputFormat extends OutputFormat<Text, NutchIndexAction> {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   @Override
   public RecordWriter<Text, NutchIndexAction> getRecordWriter(
@@ -40,32 +43,69 @@ public RecordWriter<Text, NutchIndexAction> getRecordWriter(
     Configuration conf = context.getConfiguration();
     final IndexWriters writers = IndexWriters.get(conf);
 
-    String name = getUniqueFile(context, "part", "");
-    writers.open(conf, name);
+    // open writers (no temporary file output anymore)
+    String indexName = "index-" + context.getTaskAttemptID().toString();
+    writers.open(conf, indexName);
     LOG.info(writers.describe());
 
     return new RecordWriter<Text, NutchIndexAction>() {
 
       @Override
       public void close(TaskAttemptContext context) throws IOException {
+
         // do the commits once and for all the reducers in one go
-        boolean noCommit = conf
-            .getBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, false);
+        boolean noCommit =
+            conf.getBoolean(IndexerMapReduce.INDEXER_NO_COMMIT, false);
+
         if (!noCommit) {
           writers.commit();
         }
+
         writers.close();
       }
 
       @Override
       public void write(Text key, NutchIndexAction indexAction)
           throws IOException {
+
         if (indexAction.action == NutchIndexAction.ADD) {
           writers.write(indexAction.doc);
+
         } else if (indexAction.action == NutchIndexAction.DELETE) {
           writers.delete(key.toString());
         }
       }
     };
   }
-}
+
+  @Override
+  public void checkOutputSpecs(JobContext context)
+      throws IOException, InterruptedException {
+    // No output specs required since we don't write files
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+
+    return new OutputCommitter() {
+
+      @Override
+      public void setupJob(JobContext jobContext) {}
+
+      @Override
+      public void setupTask(TaskAttemptContext taskContext) {}
+
+      @Override
+      public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+        return false;
+      }
+
+      @Override
+      public void commitTask(TaskAttemptContext taskContext) {}
+
+      @Override
+      public void abortTask(TaskAttemptContext taskContext) {}
+    };
+  }
+}
\ No newline at end of file