diff --git a/docs/data-loaders.md b/docs/data-loaders.md
index 0c3d0c08c..4b41979b6 100644
--- a/docs/data-loaders.md
+++ b/docs/data-loaders.md
@@ -322,3 +322,27 @@ RuntimeError: Unable to load file: quakes.csv
```
When any data loader fails, the entire build fails.
+
+## File Server
+
+Data loaders can request a file from Framework, by querying the `FILE_SERVER` HTTP endpoint indicated in their environment variables. For example, a bash data loader `mags.txt.sh` can read `quakes.json` and use [`jq`](https://jqlang.github.io/jq/) to extract the magnitude of recent earthquakes by calling:
+
+```sh
+curl ${FILE_SERVER}quakes.json | jq .features[].properties.mag
+```
+
+similarly, the following JavaScript data loader `quakecount.txt.js` will return the number of recent earthquakes:
+
+```js run=false
+const {FILE_SERVER} = process.env;
+const quakes = await fetch(`${FILE_SERVER}quakes.json`).then((reponse) => response.json());
+process.stdout.write(quakes.features.length);
+```
+
+In the preview server, when `quakes.json` is updated, `mags.txt` and `quakecount.txt` get automatically refreshed. If `quakes.json` is in fact generated by a data loader `quakes.json.sh`, modifying that script live-updates both files. (Even though `quakes.json` is called by two loaders, the file server ensures that it runs only once, ensuring consistency and optimal performance.) The interpreters used are inconsequent: this mechanism allows a python data loader to talk to typescript, and vice-versa.
+
+
+
+The `FILE_SERVER` endpoint is typically used to chain data loaders, with a loader that downloads a large dataset from the Web or from a database. That payload is then cached, and the data loaders that queried can run various kinds of analysis. Make sure you don’t have circular dependencies, as they will lead the preview server and the build process to hang! Like [archives](#archives), files queried through this endpoint are added to the build only if [statically referenced](./files#static-analysis) by `FileAttachment`.
+
+
diff --git a/src/fileWatchers.ts b/src/fileWatchers.ts
index c1dcdf15e..4f04d4415 100644
--- a/src/fileWatchers.ts
+++ b/src/fileWatchers.ts
@@ -2,6 +2,7 @@ import type {FSWatcher} from "node:fs";
import {watch} from "node:fs";
import {isEnoent} from "./error.js";
import {maybeStat} from "./files.js";
+import {chainDependencies} from "./loader.js";
import type {LoaderResolver} from "./loader.js";
import {resolvePath} from "./path.js";
@@ -11,38 +12,41 @@ export class FileWatchers {
static async of(loaders: LoaderResolver, path: string, names: Iterable, callback: (name: string) => void) {
const that = new FileWatchers();
const {watchers} = that;
+ const {root} = loaders;
for (const name of names) {
- const watchPath = loaders.getWatchPath(resolvePath(path, name));
- if (!watchPath) continue;
- let currentStat = await maybeStat(watchPath);
- let watcher: FSWatcher;
- const index = watchers.length;
- try {
- watcher = watch(watchPath, async function watched(type) {
- // Re-initialize the watcher on the original path on rename.
- if (type === "rename") {
- watcher.close();
- try {
- watcher = watchers[index] = watch(watchPath, watched);
- } catch (error) {
- if (!isEnoent(error)) throw error;
- console.error(`file no longer exists: ${watchPath}`);
+ for (const p of chainDependencies(root, resolvePath(path, name))) {
+ const watchPath = loaders.getWatchPath(p);
+ if (!watchPath) continue;
+ let currentStat = await maybeStat(watchPath);
+ let watcher: FSWatcher;
+ const index = watchers.length;
+ try {
+ watcher = watch(watchPath, async function watched(type) {
+ // Re-initialize the watcher on the original path on rename.
+ if (type === "rename") {
+ watcher.close();
+ try {
+ watcher = watchers[index] = watch(watchPath, watched);
+ } catch (error) {
+ if (!isEnoent(error)) throw error;
+ console.error(`file no longer exists: ${watchPath}`);
+ return;
+ }
+ setTimeout(() => watched("change"), 100); // delay to avoid a possibly-empty file
return;
}
- setTimeout(() => watched("change"), 100); // delay to avoid a possibly-empty file
- return;
- }
- const newStat = await maybeStat(watchPath);
- // Ignore if the file was truncated or not modified.
- if (currentStat?.mtimeMs === newStat?.mtimeMs || newStat?.size === 0) return;
- currentStat = newStat;
- callback(name);
- });
- } catch (error) {
- if (!isEnoent(error)) throw error;
- continue;
+ const newStat = await maybeStat(watchPath);
+ // Ignore if the file was truncated or not modified.
+ if (currentStat?.mtimeMs === newStat?.mtimeMs || newStat?.size === 0) return;
+ currentStat = newStat;
+ callback(name);
+ });
+ } catch (error) {
+ if (!isEnoent(error)) throw error;
+ continue;
+ }
+ watchers[index] = watcher;
}
- watchers[index] = watcher;
}
return that;
}
diff --git a/src/loader.ts b/src/loader.ts
index 324f1a8e1..69585be18 100644
--- a/src/loader.ts
+++ b/src/loader.ts
@@ -1,13 +1,13 @@
import {createHash} from "node:crypto";
import type {FSWatcher, WatchListener, WriteStream} from "node:fs";
-import {createReadStream, existsSync, statSync, watch} from "node:fs";
-import {open, readFile, rename, unlink} from "node:fs/promises";
+import {createReadStream, existsSync, readFileSync, statSync, watch} from "node:fs";
+import {open, readFile, rename, rm, unlink, writeFile} from "node:fs/promises";
import {dirname, extname, join} from "node:path/posix";
import {createGunzip} from "node:zlib";
import {spawn} from "cross-spawn";
import JSZip from "jszip";
import {extract} from "tar-stream";
-import {enoent} from "./error.js";
+import {enoent, isEnoent} from "./error.js";
import {maybeStat, prepareOutput, visitFiles} from "./files.js";
import {FileWatchers} from "./fileWatchers.js";
import {formatByteSize} from "./format.js";
@@ -16,6 +16,7 @@ import {findModule, getFileInfo, getLocalModuleHash, getModuleHash} from "./java
import type {Logger, Writer} from "./logger.js";
import type {MarkdownPage, ParseOptions} from "./markdown.js";
import {parseMarkdown} from "./markdown.js";
+import {preview} from "./preview.js";
import {getModuleResolver, resolveImportPath} from "./resolvers.js";
import type {Params} from "./route.js";
import {isParameterized, requote, route} from "./route.js";
@@ -51,6 +52,9 @@ const defaultEffects: LoadEffects = {
export interface LoadOptions {
/** Whether to use a stale cache; true when building. */
useStale?: boolean;
+
+ /** An asset server for chained data loaders. */
+ FILE_SERVER?: string;
}
export interface LoaderOptions {
@@ -61,7 +65,7 @@ export interface LoaderOptions {
}
export class LoaderResolver {
- private readonly root: string;
+ readonly root: string;
private readonly interpreters: Map;
constructor({root, interpreters}: {root: string; interpreters?: Record}) {
@@ -294,7 +298,13 @@ export class LoaderResolver {
const info = getFileInfo(this.root, path);
if (!info) return createHash("sha256").digest("hex");
const {hash} = info;
- return path === name ? hash : createHash("sha256").update(hash).update(String(info.mtimeMs)).digest("hex");
+ if (path === name) return hash;
+ const hash2 = createHash("sha256");
+ for (const p of chainDependencies(this.root, name)) {
+ const info = getFileInfo(this.root, this.getSourceFilePath(p));
+ if (info) hash2.update(info.hash).update(String(info.mtimeMs));
+ }
+ return hash2.digest("hex");
}
getOutputFileHash(name: string): string {
@@ -404,15 +414,33 @@ abstract class AbstractLoader implements Loader {
let command = runningCommands.get(key);
if (!command) {
command = (async () => {
+ const loaderStat = await maybeStat(loaderPath);
+ const paths = chainDependencies(this.root, this.targetPath);
+ const FRESH = 0;
+ const STALE = 1;
+ const MISSING = 2;
+ let status = FRESH;
+ for (const path of paths) {
+ const cachePath = join(this.root, ".observablehq", "cache", path);
+ const cacheStat = await maybeStat(cachePath);
+ if (!cacheStat) {
+ status = MISSING;
+ break;
+ } else if (cacheStat.mtimeMs < loaderStat!.mtimeMs) status = Math.max(status, STALE);
+ }
const outputPath = join(".observablehq", "cache", this.targetPath);
const cachePath = join(this.root, outputPath);
- const loaderStat = await maybeStat(loaderPath);
- const cacheStat = await maybeStat(cachePath);
- if (!cacheStat) effects.output.write(faint("[missing] "));
- else if (cacheStat.mtimeMs < loaderStat!.mtimeMs) {
- if (useStale) return effects.output.write(faint("[using stale] ")), outputPath;
- else effects.output.write(faint("[stale] "));
- } else return effects.output.write(faint("[fresh] ")), outputPath;
+ switch (status) {
+ case FRESH:
+ return effects.output.write(faint("[fresh] ")), outputPath;
+ case STALE:
+ if (useStale) return effects.output.write(faint("[using stale] ")), outputPath;
+ effects.output.write(faint("[stale] "));
+ break;
+ case MISSING:
+ effects.output.write(faint("[missing] "));
+ break;
+ }
const tempPath = join(this.root, ".observablehq", "cache", `${this.targetPath}.${process.pid}`);
const errorPath = tempPath + ".err";
const errorStat = await maybeStat(errorPath);
@@ -424,8 +452,17 @@ abstract class AbstractLoader implements Loader {
await prepareOutput(tempPath);
await prepareOutput(cachePath);
const tempFd = await open(tempPath, "w");
+
+ // Launch a server for chained data loaders. TODO configure host?
+ const dependencies = new Set();
+ const {server} = await preview({root: this.root, verbose: false, hostname: "127.0.0.1", dependencies});
+ const address = server.address();
+ if (!address || typeof address !== "object")
+ throw new Error("Couldn't launch server for chained data loaders!");
+ const FILE_SERVER = `http://${address.address}:${address.port}/_file/`;
+
try {
- await this.exec(tempFd.createWriteStream({highWaterMark: 1024 * 1024}), {useStale}, effects);
+ await this.exec(tempFd.createWriteStream({highWaterMark: 1024 * 1024}), {useStale, FILE_SERVER}, effects);
await rename(tempPath, cachePath);
} catch (error) {
await rename(tempPath, errorPath);
@@ -433,6 +470,19 @@ abstract class AbstractLoader implements Loader {
} finally {
await tempFd.close();
}
+
+ const cachedeps = `${cachePath}__dependencies`;
+ if (dependencies.size) await writeFile(cachedeps, JSON.stringify([...dependencies]), "utf-8");
+ else
+ try {
+ await rm(cachedeps);
+ } catch (error) {
+ if (!isEnoent(error)) throw error;
+ }
+
+ // TODO: server.close() might be enough?
+ await new Promise((closed) => server.close(closed));
+
return outputPath;
})();
command.finally(() => runningCommands.delete(key)).catch(() => {});
@@ -485,8 +535,12 @@ class CommandLoader extends AbstractLoader {
this.args = args;
}
- async exec(output: WriteStream): Promise {
- const subprocess = spawn(this.command, this.args, {windowsHide: true, stdio: ["ignore", output, "inherit"]});
+ async exec(output: WriteStream, {FILE_SERVER}): Promise {
+ const subprocess = spawn(this.command, this.args, {
+ windowsHide: true,
+ stdio: ["ignore", output, "inherit"],
+ env: {...process.env, FILE_SERVER}
+ });
const code = await new Promise((resolve, reject) => {
subprocess.on("error", reject);
subprocess.on("close", resolve);
@@ -572,3 +626,18 @@ function formatElapsed(start: number): string {
const elapsed = performance.now() - start;
return `${Math.floor(elapsed)}ms`;
}
+
+export function chainDependencies(root: string, path: string): Set {
+ const paths = new Set([path]);
+ for (const path of paths) {
+ try {
+ for (const f of JSON.parse(readFileSync(join(root, ".observablehq", "cache", `${path}__dependencies`), "utf-8")))
+ paths.add(f);
+ } catch (error) {
+ if (!isEnoent(error)) {
+ throw error;
+ }
+ }
+ }
+ return paths;
+}
diff --git a/src/preview.ts b/src/preview.ts
index 130340dcb..4cd9a8347 100644
--- a/src/preview.ts
+++ b/src/preview.ts
@@ -46,6 +46,7 @@ export interface PreviewOptions {
port?: number;
origins?: string[];
verbose?: boolean;
+ dependencies?: Set;
}
export async function preview(options: PreviewOptions): Promise {
@@ -59,19 +60,22 @@ export class PreviewServer {
private readonly _server: ReturnType;
private readonly _socketServer: WebSocketServer;
private readonly _verbose: boolean;
+ private readonly dependencies: Set | undefined;
private constructor({
config,
root,
origins = [],
server,
- verbose
+ verbose,
+ dependencies
}: {
config?: string;
root?: string;
origins?: string[];
server: Server;
verbose: boolean;
+ dependencies?: Set;
}) {
this._config = config;
this._root = root;
@@ -81,6 +85,7 @@ export class PreviewServer {
this._server.on("request", this._handleRequest);
this._socketServer = new WebSocketServer({server: this._server});
this._socketServer.on("connection", this._handleConnection);
+ this.dependencies = dependencies;
}
static async start({verbose = true, hostname, port, open, ...options}: PreviewOptions) {
@@ -178,6 +183,7 @@ export class PreviewServer {
throw enoent(path);
} else if (pathname.startsWith("/_file/")) {
const path = pathname.slice("/_file".length);
+ if (this.dependencies) this.dependencies.add(path);
const loader = loaders.find(path);
if (!loader) throw enoent(path);
send(req, await loader.load(), {root}).pipe(res);
diff --git a/test/build-test.ts b/test/build-test.ts
index da7726f6c..b224a22d7 100644
--- a/test/build-test.ts
+++ b/test/build-test.ts
@@ -216,6 +216,7 @@ function* findFiles(root: string): Iterable {
visited.add(status.ino);
for (const entry of readdirSync(path)) {
if (entry === ".DS_Store") continue; // macOS
+ if (entry === ".ignoreme") continue; // see inputs/build/chain/
queue.push(join(path, entry));
}
} else {
diff --git a/test/input/build/chain/chain-source.json.ts b/test/input/build/chain/chain-source.json.ts
new file mode 100644
index 000000000..65e4dc5fe
--- /dev/null
+++ b/test/input/build/chain/chain-source.json.ts
@@ -0,0 +1,10 @@
+import {existsSync, writeFileSync} from "node:fs";
+const testFile = "./test/output/build/chain-changed/.ignoreme";
+
+const x = existsSync(testFile) ? 0 : 3;
+
+try {
+ writeFileSync(testFile, "—");
+} catch (error) { }
+
+process.stdout.write(JSON.stringify({ x }));
diff --git a/test/input/build/chain/chain.json.ts b/test/input/build/chain/chain.json.ts
new file mode 100644
index 000000000..911822172
--- /dev/null
+++ b/test/input/build/chain/chain.json.ts
@@ -0,0 +1 @@
+console.log(JSON.stringify(process.env.address, null, 2));
\ No newline at end of file
diff --git a/test/input/build/chain/chain.md b/test/input/build/chain/chain.md
new file mode 100644
index 000000000..2356a792e
--- /dev/null
+++ b/test/input/build/chain/chain.md
@@ -0,0 +1,9 @@
+# Chained data loaders
+
+```js
+FileAttachment("chain1.json").json()
+```
+
+```js
+FileAttachment("chain2.csv").csv({typed: true})
+```
diff --git a/test/input/build/chain/chain1.json.ts b/test/input/build/chain/chain1.json.ts
new file mode 100644
index 000000000..cb069d658
--- /dev/null
+++ b/test/input/build/chain/chain1.json.ts
@@ -0,0 +1,3 @@
+const {FILE_SERVER} = process.env;
+const {x} = await fetch(`${FILE_SERVER}chain-source.json`).then((response) => response.json());
+process.stdout.write(JSON.stringify({x, "x^2": x * x}, null, 2));
diff --git a/test/input/build/chain/chain2.csv.ts b/test/input/build/chain/chain2.csv.ts
new file mode 100644
index 000000000..f85d7098a
--- /dev/null
+++ b/test/input/build/chain/chain2.csv.ts
@@ -0,0 +1,3 @@
+const {FILE_SERVER} = process.env;
+const {x} = await fetch(`${FILE_SERVER}chain-source.json`).then((response) => response.json());
+process.stdout.write(`name,value\nx,${x}\nx^2,${x * x}`);
diff --git a/test/output/build/chain/_file/chain1.e1f60496.json b/test/output/build/chain/_file/chain1.e1f60496.json
new file mode 100644
index 000000000..ec829e982
--- /dev/null
+++ b/test/output/build/chain/_file/chain1.e1f60496.json
@@ -0,0 +1,4 @@
+{
+ "x": 3,
+ "x^2": 9
+}
\ No newline at end of file
diff --git a/test/output/build/chain/_file/chain2.18991dde.csv b/test/output/build/chain/_file/chain2.18991dde.csv
new file mode 100644
index 000000000..32c0040a8
--- /dev/null
+++ b/test/output/build/chain/_file/chain2.18991dde.csv
@@ -0,0 +1,3 @@
+name,value
+x,3
+x^2,9
\ No newline at end of file
diff --git a/test/output/build/chain/_npm/d3-dsv@3.0.1/cd372fb8.js b/test/output/build/chain/_npm/d3-dsv@3.0.1/cd372fb8.js
new file mode 100644
index 000000000..e69de29bb
diff --git a/test/output/build/chain/_observablehq/client.00000001.js b/test/output/build/chain/_observablehq/client.00000001.js
new file mode 100644
index 000000000..e69de29bb
diff --git a/test/output/build/chain/_observablehq/runtime.00000002.js b/test/output/build/chain/_observablehq/runtime.00000002.js
new file mode 100644
index 000000000..e69de29bb
diff --git a/test/output/build/chain/_observablehq/stdlib.00000003.js b/test/output/build/chain/_observablehq/stdlib.00000003.js
new file mode 100644
index 000000000..e69de29bb
diff --git a/test/output/build/chain/_observablehq/theme-air,near-midnight.00000004.css b/test/output/build/chain/_observablehq/theme-air,near-midnight.00000004.css
new file mode 100644
index 000000000..e69de29bb
diff --git a/test/output/build/chain/chain.html b/test/output/build/chain/chain.html
new file mode 100644
index 000000000..eb3d0333c
--- /dev/null
+++ b/test/output/build/chain/chain.html
@@ -0,0 +1,61 @@
+
+
+
+
+Chained data loaders
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+