Skip to content

Commit 15d8a85

Browse files
bdoyle0182Brendan Doyle
andauthored
handle pekko deprecations (#5555)
Co-authored-by: Brendan Doyle <[email protected]>
1 parent 7538ced commit 15d8a85

File tree

18 files changed

+53
-54
lines changed

18 files changed

+53
-54
lines changed

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/DockerToActivationFileLogStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ object OwSink {
174174
*/
175175
def combine[T, U, M1, M2](first: Sink[U, M1], second: Sink[U, M2])(
176176
strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, (M1, M2)] = {
177-
Sink.fromGraph(GraphDSL.create(first, second)((_, _)) { implicit b => (s1, s2) =>
177+
Sink.fromGraph(GraphDSL.createGraph(first, second)((_, _)) { implicit b => (s1, s2) =>
178178
import GraphDSL.Implicits._
179179
val d = b.add(strategy(2))
180180

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/logging/SplunkLogStore.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import org.apache.pekko.http.scaladsl.model.Uri.Path
3333
import org.apache.pekko.http.scaladsl.model.headers.Authorization
3434
import org.apache.pekko.http.scaladsl.model.headers.BasicHttpCredentials
3535
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal
36-
import org.apache.pekko.stream.OverflowStrategy
3736
import org.apache.pekko.stream.QueueOfferResult
3837
import org.apache.pekko.stream.scaladsl.Flow
3938
import org.apache.pekko.stream.scaladsl.Keep
@@ -178,9 +177,10 @@ class SplunkLogStore(
178177
.convertTo[String]}: ${l.fields(splunkConfig.logMessageField).convertTo[String].trim}"
179178

180179
//based on https://pekko.apache.org/docs/pekko-http/current/client-side/host-level.html
180+
// BoundedSourceQueue automatically drops new elements when full, maintaining dropNew behavior
181181
val queue =
182182
Source
183-
.queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests, OverflowStrategy.dropNew)
183+
.queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests)
184184
.via(httpFlow.getOrElse(defaultHttpFlow))
185185
.toMat(Sink.foreach({
186186
case ((Success(resp), p)) => p.success(resp)
@@ -190,7 +190,7 @@ class SplunkLogStore(
190190

191191
def queueRequest(request: HttpRequest): Future[HttpResponse] = {
192192
val responsePromise = Promise[HttpResponse]()
193-
queue.offer(request -> responsePromise).flatMap {
193+
queue.offer(request -> responsePromise) match {
194194
case QueueOfferResult.Enqueued => responsePromise.future
195195
case QueueOfferResult.Dropped =>
196196
Future.failed(new RuntimeException("Splunk API Client Queue overflowed. Try again later."))

common/scala/src/main/scala/org/apache/openwhisk/core/database/Batcher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class Batcher[T, R](batchSize: Int, concurrency: Int, retry: Int)(operation: (Se
6464
completionMatcher = cm,
6565
failureMatcher = PartialFunction.empty[Any, Throwable],
6666
bufferSize = Int.MaxValue,
67-
overflowStrategy = OverflowStrategy.dropNew)
67+
overflowStrategy = OverflowStrategy.dropBuffer)
6868
.batch(batchSize, Queue(_))((queue, element) => queue :+ element)
6969
.mapAsyncUnordered(concurrency) { els =>
7070
val elements = els.map(_._1)

common/scala/src/main/scala/org/apache/openwhisk/core/database/StoreUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ private[database] object StoreUtils {
8484

8585
def combinedSink[T](dest: Sink[ByteString, Future[T]])(
8686
implicit ec: ExecutionContext): Sink[ByteString, Future[AttachmentUploadResult[T]]] = {
87-
Sink.fromGraph(GraphDSL.create(digestSink(), lengthSink(), dest)(combineResult) {
87+
Sink.fromGraph(GraphDSL.createGraph(digestSink(), lengthSink(), dest)(combineResult) {
8888
implicit builder => (dgs, ls, dests) =>
8989
import GraphDSL.Implicits._
9090

common/scala/src/main/scala/org/apache/openwhisk/core/database/azblob/AzureBlobAttachmentStore.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String,
299299
case None =>
300300
blobClient.exists().toFuture.toScala.map { exists =>
301301
if (exists) {
302-
val bbFlux = blobClient.download()
302+
val bbFlux = blobClient.downloadStream()
303303
Some(Source.fromPublisher(bbFlux).map(ByteString.fromByteBuffer))
304304
} else {
305305
throw NoDocumentException("Not found on 'readAttachment'.")

common/scala/src/main/scala/org/apache/openwhisk/core/database/s3/S3AttachmentStore.scala

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,14 @@ class S3AttachmentStore(s3Settings: S3Settings, bucket: String, prefix: String,
135135
s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
136136
val source = getAttachmentSource(objectKey(docId, name))
137137

138-
val f = source.flatMap {
139-
case Some(x) => x.withAttributes(s3attributes).runWith(sink)
140-
case None => Future.failed(NoDocumentException("Not found on 'readAttachment'."))
141-
}
138+
val f = source
139+
.flatMap { x =>
140+
x.withAttributes(s3attributes).runWith(sink)
141+
}
142+
.recoverWith {
143+
case e: Throwable if isMissingKeyException(e) =>
144+
Future.failed(NoDocumentException("Not found on 'readAttachment'."))
145+
}
142146

143147
val g = f.transform(
144148
{ s =>
@@ -164,16 +168,14 @@ class S3AttachmentStore(s3Settings: S3Settings, bucket: String, prefix: String,
164168
s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
165169
}
166170

167-
private def getAttachmentSource(objectKey: String): Future[Option[Source[ByteString, Any]]] = urlSigner match {
168-
case Some(signer) => getUrlContent(signer.getSignedURL(objectKey))
171+
private def getAttachmentSource(objectKey: String): Future[Source[ByteString, Any]] = urlSigner match {
172+
case Some(signer) =>
173+
getUrlContent(signer.getSignedURL(objectKey)).map(_.get)
169174

170-
// When reading from S3 we get an optional source of ByteString and Metadata if the object exist
171-
// For such case drop the metadata
175+
// S3.getObject returns Source[ByteString, Future[ObjectMetadata]].
176+
// The materialized future will fail if the object doesn't exist, which is handled by the caller.
172177
case None =>
173-
S3.download(bucket, objectKey)
174-
.withAttributes(s3attributes)
175-
.runWith(Sink.head)
176-
.map(x => x.map(_._1))
178+
Future.successful(S3.getObject(bucket, objectKey).withAttributes(s3attributes))
177179
}
178180

179181
private def getUrlContent(uri: Uri): Future[Option[Source[ByteString, Any]]] = {
@@ -182,10 +184,9 @@ class S3AttachmentStore(s3Settings: S3Settings, bucket: String, prefix: String,
182184
case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
183185
Future.successful(Some(entity.dataBytes))
184186
case HttpResponse(status, _, entity, _) =>
185-
Unmarshal(entity).to[String].map { err =>
187+
Unmarshal(entity).to[String].flatMap { err =>
186188
//With CloudFront also the error message confirms to same S3 exception format
187-
val exp = S3Exception(err, status)
188-
if (isMissingKeyException(exp)) None else throw exp
189+
Future.failed(S3Exception(err, status))
189190
}
190191
}
191192
}

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Actions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
316316
}
317317
case Failure(t: RecordTooLargeException) =>
318318
logging.debug(this, s"[POST] action payload was too large")
319-
terminate(PayloadTooLarge)
319+
terminate(ContentTooLarge)
320320
case Failure(RejectRequest(code, message)) =>
321321
logging.debug(this, s"[POST] action rejected with code $code: $message")
322322
terminate(code, message)

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Entities.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import scala.concurrent.Future
2121
import scala.language.postfixOps
2222
import scala.util.Try
2323
import org.apache.pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
24-
import org.apache.pekko.http.scaladsl.model.StatusCodes.PayloadTooLarge
24+
import org.apache.pekko.http.scaladsl.model.StatusCodes.ContentTooLarge
2525
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
2626
import org.apache.pekko.http.scaladsl.server.Directive0
2727
import org.apache.pekko.http.scaladsl.server.Directives
@@ -45,7 +45,7 @@ protected[controller] trait ValidateRequestSize extends Directives {
4545
new Directive0 {
4646
override def tapply(f: Unit => Route) = {
4747
check map {
48-
case e: SizeError => terminate(PayloadTooLarge, Messages.entityTooBig(e))
48+
case e: SizeError => terminate(ContentTooLarge, Messages.entityTooBig(e))
4949
} getOrElse f(None)
5050
}
5151
}

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Packages.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ trait WhiskPackagesApi extends WhiskCollectionAPI with ReferencedEntities {
336336
logging.debug(this, s"fetching package '$docid' for reference")
337337
if (docid == wp.docid) {
338338
logging.error(this, s"unexpected package binding refers to itself: $docid")
339-
terminate(UnprocessableEntity, Messages.packageBindingCircularReference(b.fullyQualifiedName.toString))
339+
terminate(UnprocessableContent, Messages.packageBindingCircularReference(b.fullyQualifiedName.toString))
340340
} else {
341341

342342
/** Here's where I check package execute only case with package binding. */

core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/PackageCollection.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class PackageCollection(entityStore: EntityStore)(implicit logging: Logging) ext
102102
logging.error(this, s"unexpected package binding refers to itself: $doc")
103103
Future.failed(
104104
RejectRequest(
105-
UnprocessableEntity,
105+
UnprocessableContent,
106106
Messages.packageBindingCircularReference(binding.fullyQualifiedName.toString)))
107107
} else {
108108
checkPackageReadPermission(namespaces, pkgOwner, pkgDocid)

0 commit comments

Comments
 (0)