diff --git a/build.sbt b/build.sbt
index 08f7910..4caf4e6 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,7 +4,7 @@ import sbt.Keys._
 lazy val commonSettings = Seq(
   name := "archivespark",
   organization := "com.github.helgeho",
-  version := "3.0.1",
+  version := "3.0.3-MMISIEWICZ-SNAPSHOT",
   scalaVersion := "2.11.12",
   fork := true,
   exportJars := true
@@ -16,9 +16,10 @@ lazy val archivespark = (project in file("."))
   .settings(
     commonSettings,
     libraryDependencies ++= Seq(
-      "org.apache.hadoop" % "hadoop-client" % "2.5.0" % "provided",
-      "org.apache.spark" %% "spark-core" % "2.1.3" % "provided",
-      "org.apache.spark" %% "spark-sql" % "2.1.3" % "provided",
+      "org.apache.hadoop" % "hadoop-client" % "2.7.2" % "provided",
+      "org.apache.hadoop" % "hadoop-aws" % "2.7.2" % "provided",
+      "org.apache.spark" %% "spark-core" % "2.4.4" % "provided",
+      "org.apache.spark" %% "spark-sql" % "2.4.4" % "provided",
       "joda-time" % "joda-time" % "2.10",
       "org.apache.httpcomponents" % "httpclient" % "4.5.6",
       "org.netpreserve.commons" % "webarchive-commons" % "1.1.8" excludeAll(
diff --git a/src/main/scala/org/archive/archivespark/sparkling/http/HttpMessage.scala b/src/main/scala/org/archive/archivespark/sparkling/http/HttpMessage.scala
index 3d62420..dced4d2 100644
--- a/src/main/scala/org/archive/archivespark/sparkling/http/HttpMessage.scala
+++ b/src/main/scala/org/archive/archivespark/sparkling/http/HttpMessage.scala
@@ -41,7 +41,11 @@ class HttpMessage (val statusLine: String, val headers: Map[String, String], val
   lazy val lowerCaseHeaders: Map[String, String] = headers.map{case (k,v) => (k.toLowerCase, v)}
 
   def contentEncoding: Option[String] = lowerCaseHeaders.get("content-encoding").map(_.toLowerCase)
-  def mime: Option[String] = lowerCaseHeaders.get("content-type").map(_.split(';').head.trim.toLowerCase)
+  def mime: Option[String] = Try {
+    lowerCaseHeaders.get("content-type")
+      .map(_.split(';').head.trim.toLowerCase)
+  }.getOrElse(None)
+
   def charset: Option[String] = {
     lowerCaseHeaders.get("content-type").flatMap(_.split(';').drop(1).headOption).map(_.trim)
       .filter(_.startsWith("charset="))
diff --git a/src/main/scala/org/archive/archivespark/sparkling/io/ByteArray.scala b/src/main/scala/org/archive/archivespark/sparkling/io/ByteArray.scala
index d3c7e06..b8c867e 100644
--- a/src/main/scala/org/archive/archivespark/sparkling/io/ByteArray.scala
+++ b/src/main/scala/org/archive/archivespark/sparkling/io/ByteArray.scala
@@ -29,7 +29,7 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 
-class ByteArray {
+class ByteArray extends Serializable {
   private val arrays = collection.mutable.Buffer.empty[Array[Byte]]
 
   def append(array: Array[Byte]): Unit = if (array.nonEmpty) arrays += array
diff --git a/src/main/scala/org/archive/archivespark/sparkling/io/HdfsFileWriter.scala b/src/main/scala/org/archive/archivespark/sparkling/io/HdfsFileWriter.scala
index 62b352f..d668486 100644
--- a/src/main/scala/org/archive/archivespark/sparkling/io/HdfsFileWriter.scala
+++ b/src/main/scala/org/archive/archivespark/sparkling/io/HdfsFileWriter.scala
@@ -45,7 +45,8 @@ class HdfsFileWriter private(filename: String, append: Boolean, replication: Sho
     Log.info("Copying from temporary file " + file.getCanonicalPath + " to " + filename + "...")
     if (append) {
       val in = new FileInputStream(file)
-      val appendOut = HdfsIO.fs.append(new Path(filename))
+      val p = new Path(filename)
+      val appendOut = HdfsIO.fs(p).append(p)
       IOUtil.copy(in, appendOut)
       appendOut.close()
       in.close()
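
Note on the ByteArray change above: marking the class Serializable lets instances travel inside Spark records and task closures, which would otherwise fail during task serialization. A minimal sketch of the round trip, assuming a local check with plain Java serialization (the payload and variable names are illustrative, not part of the patch):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    val record = new ByteArray()                       // class from sparkling.io, now Serializable
    record.append("hello".getBytes("UTF-8"))           // append(Array[Byte]) as defined in ByteArray.scala

    val sink = new ObjectOutputStream(new ByteArrayOutputStream())
    sink.writeObject(record)                           // would throw java.io.NotSerializableException before this patch
    sink.close()
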
diff --git a/src/main/scala/org/archive/archivespark/sparkling/io/HdfsIO.scala b/src/main/scala/org/archive/archivespark/sparkling/io/HdfsIO.scala
index 807fb82..08ffd35 100644
--- a/src/main/scala/org/archive/archivespark/sparkling/io/HdfsIO.scala
+++ b/src/main/scala/org/archive/archivespark/sparkling/io/HdfsIO.scala
@@ -44,7 +44,7 @@ object HdfsIO {
   val ReplicationProperty = "dfs.replication"
   val BufferSizeProperty = "io.file.buffer.size"
 
-  def fs: FileSystem = FileSystem.get(SparkHadoopUtil.get.conf)
+  def fs(p: Path): FileSystem = p.getFileSystem(SparkHadoopUtil.get.conf)
 
   object LoadingStrategy extends Enumeration {
     val Remote, BlockWise, CopyLocal, Dynamic = Value
@@ -58,6 +58,7 @@
   private var localFiles: Map[String, String] = Map.empty
 
   def open(path: String, offset: Long = 0, length: Long = 0, decompress: Boolean = true, retries: Int = 60, sleepMillis: Int = 1000 * 60, strategy: LoadingStrategy = defaultLoadingStrategy): InputStream = {
+    val p = new Path(path)
     val loadingStrategy = if (strategy == LoadingStrategy.Dynamic) {
       val fileSize = HdfsIO.length(path)
       val copyLocalThreshold = fileSize.toDouble * dynamicCopyLocalThreshold
@@ -71,7 +72,7 @@
         Common.retry(retries, sleepMillis, (retry, e) => {
           "File access failed (" + retry + "/" + retries + "): " + path + " (Offset: " + offset + ") - " + e.getMessage
         }) { retry =>
-          val in = fs.open(new Path(path))
+          val in = fs(p).open(p)
           if (retry > 0) in.seekToNewSource(offset)
           else if (offset > 0) in.seek(offset)
           val buffered = if (length > 0) new BufferedInputStream(new BoundedInputStream(in, length)) else new BufferedInputStream(in)
@@ -81,14 +82,14 @@
         } else buffered
       }
       case LoadingStrategy.BlockWise =>
-        new BufferedInputStream(new HdfsBlockStream(fs, path, offset, length, retries, sleepMillis))
+        new BufferedInputStream(new HdfsBlockStream(fs(p), path, offset, length, retries, sleepMillis))
       case LoadingStrategy.CopyLocal =>
         Common.retry(retries, sleepMillis, (retry, e) => {
           "File access failed (" + retry + "/" + retries + "): " + path + " - " + e.getMessage
         }) { retry =>
           localFiles = localFiles.synchronized(localFiles.updated(path, {
             val tmpPath = IOUtil.tmpFile.getCanonicalPath
-            fs.copyToLocalFile(new Path(path), new Path(tmpPath))
+            fs(p).copyToLocalFile(new Path(path), new Path(tmpPath))
             tmpPath
           }))
           val in = new FileInputStream(localFiles(path))
@@ -113,14 +114,17 @@
   def copyFromLocal(src: String, dst: String, move: Boolean = false, overwrite: Boolean = false, replication: Short = 0): Unit = {
     if (overwrite) delete(dst)
     val dstPath = new Path(dst)
-    val dstReplication = if (replication == 0) fs.getDefaultReplication(dstPath) else replication
+    val dstReplication = if (replication == 0) fs(dstPath).getDefaultReplication(dstPath) else replication
     val conf = new org.apache.hadoop.conf.Configuration(SparkHadoopUtil.get.conf)
     conf.setInt(ReplicationProperty, 1)
-    FileUtil.copy(FileSystem.getLocal(conf), new Path(src), fs, dstPath, move, overwrite, conf)
-    if (dstReplication > 1) fs.setReplication(dstPath, dstReplication)
+    FileUtil.copy(FileSystem.getLocal(conf), new Path(src), fs(dstPath), dstPath, move, overwrite, conf)
+    if (dstReplication > 1) fs(dstPath).setReplication(dstPath, dstReplication)
   }
 
-  def length(path: String): Long = HdfsIO.fs.getFileStatus(new Path(path)).getLen
+  def length(path: String): Long = {
+    val p = new Path(path)
+    HdfsIO.fs(p).getFileStatus(p).getLen
+  }
 
   def lines(path: String, n: Int = -1, offset: Long = 0): Seq[String] = access(path, offset, length = if (n < 0) -1 else 0) { in =>
     val lines = IOUtil.lines(in)
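
The fs(p: Path) signature above resolves the FileSystem from each path's URI scheme instead of always returning the default filesystem of the cluster (fs.defaultFS), which is what makes s3a:// paths reachable alongside the new hadoop-aws dependency. A rough sketch of the behaviour, with placeholder paths and a bare Configuration standing in for SparkHadoopUtil.get.conf:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()   // on a cluster this would come from SparkHadoopUtil.get.conf
    val hdfsFs: FileSystem = new Path("hdfs:///data/crawl.warc.gz").getFileSystem(conf)
    val s3Fs: FileSystem   = new Path("s3a://example-bucket/crawl.warc.gz").getFileSystem(conf)
    // hdfsFs and s3Fs are different FileSystem implementations chosen by scheme;
    // the old FileSystem.get(conf) always returned the default one, so every
    // HdfsIO call silently assumed HDFS regardless of the path given.
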
@@ -129,7 +133,8 @@
   }
 
   def files(path: String, recursive: Boolean = true): Iterator[String] = {
-    val glob = fs.globStatus(new Path(path))
+    val p = new Path(path)
+    val glob = fs(p).globStatus(p)
     if (glob == null) Iterator.empty
     else glob.toIterator.flatMap { status =>
       if (status.isDirectory && recursive) files(new Path(status.getPath, "*").toString)
@@ -139,7 +144,7 @@
 
   def dir(path: String): String = {
     val p = new Path(path)
-    val status = fs.globStatus(p)
+    val status = fs(p).globStatus(p)
     if (status == null || status.isEmpty || (status.length == 1 && status.head.isDirectory)) path
     else p.getParent.toString
   }
@@ -149,10 +154,10 @@
     var tmpPath: Path = null
     while ({
       tmpPath = new Path(path, prefix + rnd)
-      fs.exists(tmpPath)
+      fs(tmpPath).exists(tmpPath)
     }) rnd = System.currentTimeMillis + "-" + Random.nextInt.abs
-    fs.mkdirs(tmpPath)
-    if (deleteOnExit) fs.deleteOnExit(tmpPath)
+    fs(tmpPath).mkdirs(tmpPath)
+    if (deleteOnExit) fs(tmpPath).deleteOnExit(tmpPath)
     tmpPath.toString
   }
@@ -165,15 +170,19 @@
   def delete(path: String): Unit = if (exists(path)) {
     val p = new Path(path)
-    val success = fs.delete(p, true)
-    if (!success) fs.deleteOnExit(p)
+    val success = fs(p).delete(p, true)
+    if (!success) fs(p).deleteOnExit(p)
   }
 
-  def exists(path: String): Boolean = fs.exists(new Path(path))
+  def exists(path: String): Boolean = {
+    val p = new Path(path)
+    fs(p).exists(p)
+  }
 
   def ensureOutDir(path: String, ensureNew: Boolean = true): Unit = {
+    val p = new Path(path)
     if (ensureNew && exists(path)) Common.printThrow("Path exists: " + path)
-    fs.mkdirs(new Path(path))
+    fs(p).mkdirs(p)
   }
 
   def ensureNewFile(path: String): Unit = {
@@ -182,12 +191,14 @@
   def writer(path: String, overwrite: Boolean = false, append: Boolean = false, replication: Short = 0): HdfsFileWriter = HdfsFileWriter(path, overwrite, append, replication)
 
-  def bufferSize: Int = fs.getConf().getInt(BufferSizeProperty, 4096)
+  def bufferSize(p: Path): Int = {
+    fs(p).getConf.getInt(BufferSizeProperty, 4096)
+  }
 
   def out(path: String, overwrite: Boolean = false, compress: Boolean = true, useWriter: Boolean = true, append: Boolean = false, temporary: Boolean = false): OutputStream = {
-    val out = if (useWriter) writer(path, overwrite, append, if (temporary) tmpFileReplication else 0) else if (append) fs.append(new Path(path)) else {
-      val fsPath = new Path(path)
-      if (temporary) fs.create(fsPath, overwrite, bufferSize, tmpFileReplication, fs.getDefaultBlockSize(fsPath)) else fs.create(fsPath, overwrite)
+    val fsPath = new Path(path)
+    val out = if (useWriter) writer(path, overwrite, append, if (temporary) tmpFileReplication else 0) else if (append) fs(fsPath).append(fsPath) else {
+      if (temporary) fs(fsPath).create(fsPath, overwrite, bufferSize(fsPath), tmpFileReplication, fs(fsPath).getDefaultBlockSize(fsPath)) else fs(fsPath).create(fsPath, overwrite)
     }
     if (compress && path.toLowerCase.endsWith(GzipExt)) new GZIPOutputStream(out)
     else out
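
Taken together, the HdfsIO changes mean the helper methods accept fully qualified URIs on any configured filesystem. A hedged usage sketch (the bucket and object key below are illustrative only, not part of the patch):

    import org.archive.archivespark.sparkling.io.HdfsIO

    val warc = "s3a://example-bucket/crawl/example.warc.gz"   // placeholder object
    if (HdfsIO.exists(warc)) {
      val bytes = HdfsIO.length(warc)    // FileStatus is fetched via the path's own FileSystem
      val in = HdfsIO.open(warc)         // decompress defaults to true (see the open signature above)
      try {
        // read from `in` ...
      } finally in.close()
      println(s"$warc is $bytes bytes")
    }
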
diff --git a/src/main/scala/org/archive/archivespark/util/FilePathMap.scala b/src/main/scala/org/archive/archivespark/util/FilePathMap.scala
index 71fa2ae..32d05f0 100644
--- a/src/main/scala/org/archive/archivespark/util/FilePathMap.scala
+++ b/src/main/scala/org/archive/archivespark/util/FilePathMap.scala
@@ -33,13 +33,14 @@ case class FilePathMap(path: String, patterns: Seq[String] = Seq.empty) {
   val pathMap: Map[String, String] = {
     var map = collection.mutable.Map[String, String]()
 
-    val fs = FileSystem.get(SparkHadoopUtil.get.conf)
-    val files = fs.listFiles(new Path(path), true)
+    val p = new Path(path)
+    val fs = p.getFileSystem(SparkHadoopUtil.get.conf)
+    val files = fs.listFiles(p, true)
     while (files.hasNext) {
       val path = files.next.getPath
       val filename = path.getName
       if (patterns.isEmpty || patterns.exists(filename.matches)) {
-        if (map.contains(filename)) throw new RuntimeException("duplicate filename: " + filename)
+        if (map.contains(filename)) throw new RuntimeException("duplicate filename: " + filename + " in path: " + path)
         map += filename -> path.getParent.toString.intern
       }
     }
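
For completeness, a small sketch of how the patched FilePathMap might be used against an object-store prefix; the bucket, prefix, and pattern are assumptions for illustration:

    import org.archive.archivespark.util.FilePathMap

    // Index every *.warc.gz under the prefix; pathMap maps filename -> parent directory.
    val warcMap = FilePathMap("s3a://example-bucket/warcs", Seq(".*\\.warc\\.gz"))
    val parentDir: Option[String] = warcMap.pathMap.get("example-00000.warc.gz")
    // While pathMap is built, a RuntimeException now reports both the filename and
    // its full path if the same filename appears twice under the prefix.
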