Skip to content

Commit a142c5f

Browse files
Client Definitions based on free algebras - Unary Services (#16)
* Client definitions based on free algebras * Addresses code review comments * Removes the superfluous Monad constraint
1 parent f9b3fad commit a142c5f

17 files changed

+578
-81
lines changed

build.sbt

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ lazy val rpc = project
1818
Seq(
1919
libraryDependencies ++= commonDeps ++ freestyleCoreDeps() ++
2020
Seq(
21+
%%("freestyle-async"),
2122
"io.grpc" % "grpc-all" % "1.4.0"
2223
)
2324
): _*
@@ -31,6 +32,9 @@ lazy val `demo-greeting` = project
3132
.settings(noPublishSettings: _*)
3233
.settings(commandAliases: _*)
3334
.settings(demoCommonSettings: _*)
35+
.settings(Seq(
36+
libraryDependencies += %%("freestyle-async")
37+
): _*)
3438

3539
lazy val googleApi = project
3640
.in(file("third_party"))

demo/greeting/src/main/scala/greeting/GreetingClient.scala

+1-14
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@ package greeting
1919

2020
import java.util.concurrent.{CountDownLatch, TimeUnit}
2121

22-
import io.grpc.ManagedChannelBuilder
2322
import freestyle.rpc.demo.greeting.GreeterGrpc._
23+
import io.grpc.ManagedChannelBuilder
2424
import io.grpc.stub.StreamObserver
2525

2626
import scala.collection.immutable
27-
import scala.concurrent.ExecutionContext.Implicits.global
2827
import scala.concurrent.{Await, Future, Promise}
2928
import scala.concurrent.duration._
3029
import scala.util.Random
@@ -36,18 +35,6 @@ class GreetingClient(host: String, port: Int) {
3635

3736
private[this] val asyncHelloClient: GreeterStub = GreeterGrpc.stub(channel)
3837

39-
def unaryDemo(request: MessageRequest): Unit = {
40-
41-
val response = for {
42-
hi <- asyncHelloClient.sayHello(request)
43-
bye <- asyncHelloClient.sayGoodbye(request)
44-
} yield (hi.message, bye.message)
45-
46-
println("")
47-
println(s"Received -> ${Await.result(response, Duration.Inf)}")
48-
println("")
49-
}
50-
5138
def serverStreamingDemo(request: MessageRequest): Future[Unit] = {
5239
val lotOfRepliesStreamingPromise = Promise[Unit]()
5340
val lotOfRepliesObserver = new StreamObserver[MessageReply] {

demo/greeting/src/main/scala/greeting/GreetingClientApp.scala

+43-30
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,61 @@
1717
package freestyle.rpc.demo
1818
package greeting
1919

20-
import freestyle.rpc.demo.echo.EchoServiceGrpc
21-
import freestyle.rpc.demo.echo.EchoServiceGrpc.EchoServiceStub
22-
import freestyle.rpc.demo.echo_messages.EchoRequest
23-
import io.grpc.ManagedChannelBuilder
24-
25-
import scala.concurrent.Await
20+
import cats.implicits._
21+
import freestyle._
22+
import freestyle.implicits._
23+
import freestyle.rpc.demo.echo_messages._
24+
import runtime.client.implicits._
25+
import greeting.client._
26+
import io.grpc._
27+
28+
import scala.concurrent.{Await, Future}
2629
import scala.concurrent.duration.Duration
2730

31+
@module
32+
trait ClientAPP {
33+
val greetingClientM: GreetingClientM
34+
val echoClientM: EchoClientM
35+
}
36+
2837
object GreetingClientApp {
2938

30-
def main(args: Array[String]): Unit = {
39+
type UnaryDemoResponse = (MessageReply, MessageReply, EchoResponse)
40+
41+
val messageRequest = MessageRequest("Freestyle")
42+
val echoRequest = EchoRequest("echo...")
43+
44+
def unaryDemo[F[_]](implicit APP: ClientAPP[F]): FreeS[F, UnaryDemoResponse] = {
45+
46+
val greetingClientM: GreetingClientM[F] = APP.greetingClientM
47+
val echoClientM: EchoClientM[F] = APP.echoClientM
48+
49+
val defaultOptions = CallOptions.DEFAULT
50+
51+
val tupled = (
52+
greetingClientM.sayHello(messageRequest, defaultOptions) |@|
53+
greetingClientM.sayGoodbye(messageRequest, defaultOptions) |@|
54+
echoClientM.echo(echoRequest, defaultOptions)
55+
).tupled
3156

32-
val request = MessageRequest("Freestyle")
33-
val client = new GreetingClient(host, portNode1)
34-
// val client = new GreetingClient(host, portNode2)
57+
tupled.map {
58+
case (hi: MessageReply, bye: MessageReply, echo: EchoResponse) =>
59+
println(s"Received -> (${hi.message}, ${bye.message}, ${echo.message})")
60+
(hi, bye, echo)
61+
}
62+
}
63+
64+
def main(args: Array[String]): Unit = {
3565

36-
// http://www.grpc.io/docs/guides/concepts.html
66+
Await.result(unaryDemo[ClientAPP.Op].interpret[Future], Duration.Inf)
3767

38-
// Unary RPCs where the client sends a single request to the server and
39-
// gets a single response back, just like a normal function call:
40-
client.unaryDemo(request)
68+
val client = new GreetingClient(host, portNode1)
4169

4270
// Server streaming RPCs where the client sends a request to the server and
4371
// gets a stream to read a sequence of messages back. The client reads from
4472
// the returned stream until there are no more messages.
4573

46-
client.serverStreamingDemo(request)
74+
client.serverStreamingDemo(messageRequest)
4775

4876
// Client streaming RPCs where the client writes a sequence of messages and sends them
4977
// to the server, again using a provided stream. Once the client has finished writing the messages,
@@ -59,21 +87,6 @@ object GreetingClientApp {
5987

6088
client.biStreamingDemo()
6189

62-
// EchoDemo using the same server where the greeting service is deployed.
63-
echoDemo(EchoRequest("echo..."))
64-
6590
(): Unit
6691
}
67-
68-
def echoDemo(request: EchoRequest): Unit = {
69-
70-
val channel =
71-
ManagedChannelBuilder.forAddress(host, portNode1).usePlaintext(true).build
72-
73-
val asyncEchoClient: EchoServiceStub = EchoServiceGrpc.stub(channel)
74-
75-
println("")
76-
println(s"Received -> ${Await.result(asyncEchoClient.echo(request), Duration.Inf)}")
77-
println("")
78-
}
7992
}

demo/greeting/src/main/scala/greeting/GreetingServerApp.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package greeting
2020
import cats.implicits._
2121
import freestyle.rpc.server._
2222
import freestyle.rpc.server.implicits._
23-
import runtime.implicits._
23+
import runtime.server.implicits._
2424

2525
import scala.concurrent.duration.Duration
2626
import scala.concurrent.Await
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package freestyle.rpc.demo
18+
package greeting.client
19+
20+
import freestyle._
21+
import freestyle.rpc.client._
22+
import freestyle.rpc.demo.echo.EchoServiceGrpc
23+
import freestyle.rpc.demo.echo_messages._
24+
import io.grpc.CallOptions
25+
26+
@module
27+
trait EchoClientM {
28+
29+
val clientCallsM: ClientCallsM
30+
val channelOps: ChannelM
31+
32+
def echo(
33+
request: EchoRequest,
34+
options: CallOptions = CallOptions.DEFAULT): FS.Seq[EchoResponse] =
35+
for {
36+
call <- channelOps.newCall(EchoServiceGrpc.METHOD_ECHO, options)
37+
response <- clientCallsM.asyncM(call, request)
38+
} yield response
39+
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package freestyle.rpc.demo
18+
package greeting.client
19+
20+
import freestyle._
21+
import freestyle.rpc.client._
22+
import freestyle.rpc.demo.greeting._
23+
import io.grpc.CallOptions
24+
import io.grpc.stub.StreamObserver
25+
26+
@module
27+
trait GreetingClientM {
28+
29+
val clientCallsM: ClientCallsM
30+
val channelOps: ChannelM
31+
32+
def sayHello(
33+
request: MessageRequest,
34+
options: CallOptions = CallOptions.DEFAULT): FS.Seq[MessageReply] =
35+
for {
36+
call <- channelOps.newCall(GreeterGrpc.METHOD_SAY_HELLO, options)
37+
response <- clientCallsM.asyncM(call, request)
38+
} yield response
39+
40+
def sayGoodbye(
41+
request: MessageRequest,
42+
options: CallOptions = CallOptions.DEFAULT): FS.Seq[MessageReply] =
43+
for {
44+
call <- channelOps.newCall(GreeterGrpc.METHOD_SAY_GOODBYE, options)
45+
response <- clientCallsM.asyncM(call, request)
46+
} yield response
47+
48+
def lotsOfReplies(
49+
request: MessageRequest,
50+
responseObserver: StreamObserver[MessageReply],
51+
options: CallOptions = CallOptions.DEFAULT): FS.Seq[Unit] =
52+
for {
53+
call <- channelOps.newCall(GreeterGrpc.METHOD_LOTS_OF_REPLIES, options)
54+
response <- clientCallsM.asyncStreamServer(call, request, responseObserver)
55+
} yield response
56+
57+
def lotsOfGreetings(
58+
responseObserver: StreamObserver[MessageReply],
59+
options: CallOptions = CallOptions.DEFAULT): FS.Seq[StreamObserver[MessageRequest]] =
60+
for {
61+
call <- channelOps.newCall(GreeterGrpc.METHOD_LOTS_OF_GREETINGS, options)
62+
response <- clientCallsM.asyncStreamClient(call, responseObserver)
63+
} yield response
64+
65+
def bidiHello(
66+
responseObserver: StreamObserver[MessageReply],
67+
options: CallOptions = CallOptions.DEFAULT): FS.Seq[StreamObserver[MessageRequest]] =
68+
for {
69+
call <- channelOps.newCall(GreeterGrpc.METHOD_BIDI_HELLO, options)
70+
response <- clientCallsM.asyncStreamBidi(call, responseObserver)
71+
} yield response
72+
73+
}

demo/greeting/src/main/scala/greeting/runtime/implicits.scala

+50-11
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,67 @@
1717
package freestyle.rpc.demo
1818
package greeting.runtime
1919

20+
import cats.~>
2021
import freestyle._
2122
import freestyle.implicits._
23+
import freestyle.async.implicits._
2224
import freestyle.rpc.demo.echo.EchoServiceGrpc
2325
import freestyle.rpc.demo.greeting._
26+
import freestyle.rpc.demo.greeting.runtime.server.Implicits
2427
import freestyle.rpc.demo.greeting.service._
25-
import freestyle.rpc.server._
26-
import freestyle.rpc.server.implicits._
27-
import freestyle.rpc.server.handlers._
2828

2929
import scala.concurrent.{ExecutionContext, Future}
3030

31-
object implicits {
31+
trait CommonImplicits {
3232

33-
implicit val config: Config = Config(portNode1)
3433
implicit val ec: ExecutionContext = ExecutionContext.Implicits.global
3534

36-
implicit val grpcConfigs: List[GrpcConfig] = List(
37-
AddService(GreeterGrpc.bindService(new GreetingService, ec)),
38-
AddService(EchoServiceGrpc.bindService(new EchoService, ec))
39-
)
35+
}
36+
37+
object server {
38+
39+
trait Implicits extends CommonImplicits {
40+
41+
import freestyle.rpc.server._
42+
import freestyle.rpc.server.handlers._
43+
import freestyle.rpc.server.implicits._
44+
45+
implicit val config: Config = Config(portNode1)
46+
47+
implicit val grpcConfigs: List[GrpcConfig] = List(
48+
AddService(GreeterGrpc.bindService(new GreetingService, ec)),
49+
AddService(EchoServiceGrpc.bindService(new EchoService, ec))
50+
)
51+
52+
implicit val grpcServerHandler =
53+
new GrpcServerHandler[Future] andThen new GrpcConfigInterpreter[Future]
54+
55+
}
56+
57+
object implicits extends Implicits
58+
59+
}
60+
61+
object client {
62+
63+
trait Implicits extends CommonImplicits {
64+
65+
import freestyle.rpc.client._
66+
import freestyle.rpc.client.implicits._
67+
import freestyle.rpc.client.handlers.{ChannelMHandler, ClientCallsMHandler}
68+
69+
val channelFor: ManagedChannelFor = ManagedChannelForAddress(host, portNode1)
70+
71+
val channelConfigList: List[ManagedChannelConfig] = List(UsePlaintext(true))
72+
73+
implicit def channelMHandler[F[_]]: ChannelM.Op ~> Future =
74+
new ChannelMHandler[Future] andThen
75+
new ManagedChannelInterpreter[Future](channelFor, channelConfigList)
76+
77+
implicit val clientCallsMHandler: ClientCallsM.Op ~> Future = new ClientCallsMHandler[Future]
78+
79+
}
4080

41-
implicit val grpcServerHandler =
42-
new GrpcServerHandler[Future] andThen new GrpcConfigInterpreter[Future]
81+
object implicits extends Implicits
4382

4483
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2017 47 Degrees, LLC. <http://www.47deg.com>
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package freestyle.rpc
18+
package client
19+
20+
import java.util.concurrent.{Executor, TimeUnit}
21+
22+
import io.grpc._
23+
24+
sealed trait ManagedChannelFor extends Product with Serializable
25+
case class ManagedChannelForAddress(name: String, port: Int) extends ManagedChannelFor
26+
case class ManagedChannelForTarget(target: String) extends ManagedChannelFor
27+
28+
sealed trait ManagedChannelConfig extends Product with Serializable
29+
case object DirectExecutor extends ManagedChannelConfig
30+
case class SetExecutor(executor: Executor) extends ManagedChannelConfig
31+
case class AddInterceptorList(interceptors: List[ClientInterceptor]) extends ManagedChannelConfig
32+
case class AddInterceptor(interceptors: ClientInterceptor*) extends ManagedChannelConfig
33+
case class UserAgent(userAgent: String) extends ManagedChannelConfig
34+
case class OverrideAuthority(authority: String) extends ManagedChannelConfig
35+
case class UsePlaintext(skipNegotiation: Boolean) extends ManagedChannelConfig
36+
case class NameResolverFactory(resolverFactory: NameResolver.Factory) extends ManagedChannelConfig
37+
case class LoadBalancerFactory(lbFactory: LoadBalancer.Factory) extends ManagedChannelConfig
38+
case class SetDecompressorRegistry(registry: DecompressorRegistry) extends ManagedChannelConfig
39+
case class SetCompressorRegistry(registry: CompressorRegistry) extends ManagedChannelConfig
40+
case class SetIdleTimeout(value: Long, unit: TimeUnit) extends ManagedChannelConfig
41+
case class SetMaxInboundMessageSize(max: Int) extends ManagedChannelConfig

0 commit comments

Comments
 (0)