Skip to content

Commit 13be9df

Browse files
committed
:spark: #25 use async grpc client to call
1 parent 428af04 commit 13be9df

File tree

7 files changed

+81
-42
lines changed

7 files changed

+81
-42
lines changed

wechaty-puppet-hostie/src/main/scala/wechaty/hostie/support/ContactRawSupport.scala

+21-18
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import wechaty.puppet.schemas.Contact.ContactPayload
99
import wechaty.puppet.schemas.Puppet
1010
import wechaty.puppet.support.ContactSupport
1111

12+
import scala.concurrent.Future
13+
1214
/**
1315
*
1416
* @author <a href="mailto:[email protected]">Jun Tsai</a>
@@ -80,23 +82,24 @@ trait ContactRawSupport {
8082
file
8183
}
8284

83-
override protected def contactRawPayload(contactID: String): ContactPayload = {
84-
val response = grpcClient.contactPayload(ContactPayloadRequest.newBuilder().setId(contactID).build())
85-
val contact = new ContactPayload
86-
contact.id = response.getId
87-
contact.gender = wechaty.puppet.schemas.Contact.ContactGender.apply(response.getGenderValue)
88-
contact.`type` = wechaty.puppet.schemas.Contact.ContactType.apply(response.getTypeValue)
89-
contact.name = response.getName
90-
contact.avatar = response.getAvatar
91-
contact.address = response.getAddress
92-
contact.alias = response.getAlias
93-
contact.city = response.getCity
94-
contact.friend = response.getFriend
95-
contact.province = response.getProvince
96-
contact.signature = response.getSignature
97-
contact.star = response.getStar
98-
contact.weixin = response.getWeixin
99-
100-
contact
85+
override protected def contactRawPayload(contactID: String): Future[ContactPayload] = {
86+
val request = ContactPayloadRequest.newBuilder().setId(contactID).build()
87+
asyncCallback(asyncGrpcClient.contactPayload,request){response =>
88+
val contact = new ContactPayload
89+
contact.id = response.getId
90+
contact.gender = wechaty.puppet.schemas.Contact.ContactGender.apply(response.getGenderValue)
91+
contact.`type` = wechaty.puppet.schemas.Contact.ContactType.apply(response.getTypeValue)
92+
contact.name = response.getName
93+
contact.avatar = response.getAvatar
94+
contact.address = response.getAddress
95+
contact.alias = response.getAlias
96+
contact.city = response.getCity
97+
contact.friend = response.getFriend
98+
contact.province = response.getProvince
99+
contact.signature = response.getSignature
100+
contact.star = response.getStar
101+
contact.weixin = response.getWeixin
102+
contact
103+
}
101104
}
102105
}

wechaty-puppet-hostie/src/main/scala/wechaty/hostie/support/GrpcSupport.scala

+33-8
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@ import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
55

66
import com.typesafe.scalalogging.LazyLogging
77
import io.github.wechaty.grpc.PuppetGrpc
8-
import io.github.wechaty.grpc.puppet.Base
8+
import io.github.wechaty.grpc.puppet.Contact.ContactPayloadRequest
9+
import io.github.wechaty.grpc.puppet.{Base, Contact}
910
import io.github.wechaty.grpc.puppet.Event.EventRequest
11+
import io.grpc.stub.ClientCalls.asyncUnaryCall
1012
import io.grpc.stub.StreamObserver
11-
import io.grpc.{ManagedChannel, ManagedChannelBuilder}
13+
import io.grpc.{ClientCall, ManagedChannel, ManagedChannelBuilder}
1214
import wechaty.hostie.PuppetHostie
15+
import wechaty.puppet.schemas.Contact.ContactPayload
16+
17+
import scala.concurrent.{Future, Promise}
1318

1419
/**
1520
*
@@ -23,9 +28,9 @@ trait GrpcSupport {
2328
private val HEARTBEAT_COUNTER = new AtomicLong()
2429
private val HOSTIE_KEEPALIVE_TIMEOUT = 15 * 1000L
2530
private val DEFAULT_WATCHDOG_TIMEOUT = 60L
26-
protected var grpcClient: PuppetGrpc.PuppetBlockingStub = _
27-
private var eventStream: PuppetGrpc.PuppetStub = _
28-
protected var channel: ManagedChannel = _
31+
protected var grpcClient : PuppetGrpc.PuppetBlockingStub = _
32+
protected var asyncGrpcClient: PuppetGrpc.PuppetStub = _
33+
protected var channel : ManagedChannel = _
2934

3035
protected def startGrpc(endpoint: String): Unit = {
3136
initChannel(endpoint)
@@ -96,9 +101,9 @@ trait GrpcSupport {
96101
}
97102

98103
private def startStream() {
99-
this.eventStream = PuppetGrpc.newStub(channel)
104+
this.asyncGrpcClient = PuppetGrpc.newStub(channel)
100105
val startRequest = EventRequest.newBuilder().build()
101-
this.eventStream.event(startRequest, this)
106+
this.asyncGrpcClient.event(startRequest, this)
102107
}
103108

104109
protected def stopGrpc(): Unit = {
@@ -120,7 +125,7 @@ trait GrpcSupport {
120125
private def stopStream(): Unit = {
121126
try {
122127
val countDownLatch = new CountDownLatch(1)
123-
this.eventStream.stop(Base.StopRequest.getDefaultInstance, new StreamObserver[Base.StopResponse] {
128+
this.asyncGrpcClient.stop(Base.StopRequest.getDefaultInstance, new StreamObserver[Base.StopResponse] {
124129
override def onNext(v: Base.StopResponse): Unit = {}
125130

126131
override def onError(throwable: Throwable): Unit = {}
@@ -133,4 +138,24 @@ trait GrpcSupport {
133138
logger.warn("fail to stop stream {}", e.getMessage)
134139
}
135140
}
141+
142+
type ClientCall[ReqT,RespT]=(ReqT,StreamObserver[RespT])=>Unit
143+
type ClientCallback[RespT,T]=RespT => T
144+
protected def asyncCall[ReqT,RespT](call: ClientCall[ReqT, RespT], req: ReqT): Unit = {
145+
asyncCallback(call,req)(resp=> resp)
146+
}
147+
protected def asyncCallback[ReqT,RespT,T](call: ClientCall[ReqT, RespT], req: ReqT)(callback:ClientCallback[RespT,T]):Future[T]= {
148+
val promise = Promise[T]
149+
call(req,new StreamObserver[RespT] {
150+
override def onNext(value: RespT): Unit = {
151+
val result = callback(value)
152+
promise.success(result)
153+
}
154+
override def onError(t: Throwable): Unit = promise.failure(t)
155+
override def onCompleted(): Unit = {
156+
if(!promise.isCompleted) promise.failure(new IllegalStateException("server completed"))
157+
}
158+
})
159+
promise.future
160+
}
136161
}

wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/ContactRawSupport.scala

+5-5
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,17 @@ trait ContactRawSupport {
5050
/**
5151
* contact
5252
*/
53-
override protected def contactRawPayload(contactId: String): Contact.ContactPayload = {
54-
this.getContact(contactId)
53+
override protected def contactRawPayload(contactId: String): Future[Contact.ContactPayload] = {
54+
this.getContact(contactId).map(convertPadplusContactToContactPayload)
5555
}
5656

57-
protected def getContact(contactId: String): PadplusContactPayload = {
57+
protected def getContact(contactId: String): Future[PadplusContactPayload] = {
5858
getPadplusContactPayload(contactId) match{
59-
case Some(padplusContactPayload) => padplusContactPayload
59+
case Some(padplusContactPayload) => Future.successful(padplusContactPayload)
6060
case _ =>
6161
val json = objectMapper.createObjectNode()
6262
json.put("userName", contactId)
63-
syncRequest[GrpcContactPayload](ApiType.GET_CONTACT,Some(json.toString))
63+
asyncRequest[GrpcContactPayload](ApiType.GET_CONTACT,Some(json.toString)).map(convertFromGrpcContact)
6464
}
6565
}
6666

wechaty-puppet-padplus/src/test/scala/wechaty/padplus/support/ContactRawSupportTest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class ContactRawSupportTest extends PadplusTestEventBase{
3535
val payload = Await.result(future,10 seconds)
3636
Assertions.assertEquals("jcai",payload.id)
3737
awaitEventCompletion()
38-
38+
3939
*/
4040
}
4141
}

wechaty-puppet/src/main/scala/wechaty/puppet/schemas/Puppet.scala

+4-1
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,23 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
77
import io.github.wechaty.grpc.puppet.Event.EventType
88
import io.grpc.ManagedChannel
99

10+
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
11+
1012
/**
1113
*
1214
* @author <a href="mailto:[email protected]">Jun Tsai</a>
1315
* @since 2020-06-01
1416
*/
1517
object Puppet {
18+
implicit lazy val executionContext: ExecutionContextExecutor = ExecutionContext.global
1619
class PuppetOptions {
1720
var endPoint: Option[String] = None
1821
var timeout: Option[Long] = None
1922
var token: Option[String] = None
2023
var channelOpt:Option[ManagedChannel] = None
2124
var puppetOptionKey: Option[String] = None
2225
}
23-
lazy val objectMapper = {
26+
lazy val objectMapper: ObjectMapper = {
2427
val om = new ObjectMapper()
2528
import com.fasterxml.jackson.databind.DeserializationFeature
2629
om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

wechaty-puppet/src/main/scala/wechaty/puppet/support/ContactSupport.scala

+12-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import com.typesafe.scalalogging.LazyLogging
55
import wechaty.puppet.ResourceBox
66
import wechaty.puppet.schemas.Contact.ContactPayload
77
import wechaty.puppet.schemas.Puppet
8+
import wechaty.puppet.schemas.Puppet.executionContext
9+
10+
import scala.concurrent.Future
811

912
/**
1013
*
@@ -29,7 +32,7 @@ trait ContactSupport {
2932

3033
def contactList(): Array[String]
3134

32-
def contactPayload(contactId: String): ContactPayload = {
35+
def contactPayload(contactId: String): Future[ContactPayload] = {
3336
if (Puppet.isBlank(contactId)) {
3437
throw new IllegalArgumentException("contact id is blank!")
3538
}
@@ -39,18 +42,19 @@ trait ContactSupport {
3942
*/
4043
val cachedPayload = this.cacheContactPayload.getIfPresent(contactId)
4144
if (cachedPayload != null) {
42-
return cachedPayload
45+
return Future.successful(cachedPayload)
4346
}
4447

4548
/**
4649
* 2. Cache not found
4750
*/
48-
val payload = this.contactRawPayload(contactId)
49-
50-
cacheContactPayload.put(contactId, payload)
51-
logger.info("Puppet contactPayload({}) cache SET", contactId)
51+
val payloadFuture = this.contactRawPayload(contactId)
5252

53-
payload
53+
payloadFuture.map(payload=>{
54+
cacheContactPayload.put(contactId, payload)
55+
logger.info("Puppet contactPayload({}) cache SET", contactId)
56+
payload
57+
})
5458
}
5559
def contactPayloadDirty (contactId: String): Unit ={
5660
logger.debug("Puppet contactPayloadDirty({})", contactId)
@@ -60,6 +64,6 @@ trait ContactSupport {
6064
/**
6165
* contact
6266
*/
63-
protected def contactRawPayload(contactId: String): ContactPayload
67+
protected def contactRawPayload(contactId: String): Future[ContactPayload]
6468

6569
}

wechaty/src/main/scala/wechaty/user/Contact.scala

+5-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import wechaty.puppet.ResourceBox
66
import wechaty.puppet.schemas.Contact.{ContactGender, ContactPayload, ContactType}
77
import wechaty.puppet.schemas.Puppet
88

9+
import scala.concurrent.Await
10+
import scala.concurrent.duration._
911
import scala.language.implicitConversions
1012

1113
/**
@@ -21,7 +23,9 @@ import scala.language.implicitConversions
2123
class Contact(contactId: String)(implicit resolver: PuppetResolver) extends Conversation(contactId) with LazyLogging {
2224
// lazy val payload: schemas.Contact.ContactPayload = resolver.puppet.contactPayload(contactId)
2325
def payload: ContactPayload = {
24-
resolver.puppet.contactPayload(contactId)
26+
//TODO use async
27+
val f = resolver.puppet.contactPayload(contactId)
28+
Await.result(f,10 seconds)
2529
}
2630

2731

0 commit comments

Comments
 (0)