11import { primordials , core } from "ext:core/mod.js" ;
22import { readableStreamForRid , writableStreamForRid } from "ext:deno_web/06_streams.js" ;
33import { getSupabaseTag } from "ext:sb_core_main_js/js/http.js" ;
4+ import { SymbolDispose } from "ext:deno_web/00_infra.js" ;
45
56const ops = core . ops ;
67
@@ -9,6 +10,8 @@ const { TypeError } = primordials;
910const {
1011 op_user_worker_fetch_send,
1112 op_user_worker_create,
13+ op_user_user_worker_wait_token_cancelled,
14+ op_user_worker_is_active,
1215} = ops ;
1316
1417const NO_SUPABASE_TAG_WARN_MSG = `Unable to find the supabase tag from the request instance.\n\
@@ -24,8 +27,33 @@ function redirectStatus(status) {
2427}
2528
2629class UserWorker {
27- constructor ( key ) {
28- this . key = key ;
30+ /** @type {string } */
31+ #key = "" ;
32+
33+ /** @type {number | null } */
34+ #rid = null ;
35+
36+ /** @type {boolean } */
37+ #disposed = false ;
38+
39+ /**
40+ * @param {string } key
41+ * @param {number } rid
42+ */
43+ constructor ( key , rid ) {
44+ this . #key = key ;
45+ this . #rid = rid ;
46+
47+ const self = this ;
48+
49+ setTimeout ( async ( ) => {
50+ try {
51+ await op_user_user_worker_wait_token_cancelled ( rid ) ;
52+ self . dispose ( ) ;
53+ } catch {
54+ // TODO(Nyannyacha): Link it with the tracing for telemetry.
55+ }
56+ } ) ;
2957 }
3058
3159 async fetch ( request , options = { } ) {
@@ -62,7 +90,7 @@ class UserWorker {
6290 }
6391
6492 const responsePromise = op_user_worker_fetch_send (
65- this . key ,
93+ this . # key,
6694 requestRid ,
6795 requestBodyRid ,
6896 tag . streamRid ,
@@ -75,6 +103,7 @@ class UserWorker {
75103 ] ) ;
76104
77105 if ( requestBodyPromiseResult . status === "rejected" ) {
106+ // TODO(Nyannyacha): Link it with the tracing for telemetry.
78107 // console.warn(requestBodyPromiseResult.reason);
79108 }
80109
@@ -114,6 +143,26 @@ class UserWorker {
114143 } ) ;
115144 }
116145
146+ /** @returns {boolean } */
147+ get active ( ) {
148+ if ( this . #disposed) {
149+ return false ;
150+ }
151+
152+ return op_user_worker_is_active ( this . #rid) ;
153+ }
154+
155+ dispose ( ) {
156+ if ( ! this . #disposed) {
157+ core . tryClose ( this . #rid) ;
158+ this . #disposed = true ;
159+ }
160+ }
161+
162+ [ SymbolDispose ] ( ) {
163+ this . dispose ( ) ;
164+ }
165+
117166 static async create ( opts ) {
118167 const readyOptions = {
119168 noModuleCache : false ,
@@ -136,9 +185,9 @@ class UserWorker {
136185 throw new TypeError ( "service path must be defined" ) ;
137186 }
138187
139- const key = await op_user_worker_create ( readyOptions ) ;
188+ const [ key , rid ] = await op_user_worker_create ( readyOptions ) ;
140189
141- return new UserWorker ( key ) ;
190+ return new UserWorker ( key , rid ) ;
142191 }
143192}
144193
0 commit comments