Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client Definitions based on free algebras - Unary Services #16

Merged
merged 3 commits into from
Jun 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ lazy val rpc = project
Seq(
libraryDependencies ++= commonDeps ++ freestyleCoreDeps() ++
Seq(
%%("freestyle-async"),
"io.grpc" % "grpc-all" % "1.4.0"
)
): _*
Expand All @@ -31,6 +32,9 @@ lazy val `demo-greeting` = project
.settings(noPublishSettings: _*)
.settings(commandAliases: _*)
.settings(demoCommonSettings: _*)
.settings(Seq(
libraryDependencies += %%("freestyle-async")
): _*)

lazy val googleApi = project
.in(file("third_party"))
Expand Down
15 changes: 1 addition & 14 deletions demo/greeting/src/main/scala/greeting/GreetingClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ package greeting

import java.util.concurrent.{CountDownLatch, TimeUnit}

import io.grpc.ManagedChannelBuilder
import freestyle.rpc.demo.greeting.GreeterGrpc._
import io.grpc.ManagedChannelBuilder
import io.grpc.stub.StreamObserver

import scala.collection.immutable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Random
Expand All @@ -36,18 +35,6 @@ class GreetingClient(host: String, port: Int) {

private[this] val asyncHelloClient: GreeterStub = GreeterGrpc.stub(channel)

def unaryDemo(request: MessageRequest): Unit = {

val response = for {
hi <- asyncHelloClient.sayHello(request)
bye <- asyncHelloClient.sayGoodbye(request)
} yield (hi.message, bye.message)

println("")
println(s"Received -> ${Await.result(response, Duration.Inf)}")
println("")
}

def serverStreamingDemo(request: MessageRequest): Future[Unit] = {
val lotOfRepliesStreamingPromise = Promise[Unit]()
val lotOfRepliesObserver = new StreamObserver[MessageReply] {
Expand Down
73 changes: 43 additions & 30 deletions demo/greeting/src/main/scala/greeting/GreetingClientApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,61 @@
package freestyle.rpc.demo
package greeting

import freestyle.rpc.demo.echo.EchoServiceGrpc
import freestyle.rpc.demo.echo.EchoServiceGrpc.EchoServiceStub
import freestyle.rpc.demo.echo_messages.EchoRequest
import io.grpc.ManagedChannelBuilder

import scala.concurrent.Await
import cats.implicits._
import freestyle._
import freestyle.implicits._
import freestyle.rpc.demo.echo_messages._
import runtime.client.implicits._
import greeting.client._
import io.grpc._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

@module
trait ClientAPP {
val greetingClientM: GreetingClientM
val echoClientM: EchoClientM
}

object GreetingClientApp {

def main(args: Array[String]): Unit = {
type UnaryDemoResponse = (MessageReply, MessageReply, EchoResponse)

val messageRequest = MessageRequest("Freestyle")
val echoRequest = EchoRequest("echo...")

def unaryDemo[F[_]](implicit APP: ClientAPP[F]): FreeS[F, UnaryDemoResponse] = {

val greetingClientM: GreetingClientM[F] = APP.greetingClientM
val echoClientM: EchoClientM[F] = APP.echoClientM

val defaultOptions = CallOptions.DEFAULT

val tupled = (
greetingClientM.sayHello(messageRequest, defaultOptions) |@|
greetingClientM.sayGoodbye(messageRequest, defaultOptions) |@|
echoClientM.echo(echoRequest, defaultOptions)
).tupled

val request = MessageRequest("Freestyle")
val client = new GreetingClient(host, portNode1)
// val client = new GreetingClient(host, portNode2)
tupled.map {
case (hi: MessageReply, bye: MessageReply, echo: EchoResponse) =>
println(s"Received -> (${hi.message}, ${bye.message}, ${echo.message})")
(hi, bye, echo)
}
}

def main(args: Array[String]): Unit = {

// http://www.grpc.io/docs/guides/concepts.html
Await.result(unaryDemo[ClientAPP.Op].interpret[Future], Duration.Inf)

// Unary RPCs where the client sends a single request to the server and
// gets a single response back, just like a normal function call:
client.unaryDemo(request)
val client = new GreetingClient(host, portNode1)

// Server streaming RPCs where the client sends a request to the server and
// gets a stream to read a sequence of messages back. The client reads from
// the returned stream until there are no more messages.

client.serverStreamingDemo(request)
client.serverStreamingDemo(messageRequest)

// Client streaming RPCs where the client writes a sequence of messages and sends them
// to the server, again using a provided stream. Once the client has finished writing the messages,
Expand All @@ -59,21 +87,6 @@ object GreetingClientApp {

client.biStreamingDemo()

// EchoDemo using the same server where the greeting service is deployed.
echoDemo(EchoRequest("echo..."))

(): Unit
}

def echoDemo(request: EchoRequest): Unit = {

val channel =
ManagedChannelBuilder.forAddress(host, portNode1).usePlaintext(true).build

val asyncEchoClient: EchoServiceStub = EchoServiceGrpc.stub(channel)

println("")
println(s"Received -> ${Await.result(asyncEchoClient.echo(request), Duration.Inf)}")
println("")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package greeting
import cats.implicits._
import freestyle.rpc.server._
import freestyle.rpc.server.implicits._
import runtime.implicits._
import runtime.server.implicits._

import scala.concurrent.duration.Duration
import scala.concurrent.Await
Expand Down
40 changes: 40 additions & 0 deletions demo/greeting/src/main/scala/greeting/client/EchoClientM.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package freestyle.rpc.demo
package greeting.client

import freestyle._
import freestyle.rpc.client._
import freestyle.rpc.demo.echo.EchoServiceGrpc
import freestyle.rpc.demo.echo_messages._
import io.grpc.CallOptions

@module
trait EchoClientM {

val clientCallsM: ClientCallsM
val channelOps: ChannelM

def echo(
request: EchoRequest,
options: CallOptions = CallOptions.DEFAULT): FS.Seq[EchoResponse] =
for {
call <- channelOps.newCall(EchoServiceGrpc.METHOD_ECHO, options)
response <- clientCallsM.asyncM(call, request)
} yield response

}
73 changes: 73 additions & 0 deletions demo/greeting/src/main/scala/greeting/client/GreetingClientM.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package freestyle.rpc.demo
package greeting.client

import freestyle._
import freestyle.rpc.client._
import freestyle.rpc.demo.greeting._
import io.grpc.CallOptions
import io.grpc.stub.StreamObserver

@module
trait GreetingClientM {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏


val clientCallsM: ClientCallsM
val channelOps: ChannelM

def sayHello(
request: MessageRequest,
options: CallOptions = CallOptions.DEFAULT): FS.Seq[MessageReply] =
for {
call <- channelOps.newCall(GreeterGrpc.METHOD_SAY_HELLO, options)
response <- clientCallsM.asyncM(call, request)
} yield response

def sayGoodbye(
request: MessageRequest,
options: CallOptions = CallOptions.DEFAULT): FS.Seq[MessageReply] =
for {
call <- channelOps.newCall(GreeterGrpc.METHOD_SAY_GOODBYE, options)
response <- clientCallsM.asyncM(call, request)
} yield response

def lotsOfReplies(
request: MessageRequest,
responseObserver: StreamObserver[MessageReply],
options: CallOptions = CallOptions.DEFAULT): FS.Seq[Unit] =
for {
call <- channelOps.newCall(GreeterGrpc.METHOD_LOTS_OF_REPLIES, options)
response <- clientCallsM.asyncStreamServer(call, request, responseObserver)
} yield response

def lotsOfGreetings(
responseObserver: StreamObserver[MessageReply],
options: CallOptions = CallOptions.DEFAULT): FS.Seq[StreamObserver[MessageRequest]] =
for {
call <- channelOps.newCall(GreeterGrpc.METHOD_LOTS_OF_GREETINGS, options)
response <- clientCallsM.asyncStreamClient(call, responseObserver)
} yield response

def bidiHello(
responseObserver: StreamObserver[MessageReply],
options: CallOptions = CallOptions.DEFAULT): FS.Seq[StreamObserver[MessageRequest]] =
for {
call <- channelOps.newCall(GreeterGrpc.METHOD_BIDI_HELLO, options)
response <- clientCallsM.asyncStreamBidi(call, responseObserver)
} yield response

}
61 changes: 50 additions & 11 deletions demo/greeting/src/main/scala/greeting/runtime/implicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,67 @@
package freestyle.rpc.demo
package greeting.runtime

import cats.~>
import freestyle._
import freestyle.implicits._
import freestyle.async.implicits._
import freestyle.rpc.demo.echo.EchoServiceGrpc
import freestyle.rpc.demo.greeting._
import freestyle.rpc.demo.greeting.runtime.server.Implicits
import freestyle.rpc.demo.greeting.service._
import freestyle.rpc.server._
import freestyle.rpc.server.implicits._
import freestyle.rpc.server.handlers._

import scala.concurrent.{ExecutionContext, Future}

object implicits {
trait CommonImplicits {

implicit val config: Config = Config(portNode1)
implicit val ec: ExecutionContext = ExecutionContext.Implicits.global

implicit val grpcConfigs: List[GrpcConfig] = List(
AddService(GreeterGrpc.bindService(new GreetingService, ec)),
AddService(EchoServiceGrpc.bindService(new EchoService, ec))
)
}

object server {

trait Implicits extends CommonImplicits {

import freestyle.rpc.server._
import freestyle.rpc.server.handlers._
import freestyle.rpc.server.implicits._

implicit val config: Config = Config(portNode1)

implicit val grpcConfigs: List[GrpcConfig] = List(
AddService(GreeterGrpc.bindService(new GreetingService, ec)),
AddService(EchoServiceGrpc.bindService(new EchoService, ec))
)

implicit val grpcServerHandler =
new GrpcServerHandler[Future] andThen new GrpcConfigInterpreter[Future]

}

object implicits extends Implicits

}

object client {

trait Implicits extends CommonImplicits {

import freestyle.rpc.client._
import freestyle.rpc.client.implicits._
import freestyle.rpc.client.handlers.{ChannelMHandler, ClientCallsMHandler}

val channelFor: ManagedChannelFor = ManagedChannelForAddress(host, portNode1)

val channelConfigList: List[ManagedChannelConfig] = List(UsePlaintext(true))

implicit def channelMHandler[F[_]]: ChannelM.Op ~> Future =
new ChannelMHandler[Future] andThen
new ManagedChannelInterpreter[Future](channelFor, channelConfigList)

implicit val clientCallsMHandler: ClientCallsM.Op ~> Future = new ClientCallsMHandler[Future]

}

implicit val grpcServerHandler =
new GrpcServerHandler[Future] andThen new GrpcConfigInterpreter[Future]
object implicits extends Implicits

}
41 changes: 41 additions & 0 deletions rpc/src/main/scala/client/ChannelConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package freestyle.rpc
package client

import java.util.concurrent.{Executor, TimeUnit}

import io.grpc._

sealed trait ManagedChannelFor extends Product with Serializable
case class ManagedChannelForAddress(name: String, port: Int) extends ManagedChannelFor
case class ManagedChannelForTarget(target: String) extends ManagedChannelFor

sealed trait ManagedChannelConfig extends Product with Serializable
case object DirectExecutor extends ManagedChannelConfig
case class SetExecutor(executor: Executor) extends ManagedChannelConfig
case class AddInterceptorList(interceptors: List[ClientInterceptor]) extends ManagedChannelConfig
case class AddInterceptor(interceptors: ClientInterceptor*) extends ManagedChannelConfig
case class UserAgent(userAgent: String) extends ManagedChannelConfig
case class OverrideAuthority(authority: String) extends ManagedChannelConfig
case class UsePlaintext(skipNegotiation: Boolean) extends ManagedChannelConfig
case class NameResolverFactory(resolverFactory: NameResolver.Factory) extends ManagedChannelConfig
case class LoadBalancerFactory(lbFactory: LoadBalancer.Factory) extends ManagedChannelConfig
case class SetDecompressorRegistry(registry: DecompressorRegistry) extends ManagedChannelConfig
case class SetCompressorRegistry(registry: CompressorRegistry) extends ManagedChannelConfig
case class SetIdleTimeout(value: Long, unit: TimeUnit) extends ManagedChannelConfig
case class SetMaxInboundMessageSize(max: Int) extends ManagedChannelConfig
Loading