Skip to content

Commit cc2690d

Browse files
authored
Merge pull request #95 from jpolchlo/update/osmesa-improvements
Bring in recent changes from OSMesa
2 parents 1043d64 + 0e394bb commit cc2690d

File tree

3 files changed

+13
-12
lines changed

3 files changed

+13
-12
lines changed

.travis.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
language: scala
22

3+
jdk:
4+
- openjdk8
5+
36
scala:
4-
- 2.11.12
7+
- "2.11.12"
58

69
sbt_args: -J-Xmx6g
710

src/main/scala/vectorpipe/functions/osm/package.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import org.apache.spark.sql.functions._
55
import org.apache.spark.sql.types._
66
import org.apache.spark.sql.{Column, DataFrame, Row}
77
import vectorpipe.model.Member
8+
import vectorpipe.util._
89

910
import scala.util.matching.Regex
1011
import scala.util.{Failure, Success, Try}
@@ -292,8 +293,12 @@ package object osm {
292293
def isWaterway(tags: Column): Column =
293294
array_intersects(splitDelimitedValues(tags.getItem("waterway")), lit(WaterwayValues.toArray)) as 'isWaterway
294295

295-
def mergeTags: UserDefinedFunction = udf {
296-
(_: Map[String, String]) ++ (_: Map[String, String])
296+
def mergeTags: UserDefinedFunction = udf { (a: Map[String, String], b: Map[String, String]) =>
297+
mergeMaps(a.mapValues(Set(_)), b.mapValues(Set(_)))(_ ++ _).mapValues(_.mkString(";"))
298+
}
299+
300+
val reduceTags: UserDefinedFunction = udf { tags: Iterable[Map[String, String]] =>
301+
tags.map(x => x.mapValues(Set(_))).reduce((a, b) => mergeMaps(a, b)(_ ++ _)).mapValues(_.mkString(";"))
297302
}
298303

299304
val array_intersects: UserDefinedFunction = udf { (a: Seq[_], b: Seq[_]) =>

src/main/scala/vectorpipe/internal/package.scala

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ package object internal {
132132
} else {
133133
@transient val idByVersion = Window.partitionBy('id).orderBy('version)
134134

135-
// when a node has been deleted, it doesn't include any tags; use a window function to retrieve the last tags
136-
// present and use those
135+
// when a node has been deleted, it doesn't include any tags or nds; use a window function to retrieve the last
136+
// tags and nds present and use those
137137
history
138138
.where('type === "way")
139139
.repartition('id)
@@ -174,13 +174,6 @@ package object internal {
174174
} else {
175175
@transient val idByUpdated = Window.partitionBy('id).orderBy('version)
176176

177-
val frame =
178-
if (hasCompressedMemberTypes(history)) {
179-
history
180-
} else {
181-
history.withColumn("members", compressMemberTypes('members))
182-
}
183-
184177
// when an element has been deleted, it doesn't include any tags; use a window function to retrieve the last tags
185178
// present and use those
186179
history

0 commit comments

Comments
 (0)