From 90a4b8f72236528c5069c36bad8d332bd3d62d44 Mon Sep 17 00:00:00 2001
From: Dmitry S
Date: Sat, 7 Dec 2024 02:02:46 +0100
Subject: [PATCH 1/2] Support adding Python files specific to megagrist

---
 core                                 |  2 +-
 ext/app/server/lib/MegaDataEngine.ts | 33 +++++++++++++++++++---------
 ext/app/server/lib/create.ts         |  4 +++-
 ext/sandbox/grist/ext/__init__.py    |  0
 ext/sandbox/grist/ext/depend.py      | 14 ++++++++++++
 5 files changed, 41 insertions(+), 12 deletions(-)
 create mode 100644 ext/sandbox/grist/ext/__init__.py
 create mode 100644 ext/sandbox/grist/ext/depend.py

diff --git a/core b/core
index 1fecaa2..d2076c9 160000
--- a/core
+++ b/core
@@ -1 +1 @@
-Subproject commit 1fecaa2f8361a4188596e8175e101ac3eb2781f9
+Subproject commit d2076c99f93442c54bd4fbaa62c19f8eab7cb7a2
diff --git a/ext/app/server/lib/MegaDataEngine.ts b/ext/app/server/lib/MegaDataEngine.ts
index bfbe18a..62b3cb0 100644
--- a/ext/app/server/lib/MegaDataEngine.ts
+++ b/ext/app/server/lib/MegaDataEngine.ts
@@ -12,13 +12,15 @@ import {WebSocketChannel} from 'app/megagrist/lib/WebSocketChannel';
 import {DataEngineCallContext, DataEnginePooled} from 'app/megagrist/lib/DataEngine';
 import {createDataEngineServer} from 'app/megagrist/lib/DataEngineServer';
 import {QueryStreamingOptions} from 'app/megagrist/lib/IDataEngine';
-import {Query, QueryResultStreaming} from 'app/megagrist/lib/types';
+import {Query, QueryResult, QueryResultStreaming} from 'app/megagrist/lib/types';
 import {ExpandedQuery} from 'app/megagrist/lib/sqlConstruct';
 import {appSettings} from 'app/server/lib/AppSettings';
 import {expandQuery} from 'app/server/lib/ExpandedQuery';
 import * as ICreate from 'app/server/lib/ICreate';
 import {OptDocSession} from 'app/server/lib/DocSession';
 import * as log from 'app/server/lib/log';
+import * as path from 'path';
+import {realpathSync} from 'fs';
 
 
 const enableMegaDataEngine = appSettings.section('dataEngine').flag('enableMega').readBool({
@@ -68,6 +70,10 @@ export class MegaDataEngine {
   public serve(channel: WebSocketChannel): void {
     createDataEngineServer(this._dataEngine, {channel, verbose: logDebug});
   }
+
+  public getSandboxExtDir(): string {
+    return realpathSync(path.join(process.cwd(), '..', 'ext', 'sandbox', 'grist'));
+  }
 }
 
 export namespace MegaDataEngine {
@@ -79,6 +85,12 @@ class UnmarshallingDataEngine extends DataEnginePooled {
     super(...args);
   }
 
+  public async fetchQuery(context: DataEngineCallContext, query: Query): Promise<QueryResult> {
+    const queryResult = await super.fetchQuery(context, this._expandQuery(query));
+    // TODO: apply decoders, like fetchQueryStreaming() does.
+    return queryResult;
+  }
+
   public async fetchQueryStreaming(
     context: DataEngineCallContext, query: Query, options: QueryStreamingOptions
   ): Promise<QueryResultStreaming> {
@@ -90,15 +102,7 @@ class UnmarshallingDataEngine extends DataEnginePooled {
     // to decode values.
     const tableData = this._docData.getTable(query.tableId)
-    let expandedQuery: ExpandedQuery;
-    if (query.columns) {
-      expandedQuery = query;
-    } else {
-      const expanded = expandQuery({tableId: query.tableId, filters: {}}, this._docData, true);
-      expandedQuery = {...query, joins: expanded.joins, selects: expanded.selects};
-    }
-
-    const queryResult = await super.fetchQueryStreaming(context, expandedQuery, options);
+    const queryResult = await super.fetchQueryStreaming(context, this._expandQuery(query), options);
     const decoders = queryResult.value.colIds.map(c => getDecoder(tableData?.getColType(c)));
 
     async function *generateRows() {
@@ -111,6 +115,15 @@ class UnmarshallingDataEngine extends DataEnginePooled {
       chunks: generateRows()
     };
   }
+
+  private _expandQuery(query: Query): ExpandedQuery {
+    if (query.columns) {
+      return query;
+    } else {
+      const expanded = expandQuery({tableId: query.tableId, filters: {}}, this._docData, true);
+      return {...query, joins: expanded.joins, selects: expanded.selects};
+    }
+  }
 }
 
 type DecoderFunc = (val: CellValue) => CellValue;
diff --git a/ext/app/server/lib/create.ts b/ext/app/server/lib/create.ts
index 23dbf89..b88b9ce 100644
--- a/ext/app/server/lib/create.ts
+++ b/ext/app/server/lib/create.ts
@@ -1,9 +1,11 @@
 import {ICreate} from "app/server/lib/ICreate";
 import {makeCoreCreator} from "app/server/lib/coreCreator";
 import {getSupportedEngineChoices} from 'app/server/lib/MegaDataEngine';
+import * as fs from 'fs';
 
 export const create: ICreate = makeCoreCreator({
-  getSupportedEngineChoices
+  getSupportedEngineChoices,
+  sandboxFlavor: (fs.existsSync('/usr/bin/sandbox-exec') ? 'macSandboxExec' : 'gvisor'),
 });
 
 /**
diff --git a/ext/sandbox/grist/ext/__init__.py b/ext/sandbox/grist/ext/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/ext/sandbox/grist/ext/depend.py b/ext/sandbox/grist/ext/depend.py
new file mode 100644
index 0000000..630ba62
--- /dev/null
+++ b/ext/sandbox/grist/ext/depend.py
@@ -0,0 +1,14 @@
+import depend_base
+from depend_base import *
+
+import logging
+log = logging.getLogger(__name__)
+log.error("WEEE0")
+
+class Graph(depend_base.Graph):
+  """
+  Represents the dependency graph for all data in a grist document.
+  """
+  def __init__(self):
+    log.error("WEEE1")
+    super(Graph, self).__init__()

From 8f9ecaeb1f720a47724b8c1106a2b7f881551873 Mon Sep 17 00:00:00 2001
From: Dmitry S
Date: Thu, 12 Dec 2024 01:05:40 -0500
Subject: [PATCH 2/2] Experiment rewriting depend.py using a SQLite in-memory
 table

---
 core                            |   2 +-
 ext/sandbox/grist/ext/depend.py | 170 +++++++++++++++++++++++++++++++-
 package.json                    |   3 +-
 3 files changed, 170 insertions(+), 5 deletions(-)

diff --git a/core b/core
index d2076c9..e2ae1bb 160000
--- a/core
+++ b/core
@@ -1 +1 @@
-Subproject commit d2076c99f93442c54bd4fbaa62c19f8eab7cb7a2
+Subproject commit e2ae1bb4b091d829e576a2f0088d7029050581ff
diff --git a/ext/sandbox/grist/ext/depend.py b/ext/sandbox/grist/ext/depend.py
index 630ba62..fbb177f 100644
--- a/ext/sandbox/grist/ext/depend.py
+++ b/ext/sandbox/grist/ext/depend.py
@@ -1,14 +1,178 @@
 import depend_base
 from depend_base import *
 
+import sqlite3
 import logging
 log = logging.getLogger(__name__)
 log.error("WEEE0")
 
-class Graph(depend_base.Graph):
+# Nodes are (table_id, col_id) pairs identifying a column. Each edge records
+# that out_node depends on in_node, and carries a Relation that maps dirty
+# in_node rows to the affected out_node rows. Edges live in an in-memory
+# SQLite table; their Relation objects live in self._edges, keyed by ROWID.
+
+class Graph(object):
   """
   Represents the dependency graph for all data in a grist document.
""" def __init__(self): - log.error("WEEE1") - super(Graph, self).__init__() + self._conn = sqlite3.connect(":memory:") + self._conn.execute(''' + CREATE TABLE IF NOT EXISTS _gristsys_Deps ( + out_table_id TEXT, + out_col_id TEXT, + in_table_id TEXT, + in_col_id TEXT + ) + ''') + + # Maps ROWID of edge to a relation object. + self._edges = {} + + # # The set of all Edges, i.e. the complete dependency graph. + # self._all_edges = set() + + # # Map from node to the set of edges having it as the in_node (i.e. edges to dependents). + # self._in_node_map = {} + + # # Map from node to the set of edges having it as the out_node (i.e. edges to dependencies). + # self._out_node_map = {} + + def dump_graph(self): + """ + Print out the graph to stdout, for debugging. + """ + cursor = self._conn.cursor() + for edge in cursor.execute('SELECT * FROM _gristsys_Deps').fetchiter(): + print("edge", edge) + + def add_edge(self, out_node, in_node, relation): + """ + Adds an edge to the global dependency graph: out_node depends on in_node, i.e. a change to + in_node should trigger a recomputation of out_node. + """ + cursor = self._conn.cursor() + cursor.execute(''' + INSERT INTO _gristsys_Deps ( + out_table_id, + out_col_id, + in_table_id, + in_col_id + ) VALUES (?, ?, ?, ?) + ''', + out_node + in_node + ) + self._edges[cursor.lastrowid] = relation + + def clear_dependencies(self, out_node): + """ + Removes all edges which affect the given out_node, i.e. all of its dependencies. + """ + cursor = self._conn.cursor() + rows = cursor.execute(''' + SELECT rowid FROM _gristsys_Deps + WHERE out_table_id=? AND out_col_id=? + ''', + out_node + ) + for row in rows: + relation = self._edges.pop(row[0]) + if relation: + relation.reset_all() + cursor.execute(''' + DELETE FROM _gristsys_Deps + WHERE out_table_id=? AND out_col_id=? + ''', + out_node + ) + + def reset_dependencies(self, node, dirty_rows): + """ + For edges the given node depends on, reset the given output rows. This is called just before + the rows get recomputed, to allow the relations to clear out state for those rows if needed. + """ + cursor = self._conn.cursor() + rows = cursor.execute(''' + SELECT rowid FROM _gristsys_Deps + WHERE out_table_id=? AND out_col_id=? + ''', + node + ) + for row in rows: + relation = self._edges.get(row[0]) + if relation: + relation.reset_rows(dirty_rows) + + def remove_node_if_unused(self, node): + """ + Removes the given node if it has no dependents. Returns True if the node is gone, False if the + node has dependents. + """ + cursor = self._conn.cursor() + row = cursor.execute(''' + SELECT COUNT(*) FROM _gristsys_Deps + WHERE in_table_id=? AND in_col_id=? + ''', + node + ).fetchone() + if row[0] > 0: + return False + self.clear_dependencies(node) + return True + + + def invalidate_deps(self, dirty_node, dirty_rows, recompute_map, include_self=True): + """ + Invalidates the given rows in the given node, and all of its dependents, i.e. all the nodes + that recursively depend on dirty_node. If include_self is False, then skips the given node + (e.g. if the node is raw data rather than formula). Results are added to recompute_map, which + is a dict mapping Nodes to sets of rows that need to be recomputed. + + If dirty_rows is ALL_ROWS, the whole column is affected, and dependencies get recomputed from + scratch. ALL_ROWS propagates to all dependent columns, so those also get recomputed in full. 
+ """ + to_invalidate = [(dirty_node, dirty_rows)] + + while to_invalidate: + dirty_node, dirty_rows = to_invalidate.pop() + if include_self: + if recompute_map.get(dirty_node) == ALL_ROWS: + continue + if dirty_rows == ALL_ROWS: + recompute_map[dirty_node] = ALL_ROWS + # If all rows are being recomputed, clear the dependencies of the affected column. (We add + # dependencies in the course of recomputing, but we can only start from an empty set of + # dependencies if we are about to recompute all rows.) + self.clear_dependencies(dirty_node) + else: + out_rows = recompute_map.setdefault(dirty_node, SortedSet()) + prev_count = len(out_rows) + out_rows.update(dirty_rows) + # Don't bother recursing into dependencies if we didn't actually update anything. + if len(out_rows) <= prev_count: + continue + + include_self = True + + + cursor = self._conn.cursor() + rows = cursor.execute(''' + SELECT rowid, out_table_id, out_col_id FROM _gristsys_Deps + WHERE in_table_id=? AND in_col_id=? + ''', + dirty_node + ) + for row in rows: + rowid, out_table_id, out_col_id = row + out_node = Node(out_table_id, out_col_id) + relation = self._edges.get(rowid) + if not relation: + continue + + affected_rows = relation.get_affected_rows(dirty_rows) + + # Previously this was: + # self.invalidate_deps(edge.out_node, affected_rows, recompute_map, include_self=True) + # but that led to a recursion error, so now we do the equivalent + # without actual recursion, hence the while loop + to_invalidate.append((out_node, affected_rows)) diff --git a/package.json b/package.json index 5259aac..ffa07f3 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,8 @@ "build": "cd core && yarn run build:prod", "build-docker": "docker buildx build --load -t gristlabs/megagrist --build-context=ext=ext core", "start": "cd core && sandbox/watch.sh", - "test": "cd core && mocha '_build/ext/test/**/*.js' ${DEBUG:+-b --no-exit} -g \"${GREP_TESTS}\" -n expose-gc" + "test": "cd core && mocha '_build/ext/test/**/*.js' ${DEBUG:+-b --no-exit} -g \"${GREP_TESTS}\" -n expose-gc", + "test:python": "cd core && PYTHONPATH=../ext/sandbox/grist yarn run test:python" }, "keywords": [], "private": true,
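
Note on usage (not part of the patches): the sketch below shows how the SQLite-backed
Graph above is meant to be driven. It is a minimal smoke test under a few assumptions:
PYTHONPATH includes ext/sandbox/grist (as the test:python script arranges), depend_base
resolves to core's depend.py and exports Node and ALL_ROWS, and FakeRelation is a
hypothetical stand-in for a real relation object implementing the
get_affected_rows/reset_rows/reset_all interface that Graph expects.

    from ext.depend import ALL_ROWS, Graph, Node

    class FakeRelation(object):
      """Hypothetical relation: dirty input rows map one-to-one to output rows."""
      def get_affected_rows(self, input_rows):
        return input_rows
      def reset_rows(self, rows):
        pass
      def reset_all(self):
        pass

    graph = Graph()
    amount = Node('Orders', 'amount')
    total = Node('Orders', 'total')
    # 'total' is computed from 'amount': a change to amount invalidates total.
    graph.add_edge(total, amount, FakeRelation())
    graph.dump_graph()

    # Invalidate everything downstream of 'amount', skipping 'amount' itself
    # since it is raw data rather than a formula. ALL_ROWS propagates to the
    # dependent column, whose dependencies are cleared for full recomputation.
    recompute_map = {}
    graph.invalidate_deps(amount, ALL_ROWS, recompute_map, include_self=False)
    assert recompute_map == {total: ALL_ROWS}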