From 62edff7888c8b6922df2f5553b6a75620eb7f012 Mon Sep 17 00:00:00 2001
From: Michael Misiewicz
Date: Fri, 13 Dec 2019 17:12:22 -0500
Subject: [PATCH] Small changes to support reading CommonCrawl files from S3

This change makes a few modifications to the HDFS utils. Most importantly,
Hadoop `FileSystem` objects are now resolved from the URI of each file rather
than from the filesystem currently configured in the HadoopConf. This allows
accessing CommonCrawl WARC files on other filesystems, such as S3.

Additionally, there is a small fix for occasionally corrupted WARC records
encountered in CommonCrawl output.
---
 build.sbt                                      |  9 ++--
 .../sparkling/http/HttpMessage.scala           |  6 ++-
 .../archivespark/sparkling/io/ByteArray.scala  |  2 +-
 .../sparkling/io/HdfsFileWriter.scala          |  3 +-
 .../archivespark/sparkling/io/HdfsIO.scala     | 53 +++++++++++--------
 .../archivespark/util/FilePathMap.scala        |  7 +--
 6 files changed, 49 insertions(+), 31 deletions(-)
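
Note: a minimal sketch (not part of the patch; `git am` ignores text between
the diffstat and the first diff) of the access pattern this change enables. It
assumes the `hadoop-aws` dependency added below provides the S3A filesystem on
the classpath; the object name `S3AccessSketch` and the bucket/key are
illustrative only.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object S3AccessSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // FileSystem.get(conf) always returns the default filesystem (fs.defaultFS),
        // so an s3a:// path cannot be read on an HDFS-configured cluster.
        // Resolving the filesystem from the path's own URI, as HdfsIO.fs(p) now does,
        // picks the S3A implementation for s3a:// paths and HDFS for hdfs:// paths.
        val warc = new Path("s3a://commoncrawl/crawl-data/.../example.warc.gz") // hypothetical key
        val fs: FileSystem = warc.getFileSystem(conf)
        println(fs.getFileStatus(warc).getLen) // file length via the resolved filesystem
      }
    }
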
diff --git a/build.sbt b/build.sbt
index 08f7910..4caf4e6 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,7 +4,7 @@ import sbt.Keys._
 lazy val commonSettings = Seq(
   name := "archivespark",
   organization := "com.github.helgeho",
-  version := "3.0.1",
+  version := "3.0.3-MMISIEWICZ-SNAPSHOT",
   scalaVersion := "2.11.12",
   fork := true,
   exportJars := true
@@ -16,9 +16,10 @@ lazy val archivespark = (project in file("."))
   .settings(
     commonSettings,
     libraryDependencies ++= Seq(
-      "org.apache.hadoop" % "hadoop-client" % "2.5.0" % "provided",
-      "org.apache.spark" %% "spark-core" % "2.1.3" % "provided",
-      "org.apache.spark" %% "spark-sql" % "2.1.3" % "provided",
+      "org.apache.hadoop" % "hadoop-client" % "2.7.2" % "provided",
+      "org.apache.hadoop" % "hadoop-aws" % "2.7.2" % "provided",
+      "org.apache.spark" %% "spark-core" % "2.4.4" % "provided",
+      "org.apache.spark" %% "spark-sql" % "2.4.4" % "provided",
       "joda-time" % "joda-time" % "2.10",
       "org.apache.httpcomponents" % "httpclient" % "4.5.6",
       "org.netpreserve.commons" % "webarchive-commons" % "1.1.8" excludeAll(
diff --git a/src/main/scala/org/archive/archivespark/sparkling/http/HttpMessage.scala b/src/main/scala/org/archive/archivespark/sparkling/http/HttpMessage.scala
index 3d62420..dced4d2 100644
--- a/src/main/scala/org/archive/archivespark/sparkling/http/HttpMessage.scala
+++ b/src/main/scala/org/archive/archivespark/sparkling/http/HttpMessage.scala
@@ -41,7 +41,11 @@ class HttpMessage (val statusLine: String, val headers: Map[String, String], val
   lazy val lowerCaseHeaders: Map[String, String] = headers.map{case (k,v) => (k.toLowerCase, v)}

   def contentEncoding: Option[String] = lowerCaseHeaders.get("content-encoding").map(_.toLowerCase)
-  def mime: Option[String] = lowerCaseHeaders.get("content-type").map(_.split(';').head.trim.toLowerCase)
+  def mime: Option[String] = Try {
+    lowerCaseHeaders.get("content-type")
+      .map(_.split(';').head.trim.toLowerCase)
+  }.getOrElse(None)
+
   def charset: Option[String] = {
     lowerCaseHeaders.get("content-type").flatMap(_.split(';').drop(1).headOption).map(_.trim)
       .filter(_.startsWith("charset="))
diff --git a/src/main/scala/org/archive/archivespark/sparkling/io/ByteArray.scala b/src/main/scala/org/archive/archivespark/sparkling/io/ByteArray.scala
index d3c7e06..b8c867e 100644
--- a/src/main/scala/org/archive/archivespark/sparkling/io/ByteArray.scala
+++ b/src/main/scala/org/archive/archivespark/sparkling/io/ByteArray.scala
@@ -29,7 +29,7 @@ import java.util.Collections

 import scala.collection.JavaConverters._

-class ByteArray {
+class ByteArray extends Serializable {
   private val arrays = collection.mutable.Buffer.empty[Array[Byte]]

   def append(array: Array[Byte]): Unit = if (array.nonEmpty) arrays += array
diff --git a/src/main/scala/org/archive/archivespark/sparkling/io/HdfsFileWriter.scala b/src/main/scala/org/archive/archivespark/sparkling/io/HdfsFileWriter.scala
index 62b352f..d668486 100644
--- a/src/main/scala/org/archive/archivespark/sparkling/io/HdfsFileWriter.scala
+++ b/src/main/scala/org/archive/archivespark/sparkling/io/HdfsFileWriter.scala
@@ -45,7 +45,8 @@ class HdfsFileWriter private(filename: String, append: Boolean, replication: Sho
     Log.info("Copying from temporary file " + file.getCanonicalPath + " to " + filename + "...")
     if (append) {
       val in = new FileInputStream(file)
-      val appendOut = HdfsIO.fs.append(new Path(filename))
+      val p = new Path(filename)
+      val appendOut = HdfsIO.fs(p).append(p)
       IOUtil.copy(in, appendOut)
       appendOut.close()
       in.close()
diff --git a/src/main/scala/org/archive/archivespark/sparkling/io/HdfsIO.scala b/src/main/scala/org/archive/archivespark/sparkling/io/HdfsIO.scala
index 807fb82..08ffd35 100644
--- a/src/main/scala/org/archive/archivespark/sparkling/io/HdfsIO.scala
+++ b/src/main/scala/org/archive/archivespark/sparkling/io/HdfsIO.scala
@@ -44,7 +44,7 @@
   val ReplicationProperty = "dfs.replication"
   val BufferSizeProperty = "io.file.buffer.size"

-  def fs: FileSystem = FileSystem.get(SparkHadoopUtil.get.conf)
+  def fs(p: Path): FileSystem = p.getFileSystem(SparkHadoopUtil.get.conf)

   object LoadingStrategy extends Enumeration {
     val Remote, BlockWise, CopyLocal, Dynamic = Value
@@ -58,6 +58,7 @@
   private var localFiles: Map[String, String] = Map.empty

   def open(path: String, offset: Long = 0, length: Long = 0, decompress: Boolean = true, retries: Int = 60, sleepMillis: Int = 1000 * 60, strategy: LoadingStrategy = defaultLoadingStrategy): InputStream = {
+    val p = new Path(path)
     val loadingStrategy = if (strategy == LoadingStrategy.Dynamic) {
       val fileSize = HdfsIO.length(path)
       val copyLocalThreshold = fileSize.toDouble * dynamicCopyLocalThreshold
@@ -71,7 +72,7 @@
         Common.retry(retries, sleepMillis, (retry, e) => {
           "File access failed (" + retry + "/" + retries + "): " + path + " (Offset: " + offset + ") - " + e.getMessage
         }) { retry =>
-          val in = fs.open(new Path(path))
+          val in = fs(p).open(p)
           if (retry > 0) in.seekToNewSource(offset)
           else if (offset > 0) in.seek(offset)
           val buffered = if (length > 0) new BufferedInputStream(new BoundedInputStream(in, length)) else new BufferedInputStream(in)
@@ -81,14 +82,14 @@
         } else buffered
       }
       case LoadingStrategy.BlockWise =>
-        new BufferedInputStream(new HdfsBlockStream(fs, path, offset, length, retries, sleepMillis))
+        new BufferedInputStream(new HdfsBlockStream(fs(p), path, offset, length, retries, sleepMillis))
       case LoadingStrategy.CopyLocal =>
         Common.retry(retries, sleepMillis, (retry, e) => {
           "File access failed (" + retry + "/" + retries + "): " + path + " - " + e.getMessage
         }) { retry =>
           localFiles = localFiles.synchronized(localFiles.updated(path, {
             val tmpPath = IOUtil.tmpFile.getCanonicalPath
-            fs.copyToLocalFile(new Path(path), new Path(tmpPath))
+            fs(p).copyToLocalFile(new Path(path), new Path(tmpPath))
             tmpPath
           }))
           val in = new FileInputStream(localFiles(path))
@@ -113,14 +114,17 @@
   def copyFromLocal(src: String, dst: String, move: Boolean = false, overwrite: Boolean = false, replication: Short = 0): Unit = {
     if (overwrite) delete(dst)
     val dstPath = new Path(dst)
-    val dstReplication = if (replication == 0) fs.getDefaultReplication(dstPath) else replication
+    val dstReplication = if (replication == 0) fs(dstPath).getDefaultReplication(dstPath) else replication
     val conf = new org.apache.hadoop.conf.Configuration(SparkHadoopUtil.get.conf)
     conf.setInt(ReplicationProperty, 1)
-    FileUtil.copy(FileSystem.getLocal(conf), new Path(src), fs, dstPath, move, overwrite, conf)
-    if (dstReplication > 1) fs.setReplication(dstPath, dstReplication)
+    FileUtil.copy(FileSystem.getLocal(conf), new Path(src), fs(dstPath), dstPath, move, overwrite, conf)
+    if (dstReplication > 1) fs(dstPath).setReplication(dstPath, dstReplication)
   }

-  def length(path: String): Long = HdfsIO.fs.getFileStatus(new Path(path)).getLen
+  def length(path: String): Long = {
+    val p = new Path(path)
+    HdfsIO.fs(p).getFileStatus(p).getLen
+  }

   def lines(path: String, n: Int = -1, offset: Long = 0): Seq[String] = access(path, offset, length = if (n < 0) -1 else 0) { in =>
     val lines = IOUtil.lines(in)
@@ -129,7 +133,8 @@
   }

   def files(path: String, recursive: Boolean = true): Iterator[String] = {
-    val glob = fs.globStatus(new Path(path))
+    val p = new Path(path)
+    val glob = fs(p).globStatus(p)
     if (glob == null) Iterator.empty
     else glob.toIterator.flatMap { status =>
       if (status.isDirectory && recursive) files(new Path(status.getPath, "*").toString)
@@ -139,7 +144,7 @@

   def dir(path: String): String = {
     val p = new Path(path)
-    val status = fs.globStatus(p)
+    val status = fs(p).globStatus(p)
     if (status == null || status.isEmpty || (status.length == 1 && status.head.isDirectory)) path
     else p.getParent.toString
   }
@@ -149,10 +154,10 @@
     var tmpPath: Path = null
     while ({
       tmpPath = new Path(path, prefix + rnd)
-      fs.exists(tmpPath)
+      fs(tmpPath).exists(tmpPath)
     }) rnd = System.currentTimeMillis + "-" + Random.nextInt.abs
-    fs.mkdirs(tmpPath)
-    if (deleteOnExit) fs.deleteOnExit(tmpPath)
+    fs(tmpPath).mkdirs(tmpPath)
+    if (deleteOnExit) fs(tmpPath).deleteOnExit(tmpPath)
     tmpPath.toString
   }

@@ -165,15 +170,19 @@

   def delete(path: String): Unit = if (exists(path)) {
     val p = new Path(path)
-    val success = fs.delete(p, true)
-    if (!success) fs.deleteOnExit(p)
+    val success = fs(p).delete(p, true)
+    if (!success) fs(p).deleteOnExit(p)
   }

-  def exists(path: String): Boolean = fs.exists(new Path(path))
+  def exists(path: String): Boolean = {
+    val p = new Path(path)
+    fs(p).exists(p)
+  }

   def ensureOutDir(path: String, ensureNew: Boolean = true): Unit = {
+    val p = new Path(path)
     if (ensureNew && exists(path)) Common.printThrow("Path exists: " + path)
-    fs.mkdirs(new Path(path))
+    fs(p).mkdirs(p)
   }

   def ensureNewFile(path: String): Unit = {
@@ -182,12 +191,14 @@

   def writer(path: String, overwrite: Boolean = false, append: Boolean = false, replication: Short = 0): HdfsFileWriter = HdfsFileWriter(path, overwrite, append, replication)

-  def bufferSize: Int = fs.getConf().getInt(BufferSizeProperty, 4096)
+  def bufferSize(p : Path): Int = {
+    fs(p).getConf.getInt(BufferSizeProperty, 4096)
+  }

   def out(path: String, overwrite: Boolean = false, compress: Boolean = true, useWriter: Boolean = true, append: Boolean = false, temporary: Boolean = false): OutputStream = {
-    val out = if (useWriter) writer(path, overwrite, append, if (temporary) tmpFileReplication else 0) else if (append) fs.append(new Path(path)) else {
-      val fsPath = new Path(path)
-      if (temporary) fs.create(fsPath, overwrite, bufferSize, tmpFileReplication, fs.getDefaultBlockSize(fsPath)) else fs.create(fsPath, overwrite)
+    val fsPath = new Path(path)
+    val out = if (useWriter) writer(path, overwrite, append, if (temporary) tmpFileReplication else 0) else if (append) fs(fsPath).append(fsPath) else {
+      if (temporary) fs(fsPath).create(fsPath, overwrite, bufferSize(fsPath), tmpFileReplication, fs(fsPath).getDefaultBlockSize(fsPath)) else fs(fsPath).create(fsPath, overwrite)
     }
     if (compress && path.toLowerCase.endsWith(GzipExt)) new GZIPOutputStream(out)
     else out
diff --git a/src/main/scala/org/archive/archivespark/util/FilePathMap.scala b/src/main/scala/org/archive/archivespark/util/FilePathMap.scala
index 71fa2ae..32d05f0 100644
--- a/src/main/scala/org/archive/archivespark/util/FilePathMap.scala
+++ b/src/main/scala/org/archive/archivespark/util/FilePathMap.scala
@@ -33,13 +33,14 @@ case class FilePathMap(path: String, patterns: Seq[String] = Seq.empty) {
   val pathMap: Map[String, String] = {
     var map = collection.mutable.Map[String, String]()

-    val fs = FileSystem.get(SparkHadoopUtil.get.conf)
-    val files = fs.listFiles(new Path(path), true)
+    val p = new Path(path)
+    val fs = p.getFileSystem(SparkHadoopUtil.get.conf)
+    val files = fs.listFiles(p, true)
     while (files.hasNext) {
       val path = files.next.getPath
       val filename = path.getName
       if (patterns.isEmpty || patterns.exists(filename.matches)) {
-        if (map.contains(filename)) throw new RuntimeException("duplicate filename: " + filename)
+        if (map.contains(filename)) throw new RuntimeException("duplicate filename: " + filename + " in path: " + path)
         map += filename -> path.getParent.toString.intern
       }
     }
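
Note: a small standalone sketch (appended after the last hunk, not part of the
diff) of the lenient content-type parsing that the HttpMessage.mime change
above implements; the object name `MimeSketch` and the header maps are
illustrative. Parsing failures on corrupted WARC records now collapse to None
instead of throwing and failing the record.

    import scala.util.Try

    object MimeSketch {
      // Mirrors the patched HttpMessage.mime: wrap parsing in Try so corrupted
      // headers (e.g. null values from broken WARC records) yield None.
      def mime(lowerCaseHeaders: Map[String, String]): Option[String] = Try {
        lowerCaseHeaders.get("content-type").map(_.split(';').head.trim.toLowerCase)
      }.getOrElse(None)

      def main(args: Array[String]): Unit = {
        println(mime(Map("content-type" -> "text/HTML; charset=UTF-8"))) // Some(text/html)
        println(mime(Map("content-type" -> null)))                       // None
      }
    }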