Skip to content

Commit be7356b

Browse files
committed
#21 #25 optimize async call method
1 parent a6fe89c commit be7356b

File tree

9 files changed

+154
-107
lines changed

9 files changed

+154
-107
lines changed

wechaty-puppet-padplus/src/main/resources/log4j.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
88
# A1 uses PatternLayout.
99
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
1010
log4j.appender.A1.layout.ConversionPattern=[%t %p]%d{HH:mm:ss} %c{2} %m%n
11-
log4j.category.wechaty=debug
11+
log4j.category.wechaty=trace

wechaty-puppet-padplus/src/main/scala/wechaty/padplus/PuppetPadplus.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ class PuppetPadplus(val option:PuppetOptions,val storePath:String="/tmp/padplus"
4646
case Some(str) =>
4747
uinOpt = Some(str)
4848
logger.debug("found uin in local store:{}",str)
49-
asyncRequest(ApiType.INIT)
49+
asyncRequestNothing(ApiType.INIT)
5050
case _ =>
51-
asyncRequest(ApiType.GET_QRCODE)
51+
asyncRequestNothing(ApiType.GET_QRCODE)
5252
}
5353
}
5454
def stop(): Unit = {

wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/CallbackHelper.scala

+45-38
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ import java.util.concurrent.TimeUnit
44

55
import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
66
import wechaty.padplus.grpc.PadPlusServerOuterClass.StreamResponse
7-
import wechaty.padplus.schemas.ModelContact.{PadplusContactPayload, PadplusConversation}
7+
import wechaty.padplus.schemas.ModelContact.PadplusContactPayload
88
import wechaty.padplus.schemas.ModelRoom.{PadplusRoomMemberMap, PadplusRoomPayload}
99

10+
import scala.concurrent.Promise
11+
import scala.util.{Success, Try}
12+
1013
/**
1114
*
1215
* @author <a href="mailto:[email protected]">Jun Tsai</a>
@@ -19,23 +22,25 @@ object CallbackHelper {
1922
.build()
2023
.asInstanceOf[Cache[String, T]]
2124
}
22-
type TraceRequestCallback = StreamResponse => Unit
25+
type TraceRequestCallback = Promise[StreamResponse]
2326
lazy val traceCallbacks: Cache[String, TraceRequestCallback] = createPool[TraceRequestCallback]
2427

25-
type ContactCallback = PadplusConversation => Unit
26-
lazy val contactCallbacks: Cache[String, List[ContactCallback]] = createPool[List[ContactCallback]]
28+
type ContactPromise= Promise[PadplusContactPayload]
29+
lazy val contactCallbacks: Cache[String, List[ContactPromise]] = createPool[List[ContactPromise]]
30+
type RoomPromise= Promise[PadplusRoomPayload]
31+
lazy val roomCallbacks: Cache[String, List[RoomPromise]] = createPool[List[RoomPromise]]
2732

28-
type ContactAliasCallback = () => Unit
33+
type ContactAliasCallback = Promise[Unit]
2934
lazy val contactAliasCallbacks: Cache[String, Map[String, ContactAliasCallback]] = createPool[Map[String,ContactAliasCallback]]
3035

3136

32-
type RoomTopicCallback= ()=>Unit
37+
type RoomTopicCallback= Promise[Unit]
3338
lazy val roomTopicCallbacks: Cache[String, Map[String, RoomTopicCallback]] = createPool[Map[String,RoomTopicCallback]]
3439

35-
type AcceptFriendCallback= () =>Unit
40+
type AcceptFriendCallback= Promise[Unit]
3641
lazy val acceptFriendCallbacks: Cache[String, List[AcceptFriendCallback]] = createPool[List[AcceptFriendCallback]]
3742

38-
type RoomMemberCallback= PadplusRoomMemberMap =>Unit
43+
type RoomMemberCallback= Promise[PadplusRoomMemberMap]
3944
lazy val roomMemberCallbacks: Cache[String, List[RoomMemberCallback]] = createPool[List[RoomMemberCallback]]
4045

4146
def pushCallbackToPool (traceId: String, callback: TraceRequestCallback){
@@ -44,7 +49,7 @@ object CallbackHelper {
4449
def resolveCallBack (traceId: String,response:StreamResponse):Unit={
4550
val callback = traceCallbacks.getIfPresent(traceId)
4651
if(callback!=null){
47-
callback(response)
52+
callback.success(response)
4853
}
4954
traceCallbacks.invalidate(traceId)
5055
}
@@ -61,32 +66,42 @@ object CallbackHelper {
6166
cache.put(key,callbackList :+ callback)
6267
}
6368
}
69+
private def resolveListCallBack[T] (cache:Cache[String,List[Promise[T]]],key: String, data: Try[T]) {
70+
val callbackList = cache.getIfPresent(key)
71+
if (callbackList != null) {
72+
callbackList.foreach(f => f.complete(data))
73+
cache.invalidate(key)
74+
}
75+
}
6476

65-
def pushContactCallback ( contactId: String, callback: ContactCallback) {
77+
def pushContactCallback ( contactId: String, callback:ContactPromise) {
6678
pushListCallback(contactCallbacks,contactId,callback)
6779
}
6880

69-
def resolveContactCallBack (contactId: String, data: PadplusContactPayload) {
70-
val callbackList = contactCallbacks.getIfPresent(contactId)
71-
if(callbackList != null){
72-
callbackList.foreach(f=>f(data))
73-
}
81+
def resolveContactCallBack (contactId: String, data: Try[PadplusContactPayload]) {
82+
resolveListCallBack(contactCallbacks,contactId,data)
7483

75-
this.resolveContactAliasCallback(contactId, data.remark)
76-
this.resolveAcceptFriendCallback(contactId)
77-
contactCallbacks.invalidate(contactId)
84+
data match{
85+
case Success(value) =>
86+
this.resolveContactAliasCallback(contactId, value.remark)
87+
this.resolveAcceptFriendCallback(contactId)
88+
case _ =>
89+
}
7890
}
7991

80-
def resolveRoomCallBack (roomId: String, data: PadplusRoomPayload) {
81-
val callbackList = contactCallbacks.getIfPresent(roomId)
82-
if(callbackList != null){
83-
callbackList.foreach(f=>f(data))
92+
def pushRoomCallback ( roomId: String, callback:RoomPromise) {
93+
pushListCallback(roomCallbacks,roomId,callback)
94+
}
95+
def resolveRoomCallBack (roomId: String, data: Try[PadplusRoomPayload]) {
96+
resolveListCallBack(roomCallbacks,roomId,data)
97+
data match{
98+
case Success(value) =>
99+
this.resolveRoomTopicCallback(value.chatroomId, value.nickName)
100+
case _ => //do nothing
84101
}
85-
this.resolveRoomTopicCallback(data.chatroomId, data.nickName)
86-
contactCallbacks.invalidate(roomId)
87102
}
88103

89-
private def pushMapCallback[T](cache:Cache[String,Map[String,T]],key:String,mapKey:String,callback:T): Unit ={
104+
private def pushMapCallback[T](cache:Cache[String,Map[String,Promise[T]]],key:String,mapKey:String,callback:Promise[T]): Unit ={
90105
val callbacks = cache.getIfPresent(key)
91106
if(callbacks == null){
92107
cache.put(key,Map(mapKey->callback))
@@ -97,12 +112,12 @@ object CallbackHelper {
97112
def pushContactAliasCallback (contactId: String, alias: String, callback:ContactAliasCallback): Unit = {
98113
pushMapCallback(contactAliasCallbacks,contactId,alias,callback)
99114
}
100-
private def resolveMapCallback(cache:Cache[String,Map[String,()=>Unit]],key: String, mapKey: String) {
115+
private def resolveMapCallback(cache:Cache[String,Map[String,Promise[Unit]]],key: String, mapKey: String) {
101116
val callbacks = cache.getIfPresent(key)
102117
if(callbacks != null){
103118
val callbackOpt = callbacks.get(mapKey)
104119
callbackOpt.foreach{f=>
105-
f()
120+
f.success({})
106121
if(callbacks.size == 1)
107122
cache.invalidate(key)
108123
else
@@ -127,22 +142,14 @@ object CallbackHelper {
127142
}
128143

129144
private def resolveAcceptFriendCallback (contactId: String) {
130-
val callbacks = acceptFriendCallbacks.getIfPresent(contactId)
131-
if(callbacks != null){
132-
callbacks.foreach(cb=>cb())
133-
acceptFriendCallbacks.invalidate(contactId)
134-
}
145+
resolveListCallBack(acceptFriendCallbacks,contactId,Try[Unit]({}))
135146
}
136147

137148
def pushRoomMemberCallback (roomId: String, callback: RoomMemberCallback): Unit ={
138149
pushListCallback(roomMemberCallbacks,roomId,callback)
139150
}
140151

141-
def resolveRoomMemberCallback (roomId: String, memberList: PadplusRoomMemberMap) {
142-
val callbacks = roomMemberCallbacks.getIfPresent(roomId)
143-
if(callbacks != null){
144-
callbacks.foreach(cb=>cb(memberList))
145-
roomMemberCallbacks.invalidate(roomId)
146-
}
152+
def resolveRoomMemberCallback (roomId: String, memberList: Try[PadplusRoomMemberMap]) {
153+
resolveListCallBack(roomMemberCallbacks,roomId,memberList)
147154
}
148155
}

wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/ContactRawSupport.scala

+7-8
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import wechaty.puppet.schemas.Event.{EventLoginPayload, EventScanPayload}
2121
import wechaty.puppet.schemas.Puppet._
2222
import wechaty.puppet.schemas.{Contact, Puppet}
2323

24-
import scala.concurrent.Future
24+
import scala.concurrent.{Future, Promise}
2525
import scala.language.implicitConversions
2626
import scala.util.Try
2727

@@ -54,9 +54,11 @@ trait ContactRawSupport extends LazyLogging{
5454
getPadplusContactPayload(contactId) match{
5555
case Some(padplusContactPayload) => Future.successful(padplusContactPayload)
5656
case _ =>
57+
val promise = Promise[PadplusContactPayload]
58+
CallbackHelper.pushContactCallback(contactId,promise)
5759
val json = objectMapper.createObjectNode()
5860
json.put("userName", contactId)
59-
asyncRequest[GrpcContactPayload](ApiType.GET_CONTACT,Some(json.toString)).map(convertFromGrpcContact)
61+
asyncRequestNothing(ApiType.GET_CONTACT,Some(json.toString)).flatMap(_=>promise.future)
6062
}
6163
}
6264

@@ -138,6 +140,7 @@ trait ContactRawSupport extends LazyLogging{
138140
if(!isBlank(data)){
139141
val root = objectMapper.readTree(data)
140142
val userName = root.get("UserName").asText()
143+
logger.trace("response data:{}",data)
141144
if(isRoomId(userName)){
142145
val roomTry=Try {
143146
val grpcRoomPayload = objectMapper.readValue(data, classOf[GrpcRoomPayload])
@@ -157,9 +160,7 @@ trait ContactRawSupport extends LazyLogging{
157160

158161
roomPayload
159162
}
160-
//TODO use Try instance to avoid memory leak
161-
if(roomTry.isSuccess)
162-
CallbackHelper.resolveRoomCallBack(userName,roomTry.get)
163+
CallbackHelper.resolveRoomCallBack(userName,roomTry)
163164

164165
}else{
165166
val result:Try[PadplusContactPayload] = Try {
@@ -169,9 +170,7 @@ trait ContactRawSupport extends LazyLogging{
169170
padplusContactPayload
170171
}
171172

172-
//TODO use Try instance to avoid memory leak
173-
if(result.isSuccess)
174-
CallbackHelper.resolveContactCallBack(userName,result.get)
173+
CallbackHelper.resolveContactCallBack(userName,result)
175174
}
176175
}
177176
}

wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/GrpcSupport.scala

+50-41
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,38 @@ trait GrpcSupport {
126126
protected def generateTraceId(apiType:ApiType): String={
127127
UUID.randomUUID().toString
128128
}
129+
//can't create Promise[Nothing] instance,so use the method create Future[Unit]
130+
protected def asyncRequestNothing(apiType: ApiType,data:Option[Any]=None): Future[Unit] ={
131+
val request = RequestObject.newBuilder()
132+
request.setToken(option.token.get)
133+
uinOpt match{
134+
case Some(id) =>
135+
request.setUin(id)
136+
case _ =>
137+
}
138+
request.setApiType(apiType)
139+
data match{
140+
case Some(str:String) =>
141+
request.setParams(str)
142+
case Some(d) =>
143+
request.setParams(Puppet.objectMapper.writeValueAsString(d))
144+
case _ =>
145+
}
146+
147+
val future = asyncCall(PadPlusServerGrpc.getRequestMethod,request.build())
148+
future.map{rep=>
149+
if(rep.getResult != "success"){
150+
logger.warn("fail to request {}",rep)
151+
throw new IllegalAccessException("fail to request ,grpc result:"+rep)
152+
}
153+
}
154+
}
129155
protected def asyncRequest[T : TypeTag ](apiType: ApiType,data:Option[Any]=None)(implicit classTag: ClassTag[T]): Future[T] ={
156+
typeOf[T] match {
157+
case t if t =:= typeOf[Nothing] =>
158+
throw new IllegalAccessException("generic type is nothing,maybe you should use asyncRequestNothing !")
159+
case _ =>
160+
}
130161
val request = RequestObject.newBuilder()
131162
request.setToken(option.token.get)
132163
uinOpt match{
@@ -147,53 +178,31 @@ trait GrpcSupport {
147178
val traceId= generateTraceId(apiType)
148179
request.setTraceId(traceId)
149180
logger.debug("request:{}",request.build())
150-
val p = Promise[T]()
151-
152-
val callbackDelegate:TraceRequestCallback=(streamResponse:StreamResponse)=>{
153-
if(p.isCompleted){
154-
logger.warn("promise is completed ,{}",p)
155-
}else {
156-
p.complete(Try {
157-
typeOf[T] match{
158-
case t if t =:= typeOf[Nothing] =>
159-
null.asInstanceOf[T]
160-
case t if t =:= typeOf[JsonNode] =>
161-
Puppet.objectMapper.readTree(streamResponse.getData).asInstanceOf[T]
162-
case _ =>
163-
try {
164-
Puppet.objectMapper.readValue(streamResponse.getData, classTag.runtimeClass).asInstanceOf[T]
165-
}catch{
166-
case e:Throwable =>
167-
logger.error(e.getMessage,e)
168-
throw e
169-
}
170-
}
171-
})
172-
}
173-
}
174-
//过滤不需要返回
175-
typeOf[T] match{
176-
case t if t =:= typeOf[Nothing] =>
177-
case _ => CallbackHelper.pushCallbackToPool(traceId,callbackDelegate)
178-
}
181+
182+
val callbackPromise = Promise[StreamResponse]
183+
CallbackHelper.pushCallbackToPool(traceId,callbackPromise)
179184
val future = asyncCall(PadPlusServerGrpc.getRequestMethod,request.build())
180185
future.flatMap{rep=>
181186
if(rep.getResult != "success"){
182-
p.failure(new IllegalAccessException("fail to request ,grpc result:"+rep))
187+
callbackPromise.failure(new IllegalAccessException("fail to request ,grpc result:"+rep))
188+
}
189+
callbackPromise.future
190+
}.map { streamResponse =>
191+
typeOf[T] match {
192+
case t if t =:= typeOf[JsonNode] =>
193+
Puppet.objectMapper.readTree(streamResponse.getData).asInstanceOf[T]
194+
case _ =>
195+
try {
196+
Puppet.objectMapper.readValue(streamResponse.getData, classTag.runtimeClass).asInstanceOf[T]
197+
} catch {
198+
case e: Throwable =>
199+
logger.error(e.getMessage, e)
200+
throw e
201+
}
183202
}
184-
p.future
185203
}
186-
// val response = grpcClient.request(request.build())
187-
// logger.debug(s"request $apiType response $response")
188-
//
189-
// if(response.getResult != "success"){
190-
// //fail?
191-
// logger.error("fail to request grpc,response {}",response)
192-
// p.failure(new IllegalAccessException("fail to request ,grpc result:"+response))
193-
// }
194-
//
195-
// p.future
196204
}
205+
197206
type ClientCallback[RespT,T]=RespT => T
198207
protected def asyncCall[ReqT,RespT](call: MethodDescriptor[ReqT, RespT], req: ReqT): Future[RespT]= {
199208
asyncCallback(call,req)(resp => resp)

wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/LocalStoreSupport.scala

+8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package wechaty.padplus.support
22

3+
import java.io.File
4+
35
import com.google.protobuf.ByteString
6+
import org.apache.commons.io.FileUtils
47
import wechaty.padplus.PuppetPadplus
58
import wechaty.padplus.internal.LocalStore
69
import wechaty.padplus.schemas.ModelContact.PadplusContactPayload
@@ -69,4 +72,9 @@ trait LocalStoreSupport {
6972
if(store != null)
7073
store.close()
7174
}
75+
private[wechaty] def resetLocalStore(): Unit ={
76+
stopLocalStore()
77+
FileUtils.deleteDirectory(new File(storePath))
78+
startLocalStore()
79+
}
7280
}

wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/RoomMemberRawSupport.scala

+14-9
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import wechaty.puppet.schemas.Room
88
import wechaty.puppet.schemas.Room.RoomMemberPayload
99
import wechaty.puppet.support.RoomMemberSupport
1010

11+
import scala.util.Try
12+
1113
/**
1214
*
1315
* @author <a href="mailto:[email protected]">Jun Tsai</a>
@@ -63,16 +65,19 @@ trait RoomMemberRawSupport {
6365
}
6466
protected def roomMemberPartialFunction(response:StreamResponse):PartialFunction[ResponseType,Unit]={
6567
case ResponseType.ROOM_MEMBER_LIST =>
66-
val roomMemberList= objectMapper.readValue(response.getData,classOf[GrpcRoomMemberList])
67-
val roomId = roomMemberList.roomId
68-
val membersStr = roomMemberList.membersJson
69-
val membersList =objectMapper.readValue(membersStr,classOf[Array[GrpcRoomMemberPayload]])
70-
val data=membersList.map(x=> x.UserName-> convertToPadplusRoomMemberPayload(x)).toMap
71-
val padplusRoomMemberMap = new PadplusRoomMemberMap
72-
padplusRoomMemberMap.members = data
73-
savePadplusRoomMembers(roomId,padplusRoomMemberMap)
68+
val roomMemberList = objectMapper.readValue(response.getData, classOf[GrpcRoomMemberList])
69+
val roomId = roomMemberList.roomId
70+
val roomMembers = Try {
71+
val membersStr = roomMemberList.membersJson
72+
val membersList = objectMapper.readValue(membersStr, classOf[Array[GrpcRoomMemberPayload]])
73+
val data = membersList.map(x => x.UserName -> convertToPadplusRoomMemberPayload(x)).toMap
74+
val padplusRoomMemberMap = new PadplusRoomMemberMap
75+
padplusRoomMemberMap.members = data
76+
savePadplusRoomMembers(roomId, padplusRoomMemberMap)
77+
padplusRoomMemberMap
78+
}
7479

7580

76-
CallbackHelper.resolveRoomMemberCallback(roomId,padplusRoomMemberMap)
81+
CallbackHelper.resolveRoomMemberCallback(roomId,roomMembers)
7782
}
7883
}

0 commit comments

Comments
 (0)