Skip to content

Commit 6f44e54

Browse files
Fixes client streaming rpc server (#46)
1 parent f267713 commit 6f44e54

File tree

3 files changed

+5
-4
lines changed

3 files changed

+5
-4
lines changed

rpc/src/main/scala/internal/service/calls.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import io.grpc.stub.ServerCalls.{
2929
}
3030
import io.grpc.stub.StreamObserver
3131
import io.grpc.{Status, StatusException}
32+
import monix.eval.Task
3233
import monix.execution.{Ack, Scheduler}
3334
import monix.reactive.Observable
3435
import monix.reactive.observables.ObservableLike.Transformer
@@ -66,13 +67,13 @@ object calls {
6667

6768
def clientStreamingMethod[F[_], M[_], Req, Res](f: (Observable[Req]) => FreeS[F, Res])(
6869
implicit ME: MonadError[M, Throwable],
69-
C: Comonad[M],
7070
H: FSHandler[F, M],
71+
HTask: FSHandler[M, Task],
7172
S: Scheduler): ClientStreamingMethod[Req, Res] = new ClientStreamingMethod[Req, Res] {
7273

7374
override def invoke(responseObserver: StreamObserver[Res]): StreamObserver[Req] = {
7475
transform[Req, Res](
75-
inputObservable => Observable.eval(C.extract(f(inputObservable).interpret[M])),
76+
inputObservable => Observable.fromTask(HTask(f(inputObservable).interpret[M])),
7677
responseObserver
7778
)
7879
}

rpc/src/main/scala/internal/service/service.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ case class ServiceAlg(defn: Defn) {
101101
val serviceBindings: Defn.Def = {
102102
val args: Seq[Term.Tuple] = requests.map(_.call)
103103
q"""
104-
def bindService[F[_], M[_]](implicit algebra: $algName[F], handler: _root_.freestyle.FSHandler[F, M], ME: _root_.cats.MonadError[M, Throwable], C: _root_.cats.Comonad[M], S: _root_.monix.execution.Scheduler): _root_.io.grpc.ServerServiceDefinition =
104+
def bindService[F[_], M[_]](implicit algebra: $algName[F], HTask: _root_.freestyle.FSHandler[M, _root_.monix.eval.Task], handler: _root_.freestyle.FSHandler[F, M], ME: _root_.cats.MonadError[M, Throwable], C: _root_.cats.Comonad[M], S: _root_.monix.execution.Scheduler): _root_.io.grpc.ServerServiceDefinition =
105105
new freestyle.rpc.internal.service.GRPCServiceDefBuilder(${Lit.String(algName.value)}, ..$args).apply
106106
"""
107107
}

version.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version in ThisBuild := "0.0.3-SNAPSHOT"
1+
version in ThisBuild := "0.0.3"

0 commit comments

Comments
 (0)