Skip to content

Commit ed7db4c

Browse files
author
Jayesh
committed
merged conflicts
2 parents 398e9a8 + d3a6e15 commit ed7db4c

File tree

3 files changed

+121
-39
lines changed

3 files changed

+121
-39
lines changed

docs/manual/database/live-query.md

+86-11
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Listening to real-time`db.liveQuery
1+
# Listening to real-time `db.liveQuery`
22
You can listen / subscribe to changes happening in your app's data in real time by simply calling `db.liveQuery` on the frontend. Here's a code snippet to do this:
33

44
<div class="row tabs-wrapper">
@@ -25,8 +25,8 @@ const db = api.Mongo();
2525
const condition = cond("category", "==", 'frontend');
2626

2727
// Callback for data changes:
28-
const onSnapshot = (docs, type) => {
29-
console.log(docs, snapshot)
28+
const onSnapshot = (docs, type, changedDoc) => {
29+
console.log(docs, snapshot, changedDoc)
3030
}
3131

3232
// Callback for error while subscribing
@@ -51,14 +51,19 @@ API api = new API("books-app", "localhost", 8081);
5151
SQL db = api.MySQL();
5252
LiveQueryUnsubscribe unsubscribe = db.liveQuery("books").subscribe(new LiveDataListener() {
5353
@Override
54-
public void onSnapshot(LiveData data, String type) {
54+
public void onSnapshot(LiveData data, String type, ChangedData changedData) {
5555
System.out.println(type);
5656
for (Book book : data.getValue(Book.class)) {
5757
System.out.printf("ID:%d, Name:%s, Author:%s\n", book.getId(), book.getName(), book.getAuthor());
5858
}
59+
Book book = changedData.getValue(Book.class);
60+
if (book!=null) {
61+
System.out.println("CHANGED: ");
62+
System.out.printf("ID:%d, Name:%s, Author:%s\n", book.getId(), book.getName(), book.getAuthor());
63+
System.out.println();
64+
}
5965
System.out.println();
6066
}
61-
6267
@Override
6368
public void onError(String error) {
6469
System.out.println(error);
@@ -79,9 +84,10 @@ api = API('books-app', 'localhost:8081')
7984
db = api.my_sql()
8085

8186

82-
def on_snapshot(docs, kind):
87+
def on_snapshot(docs, kind, changedDoc):
8388
print("DOCS:", docs)
8489
print("KIND OF LIVE QUERY:", kind)
90+
print("CHANGED DOC:", changedDoc)
8591

8692

8793
def on_error(error):
@@ -111,15 +117,18 @@ func main() {
111117
fmt.Println(err)
112118
}
113119
db := api.MySQL()
114-
db.LiveQuery("books").Subscribe(func(liveData *model.LiveData, changeType string) () {
115-
fmt.Println(changeType)
120+
db.LiveQuery("books").Subscribe(func(liveData *model.LiveData, changeType string, changedData *model.ChangedData) () {
121+
fmt.Println("type", changeType)
116122
var v []interface{}
117123
liveData.Unmarshal(&v)
118-
fmt.Println(v)
124+
fmt.Println("data", v)
125+
var v2 interface{}
126+
changedData.Unmarshal(&v2)
127+
fmt.Println("chagned", v2)
128+
fmt.Println()
119129
}, func(err error) () {
120130
fmt.Println(err)
121131
})
122-
for {}
123132
}
124133
</code>
125134
</pre>
@@ -132,13 +141,79 @@ func main() {
132141
**docs:** An array of latest result set.
133142
**type:** Type of operation due to which the `onSnapshot` is called. It can have one of the following values:
134143
- **initial** - Called only once for the initial data on successful subscription
135-
- **write** - Whenever any data is added or updated
144+
- **insert** - Whenever any data is added
145+
- **update** - Whenever any data is updated
136146
- **delete** - Whenever any data is deleted
147+
**changedDoc:** The doc that changed.
137148

138149
`onError` function is called with the `error` if there was any error subscribing to data.
139150

140151
As you would have noticed the `subscribe` function returns an `unsubscribe` function. You should call this function whenever you want to unsubscribe to the changes.
141152

153+
## Setting the liveQuery options:
154+
You can set the liveQuery options using the `options()` function.
155+
The function helps to set `changesOnly` to true or false (default).
156+
If `changesOnly` is false, it caches the docs. `onSnapshot` will be called with all 3 parameters set.
157+
If `changesOnly` is true, it does not cache the docs and also ignores the initial values. `onSnapshot` will be called with only the last 2 parameters set.
158+
159+
Here's a code snippet to do this:
160+
161+
<div class="row tabs-wrapper">
162+
<div class="col s12" style="padding:0">
163+
<ul class="tabs">
164+
<li class="tab col s2"><a class="active" href="#live-query-options-js">Javascript</a></li>
165+
<li class="tab col s2"><a href="#live-query-options-java">Java</a></li>
166+
<li class="tab col s2"><a href="#live-query-options-python">Python</a></li>
167+
<li class="tab col s2"><a href="#live-query-options-golang">Golang</a></li>
168+
</ul>
169+
</div>
170+
<div id="live-query-options-js" class="col s12" style="padding:0">
171+
<pre>
172+
<code>
173+
let unsubscribe = db.liveQuery('posts').where({}).options({ changesOnly: true })subscribe(onSnapshot, onError)
174+
</code>
175+
</pre>
176+
</div>
177+
<div id="live-query-options-java" class="col s12" style="padding:0">
178+
<pre>
179+
<code class="java">
180+
LiveQueryUnsubscribe unsubscribe = db.liveQuery("books")
181+
.options(LiveQueryOptions.Builder().setChangesOnly(true)).subscribe(new LiveDataListener() {
182+
@Override
183+
public void onSnapshot(LiveData data, String type, ChangedData changedData) {
184+
// ...
185+
}
186+
@Override
187+
public void onError(String error) {
188+
// ...
189+
}
190+
});
191+
</code>
192+
</pre>
193+
</div>
194+
<div id="live-query-options-python" class="col s12" style="padding:0">
195+
<pre>
196+
<code class="python">
197+
unsubscribe = db.live_query('books').options(changes_only=True).subscribe(on_snapshot, on_error)
198+
</code>
199+
</pre>
200+
</div>
201+
<div id="live-query-options-golang" class="col s12" style="padding:0">
202+
<pre>
203+
<code class="golang">
204+
db.LiveQuery("books").Options(&model.LiveQueryOptions{ChangesOnly: false}).
205+
Subscribe(func(liveData *model.LiveData, changeType string, changedData *model.ChangedData) () {
206+
// ...
207+
}, func(err error) () {
208+
// ...
209+
})
210+
}
211+
</code>
212+
</pre>
213+
</div>
214+
</div>
215+
216+
142217
## Next steps
143218

144219
Now you know how to subscribe to realtime changes in data. The next step would be to have a look at how to update data from your frontend.

utils/handlers/crud.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -364,8 +364,8 @@ func HandleCrudBatch(isProd bool, projects *projects.Projects) http.HandlerFunc
364364
// Send error response
365365
if err != nil {
366366
// Send realtime nack
367-
for _, m := range msgIDs {
368-
state.Realtime.SendAck(m.id, meta.project, m.col, false)
367+
for j := 0; j < i; j++ {
368+
state.Realtime.SendAck(msgIDs[j].id, meta.project, msgIDs[j].col, false)
369369
}
370370

371371
// Send http response

utils/server/grpc.go

+33-26
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,10 @@ func (s *Server) Batch(ctx context.Context, in *pb.BatchRequest) (*pb.Response,
250250

251251
allRequests := []model.AllRequest{}
252252
for i, req := range in.Batchrequest {
253+
// Make status and error variables
254+
var status int
255+
var err error
256+
253257
switch req.Type {
254258
case string(utils.Create):
255259
eachReq := model.AllRequest{}
@@ -258,8 +262,8 @@ func (s *Server) Batch(ctx context.Context, in *pb.BatchRequest) (*pb.Response,
258262

259263
r := model.CreateRequest{}
260264
var temp interface{}
261-
if err := json.Unmarshal(req.Document, &temp); err != nil {
262-
return &pb.Response{Status: http.StatusInternalServerError, Error: err.Error()}, nil
265+
if err = json.Unmarshal(req.Document, &temp); err != nil {
266+
status = http.StatusInternalServerError
263267
}
264268
r.Document = temp
265269
eachReq.Document = temp
@@ -270,15 +274,13 @@ func (s *Server) Batch(ctx context.Context, in *pb.BatchRequest) (*pb.Response,
270274
allRequests = append(allRequests, eachReq)
271275

272276
// Check if the user is authenticated
273-
status, err := state.Auth.IsCreateOpAuthorised(in.Meta.Project, in.Meta.DbType, req.Col, in.Meta.Token, &r)
274-
if err != nil {
275-
return &pb.Response{Status: int32(status), Error: err.Error()}, nil
277+
status, err = state.Auth.IsCreateOpAuthorised(in.Meta.Project, in.Meta.DbType, req.Col, in.Meta.Token, &r)
278+
if err == nil {
279+
// Send realtime message intent
280+
msgID := state.Realtime.SendCreateIntent(in.Meta.Project, in.Meta.DbType, req.Col, &r)
281+
msgIDs[i] = &msg{id: msgID, col: req.Col}
276282
}
277283

278-
// Send realtime message intent
279-
msgID := state.Realtime.SendCreateIntent(in.Meta.Project, in.Meta.DbType, req.Col, &r)
280-
msgIDs[i] = &msg{id: msgID, col: req.Col}
281-
282284
case string(utils.Update):
283285
eachReq := model.AllRequest{}
284286
eachReq.Type = req.Type
@@ -293,8 +295,8 @@ func (s *Server) Batch(ctx context.Context, in *pb.BatchRequest) (*pb.Response,
293295
eachReq.Find = temp
294296

295297
temp = map[string]interface{}{}
296-
if err := json.Unmarshal(req.Update, &temp); err != nil {
297-
return &pb.Response{Status: http.StatusInternalServerError, Error: err.Error()}, nil
298+
if err = json.Unmarshal(req.Update, &temp); err != nil {
299+
status = http.StatusInternalServerError
298300
}
299301
r.Update = temp
300302
eachReq.Update = temp
@@ -305,24 +307,22 @@ func (s *Server) Batch(ctx context.Context, in *pb.BatchRequest) (*pb.Response,
305307
allRequests = append(allRequests, eachReq)
306308

307309
// Check if the user is authenticated
308-
status, err := state.Auth.IsUpdateOpAuthorised(in.Meta.Project, in.Meta.DbType, req.Col, in.Meta.Token, &r)
309-
if err != nil {
310-
return &pb.Response{Status: int32(status), Error: err.Error()}, nil
310+
status, err = state.Auth.IsUpdateOpAuthorised(in.Meta.Project, in.Meta.DbType, req.Col, in.Meta.Token, &r)
311+
if err == nil {
312+
// Send realtime message intent
313+
msgID := state.Realtime.SendUpdateIntent(in.Meta.Project, in.Meta.DbType, req.Col, &r)
314+
msgIDs[i] = &msg{id: msgID, col: req.Col}
311315
}
312316

313-
// Send realtime message intent
314-
msgID := state.Realtime.SendUpdateIntent(in.Meta.Project, in.Meta.DbType, req.Col, &r)
315-
msgIDs[i] = &msg{id: msgID, col: req.Col}
316-
317317
case string(utils.Delete):
318318
eachReq := model.AllRequest{}
319319
eachReq.Type = req.Type
320320
eachReq.Col = req.Col
321321

322322
r := model.DeleteRequest{}
323323
temp := map[string]interface{}{}
324-
if err := json.Unmarshal(req.Find, &temp); err != nil {
325-
return &pb.Response{Status: 500, Error: err.Error()}, nil
324+
if err = json.Unmarshal(req.Find, &temp); err != nil {
325+
status = http.StatusInternalServerError
326326
}
327327
r.Find = temp
328328
eachReq.Find = temp
@@ -333,16 +333,23 @@ func (s *Server) Batch(ctx context.Context, in *pb.BatchRequest) (*pb.Response,
333333
allRequests = append(allRequests, eachReq)
334334

335335
// Check if the user is authenticated
336-
status, err := state.Auth.IsDeleteOpAuthorised(in.Meta.Project, in.Meta.DbType, req.Col, in.Meta.Token, &r)
337-
if err != nil {
338-
return &pb.Response{Status: int32(status), Error: err.Error()}, nil
336+
status, err = state.Auth.IsDeleteOpAuthorised(in.Meta.Project, in.Meta.DbType, req.Col, in.Meta.Token, &r)
337+
if err == nil {
338+
// Send realtime message intent
339+
msgID := state.Realtime.SendDeleteIntent(in.Meta.Project, in.Meta.DbType, req.Col, &r)
340+
msgIDs[i] = &msg{id: msgID, col: req.Col}
339341
}
342+
}
340343

341-
// Send realtime message intent
342-
msgID := state.Realtime.SendDeleteIntent(in.Meta.Project, in.Meta.DbType, req.Col, &r)
343-
msgIDs[i] = &msg{id: msgID, col: req.Col}
344+
// Send negative acks and send error response
345+
for j := 0; j < i; j++ {
346+
state.Realtime.SendAck(msgIDs[j].id, in.Meta.Project, msgIDs[j].col, false)
344347
}
348+
349+
// Send gRPC Response
350+
return &pb.Response{Status: int32(status), Error: err.Error()}, nil
345351
}
352+
346353
// Perform the Batch operation
347354
batch := model.BatchRequest{}
348355
batch.Requests = allRequests

0 commit comments

Comments
 (0)