9 changes: 5 additions & 4 deletions build.sbt
@@ -4,7 +4,7 @@ import sbt.Keys._
lazy val commonSettings = Seq(
name := "archivespark",
organization := "com.github.helgeho",
version := "3.0.1",
version := "3.0.3-MMISIEWICZ-SNAPSHOT",
scalaVersion := "2.11.12",
fork := true,
exportJars := true
@@ -16,9 +16,10 @@ lazy val archivespark = (project in file("."))
.settings(
commonSettings,
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % "2.5.0" % "provided",
"org.apache.spark" %% "spark-core" % "2.1.3" % "provided",
"org.apache.spark" %% "spark-sql" % "2.1.3" % "provided",
"org.apache.hadoop" % "hadoop-client" % "2.7.2" % "provided",
"org.apache.hadoop" % "hadoop-aws" % "2.7.2" % "provided",
"org.apache.spark" %% "spark-core" % "2.4.4" % "provided",
"org.apache.spark" %% "spark-sql" % "2.4.4" % "provided",
"joda-time" % "joda-time" % "2.10",
"org.apache.httpcomponents" % "httpclient" % "4.5.6",
"org.netpreserve.commons" % "webarchive-commons" % "1.1.8" excludeAll(
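Note on the dependency bump: pairing hadoop-client 2.7.2 with hadoop-aws 2.7.2 pulls in the S3A filesystem, so, together with the HdfsIO changes below, ArchiveSpark can address s3a:// paths as well as HDFS; Spark moves to 2.4.4 accordingly. All of these remain "provided", so the matching jars must already be on the cluster classpath. A minimal sketch of a job that would exercise this; the bucket, paths, and credential handling are hypothetical and not part of this PR:

import org.apache.spark.sql.SparkSession

// Hypothetical session setup; the fs.s3a.* keys are the standard hadoop-aws properties.
val spark = SparkSession.builder()
  .appName("archivespark-s3a-sketch")
  .config("spark.hadoop.fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))
  .config("spark.hadoop.fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", ""))
  .getOrCreate()

// With hadoop-aws on the classpath, s3a:// resolves like any other Hadoop scheme.
val cdxLines = spark.sparkContext.textFile("s3a://example-bucket/cdx/*.cdx")
println(cdxLines.count())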
@@ -41,7 +41,7 @@ class HttpMessage (val statusLine: String, val headers: Map[String, String], val
lazy val lowerCaseHeaders: Map[String, String] = headers.map{case (k,v) => (k.toLowerCase, v)}

def contentEncoding: Option[String] = lowerCaseHeaders.get("content-encoding").map(_.toLowerCase)
def mime: Option[String] = lowerCaseHeaders.get("content-type").map(_.split(';').head.trim.toLowerCase)
def mime: Option[String] = Try {
lowerCaseHeaders.get("content-type")
.map(_.split(';').head.trim.toLowerCase)
}.getOrElse(None)

def charset: Option[String] = {
lowerCaseHeaders.get("content-type").flatMap(_.split(';').drop(1).headOption).map(_.trim)
.filter(_.startsWith("charset="))
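The Try wrapper makes Content-Type parsing defensive: a malformed header value now yields None from mime instead of throwing during record enrichment (assuming scala.util.Try is already imported in that file). A standalone sketch of the same pattern, using a hypothetical helper name:

import scala.util.Try

// Hypothetical helper mirroring the change; not part of the PR itself.
def mimeOf(headers: Map[String, String]): Option[String] = Try {
  headers.map { case (k, v) => (k.toLowerCase, v) }
    .get("content-type")
    .map(_.split(';').head.trim.toLowerCase)
}.getOrElse(None)

println(mimeOf(Map("Content-Type" -> "text/HTML; charset=UTF-8"))) // Some(text/html)
println(mimeOf(Map.empty))                                         // None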
@@ -29,7 +29,7 @@ import java.util.Collections

import scala.collection.JavaConverters._

class ByteArray {
class ByteArray extends Serializable {
private val arrays = collection.mutable.Buffer.empty[Array[Byte]]

def append(array: Array[Byte]): Unit = if (array.nonEmpty) arrays += array
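Marking ByteArray as Serializable lets records that hold one be shipped between executors, cached, or spilled using Java serialization; without the marker trait such jobs would be expected to fail with a NotSerializableException at the first stage boundary. A minimal round-trip sketch with a simplified stand-in class:

import java.io._

// Simplified stand-in for the real ByteArray, only to show the effect of the marker trait.
class ByteArrayLike extends Serializable {
  private val arrays = collection.mutable.Buffer.empty[Array[Byte]]
  def append(array: Array[Byte]): Unit = if (array.nonEmpty) arrays += array
  def length: Long = arrays.iterator.map(_.length.toLong).sum
}

val original = new ByteArrayLike
original.append("hello".getBytes("UTF-8"))

// Java-serialization round trip, as Spark would do when shuffling or checkpointing.
val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(original) // would throw NotSerializableException without `extends Serializable`
out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
val copy = in.readObject().asInstanceOf[ByteArrayLike]
assert(copy.length == original.length)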
@@ -45,7 +45,8 @@ class HdfsFileWriter private(filename: String, append: Boolean, replication: Sho
Log.info("Copying from temporary file " + file.getCanonicalPath + " to " + filename + "...")
if (append) {
val in = new FileInputStream(file)
val appendOut = HdfsIO.fs.append(new Path(filename))
val p = new Path(filename)
val appendOut = HdfsIO.fs(p).append(p)
IOUtil.copy(in, appendOut)
appendOut.close()
in.close()
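Resolving the FileSystem from the target Path (via HdfsIO.fs(p), introduced below) means the append destination no longer has to live on fs.defaultFS. One caveat worth keeping in mind: not every Hadoop FileSystem supports append(); S3A in particular is generally expected to reject it, so the append branch presumably remains useful only for HDFS or local targets. A small sketch of the per-scheme resolution (hosts and paths are hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()

// Each Path resolves to the FileSystem that owns its scheme.
val hdfsPath = new Path("hdfs://namenode:8020/data/part-00000")
val localPath = new Path("file:///tmp/part-00000")

println(hdfsPath.getFileSystem(conf).getClass.getSimpleName)  // e.g. DistributedFileSystem
println(localPath.getFileSystem(conf).getClass.getSimpleName) // e.g. LocalFileSystem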
53 changes: 32 additions & 21 deletions src/main/scala/org/archive/archivespark/sparkling/io/HdfsIO.scala
@@ -44,7 +44,7 @@ object HdfsIO {
val ReplicationProperty = "dfs.replication"
val BufferSizeProperty = "io.file.buffer.size"

def fs: FileSystem = FileSystem.get(SparkHadoopUtil.get.conf)
def fs(p: Path): FileSystem = p.getFileSystem(SparkHadoopUtil.get.conf)

object LoadingStrategy extends Enumeration {
val Remote, BlockWise, CopyLocal, Dynamic = Value
@@ -58,6 +58,7 @@
private var localFiles: Map[String, String] = Map.empty

def open(path: String, offset: Long = 0, length: Long = 0, decompress: Boolean = true, retries: Int = 60, sleepMillis: Int = 1000 * 60, strategy: LoadingStrategy = defaultLoadingStrategy): InputStream = {
val p = new Path(path)
val loadingStrategy = if (strategy == LoadingStrategy.Dynamic) {
val fileSize = HdfsIO.length(path)
val copyLocalThreshold = fileSize.toDouble * dynamicCopyLocalThreshold
@@ -71,7 +71,7 @@
Common.retry(retries, sleepMillis, (retry, e) => {
"File access failed (" + retry + "/" + retries + "): " + path + " (Offset: " + offset + ") - " + e.getMessage
}) { retry =>
val in = fs.open(new Path(path))
val in = fs(p).open(p)
if (retry > 0) in.seekToNewSource(offset)
else if (offset > 0) in.seek(offset)
val buffered = if (length > 0) new BufferedInputStream(new BoundedInputStream(in, length)) else new BufferedInputStream(in)
@@ -81,14 +81,14 @@
} else buffered
}
case LoadingStrategy.BlockWise =>
new BufferedInputStream(new HdfsBlockStream(fs, path, offset, length, retries, sleepMillis))
new BufferedInputStream(new HdfsBlockStream(fs(p), path, offset, length, retries, sleepMillis))
case LoadingStrategy.CopyLocal =>
Common.retry(retries, sleepMillis, (retry, e) => {
"File access failed (" + retry + "/" + retries + "): " + path + " - " + e.getMessage
}) { retry =>
localFiles = localFiles.synchronized(localFiles.updated(path, {
val tmpPath = IOUtil.tmpFile.getCanonicalPath
fs.copyToLocalFile(new Path(path), new Path(tmpPath))
fs(p).copyToLocalFile(new Path(path), new Path(tmpPath))
tmpPath
}))
val in = new FileInputStream(localFiles(path))
@@ -113,14 +113,17 @@
def copyFromLocal(src: String, dst: String, move: Boolean = false, overwrite: Boolean = false, replication: Short = 0): Unit = {
if (overwrite) delete(dst)
val dstPath = new Path(dst)
val dstReplication = if (replication == 0) fs.getDefaultReplication(dstPath) else replication
val dstReplication = if (replication == 0) fs(dstPath).getDefaultReplication(dstPath) else replication
val conf = new org.apache.hadoop.conf.Configuration(SparkHadoopUtil.get.conf)
conf.setInt(ReplicationProperty, 1)
FileUtil.copy(FileSystem.getLocal(conf), new Path(src), fs, dstPath, move, overwrite, conf)
if (dstReplication > 1) fs.setReplication(dstPath, dstReplication)
FileUtil.copy(FileSystem.getLocal(conf), new Path(src), fs(dstPath), dstPath, move, overwrite, conf)
if (dstReplication > 1) fs(dstPath).setReplication(dstPath, dstReplication)
}

def length(path: String): Long = HdfsIO.fs.getFileStatus(new Path(path)).getLen
def length(path: String): Long = {
val p = new Path(path)
HdfsIO.fs(p).getFileStatus(p).getLen
}

def lines(path: String, n: Int = -1, offset: Long = 0): Seq[String] = access(path, offset, length = if (n < 0) -1 else 0) { in =>
val lines = IOUtil.lines(in)
@@ -129,7 +133,8 @@
}

def files(path: String, recursive: Boolean = true): Iterator[String] = {
val glob = fs.globStatus(new Path(path))
val p = new Path(path)
val glob = fs(p).globStatus(p)
if (glob == null) Iterator.empty
else glob.toIterator.flatMap { status =>
if (status.isDirectory && recursive) files(new Path(status.getPath, "*").toString)
@@ -139,7 +144,7 @@

def dir(path: String): String = {
val p = new Path(path)
val status = fs.globStatus(p)
val status = fs(p).globStatus(p)
if (status == null || status.isEmpty || (status.length == 1 && status.head.isDirectory)) path
else p.getParent.toString
}
@@ -149,10 +154,10 @@
var tmpPath: Path = null
while ({
tmpPath = new Path(path, prefix + rnd)
fs.exists(tmpPath)
fs(tmpPath).exists(tmpPath)
}) rnd = System.currentTimeMillis + "-" + Random.nextInt.abs
fs.mkdirs(tmpPath)
if (deleteOnExit) fs.deleteOnExit(tmpPath)
fs(tmpPath).mkdirs(tmpPath)
if (deleteOnExit) fs(tmpPath).deleteOnExit(tmpPath)
tmpPath.toString
}

Expand All @@ -165,15 +170,19 @@ object HdfsIO {

def delete(path: String): Unit = if (exists(path)) {
val p = new Path(path)
val success = fs.delete(p, true)
if (!success) fs.deleteOnExit(p)
val success = fs(p).delete(p, true)
if (!success) fs(p).deleteOnExit(p)
}

def exists(path: String): Boolean = fs.exists(new Path(path))
def exists(path: String): Boolean = {
val p = new Path(path)
fs(p).exists(p)
}

def ensureOutDir(path: String, ensureNew: Boolean = true): Unit = {
val p = new Path(path)
if (ensureNew && exists(path)) Common.printThrow("Path exists: " + path)
fs.mkdirs(new Path(path))
fs(p).mkdirs(p)
}

def ensureNewFile(path: String): Unit = {
Expand All @@ -182,12 +191,14 @@ object HdfsIO {

def writer(path: String, overwrite: Boolean = false, append: Boolean = false, replication: Short = 0): HdfsFileWriter = HdfsFileWriter(path, overwrite, append, replication)

def bufferSize: Int = fs.getConf().getInt(BufferSizeProperty, 4096)
def bufferSize(p: Path): Int = {
fs(p).getConf.getInt(BufferSizeProperty, 4096)
}

def out(path: String, overwrite: Boolean = false, compress: Boolean = true, useWriter: Boolean = true, append: Boolean = false, temporary: Boolean = false): OutputStream = {
val out = if (useWriter) writer(path, overwrite, append, if (temporary) tmpFileReplication else 0) else if (append) fs.append(new Path(path)) else {
val fsPath = new Path(path)
if (temporary) fs.create(fsPath, overwrite, bufferSize, tmpFileReplication, fs.getDefaultBlockSize(fsPath)) else fs.create(fsPath, overwrite)
val fsPath = new Path(path)
val out = if (useWriter) writer(path, overwrite, append, if (temporary) tmpFileReplication else 0) else if (append) fs(fsPath).append(fsPath) else {
if (temporary) fs(fsPath).create(fsPath, overwrite, bufferSize(fsPath), tmpFileReplication, fs(fsPath).getDefaultBlockSize(fsPath)) else fs(fsPath).create(fsPath, overwrite)
}
if (compress && path.toLowerCase.endsWith(GzipExt)) new GZIPOutputStream(out)
else out
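The recurring change in this file is a single substitution: the process-wide FileSystem.get(conf) becomes fs(p) = p.getFileSystem(conf), so every helper (open, length, files, exists, out, and so on) talks to whichever filesystem owns the path's scheme instead of assuming fs.defaultFS. A hedged usage sketch; the paths are hypothetical, and it assumes hadoop-aws and S3A credentials are configured as in the build.sbt note above:

import org.archive.archivespark.sparkling.io.HdfsIO

// Same helpers, different backing filesystems, chosen per path at call time.
val hdfsSize = HdfsIO.length("hdfs://namenode:8020/warcs/example.warc.gz")
val s3Size = HdfsIO.length("s3a://example-bucket/warcs/example.warc.gz")

// files() globs on whichever filesystem the pattern points to.
HdfsIO.files("s3a://example-bucket/warcs/*.warc.gz").take(5).foreach(println)

// open() picks the loading strategy as before; decompress = false reads the raw bytes.
val in = HdfsIO.open("s3a://example-bucket/warcs/example.warc.gz", decompress = false)
in.close()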
@@ -33,13 +33,14 @@ case class FilePathMap(path: String, patterns: Seq[String] = Seq.empty) {
val pathMap: Map[String, String] = {
var map = collection.mutable.Map[String, String]()

val fs = FileSystem.get(SparkHadoopUtil.get.conf)
val files = fs.listFiles(new Path(path), true)
val p = new Path(path)
val fs = p.getFileSystem(SparkHadoopUtil.get.conf)
val files = fs.listFiles(p, true)
while (files.hasNext) {
val path = files.next.getPath
val filename = path.getName
if (patterns.isEmpty || patterns.exists(filename.matches)) {
if (map.contains(filename)) throw new RuntimeException("duplicate filename: " + filename)
if (map.contains(filename)) throw new RuntimeException("duplicate filename: " + filename + " in path: " + path)
map += filename -> path.getParent.toString.intern
}
}
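FilePathMap now also resolves its FileSystem from the root path, and the duplicate-filename error reports the offending path, which makes collisions in nested directories easier to track down. For reference, a small sketch of the recursive listing it is built on (the root directory is hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val root = new Path("hdfs://namenode:8020/cdx")
val fs = root.getFileSystem(new Configuration())

// listFiles(path, recursive = true) walks the tree and yields only files.
val files = fs.listFiles(root, true)
while (files.hasNext) {
  val status = files.next()
  println(status.getPath.getName + " -> " + status.getPath.getParent)
}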