Skip to content

Commit c0c14fd

Browse files
committed
fetch CSV files as stream
1 parent bf09efe commit c0c14fd

File tree

6 files changed

+242
-14
lines changed

6 files changed

+242
-14
lines changed

package.json

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,16 @@
3030
"@magda/ci-utils": "^1.0.5",
3131
"@magda/docker-utils": "^6.0.0-alpha.4",
3232
"@types/chai": "^5.2.2",
33+
"@types/fs-extra": "^11.0.4",
3334
"@types/mocha": "^10.0.10",
35+
"@types/nock": "^11.1.0",
3436
"@types/node": "^20.19.0",
3537
"@types/yargs": "^17.0.33",
38+
"@types/sinon": "^17.0.4",
3639
"chai": "^5.2.0",
3740
"husky": "^9.1.7",
3841
"mocha": "^11.7.1",
42+
"nock": "^14.0.7",
3943
"prettier": "^3.5.3",
4044
"pretty-quick": "^4.2.2",
4145
"rimraf": "^3.0.0",
@@ -47,9 +51,10 @@
4751
"dependencies": {
4852
"@langchain/core": "^0.3.61",
4953
"@langchain/textsplitters": "https://gitpkg.vercel.app/JHZhang2736/langchainjs/libs/langchain-textsplitters?textsplitters-build",
50-
"@magda/semantic-indexer-sdk": "^6.0.0-alpha.4",
51-
"@types/sinon": "^17.0.4",
52-
"urijs": "^1.19.11",
54+
"@magda/esm-utils": "^1.0.1",
55+
"@magda/semantic-indexer-sdk": "6.0.0-alpha.6",
56+
"csv-parse": "^6.1.0",
57+
"fs-extra": "^11.3.0",
5358
"yargs": "17.6.2"
5459
},
5560
"resolutions": {

src/createEmbeddingText.ts

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@ import {
66
import { openSync, readSync, closeSync, existsSync } from "fs";
77
import { basename } from "path";
88
import { toYaml } from "./utils/toYaml.js";
9+
import { parse } from "csv-parse";
10+
import fse from "fs-extra";
11+
import path from "path";
12+
import { Readable } from "stream";
13+
import type { ReadableStream as WebReadableStream } from "stream/web";
14+
import { __dirname as getCurDirPath } from "@magda/esm-utils";
15+
16+
const __dirname = getCurDirPath();
917

1018
export function readFirstBytes(path: string, bytes: number): string {
1119
const fd = openSync(path, "r");
@@ -58,6 +66,100 @@ export function formatTemporal(tc?: { start?: string; end?: string }): string {
5866
return "";
5967
}
6068

69+
const CONNECTION_TIMEOUT = 120;
70+
71+
function toNodeReadable(body: any): NodeJS.ReadableStream {
72+
return typeof body?.pipe === "function"
73+
? body
74+
: Readable.fromWeb(body as WebReadableStream<Uint8Array>);
75+
}
76+
77+
let cachedPackageInfo: { name: string; version: string } | null = null;
78+
79+
async function getPackageInfo() {
80+
if (!cachedPackageInfo) {
81+
cachedPackageInfo = await fse.readJSON(path.resolve(__dirname, "../package.json"), {
82+
encoding: "utf-8"
83+
});
84+
}
85+
return cachedPackageInfo;
86+
}
87+
88+
export async function getColumnsFromCSVStream(downloadURL: string, timeout: number = CONNECTION_TIMEOUT * 1000): Promise<string[]> {
89+
const controller = new AbortController();
90+
const timeoutId = setTimeout(() => controller.abort(), timeout);
91+
const pkg = await getPackageInfo();
92+
93+
try {
94+
const res = await fetch(downloadURL, {
95+
redirect: "follow",
96+
headers: {
97+
"User-Agent": `${pkg?.name}/${pkg?.version}`
98+
},
99+
signal: controller.signal
100+
});
101+
102+
if (!res.ok || !res.body) {
103+
throw new Error(`HTTP ${res.status} fetching ${downloadURL}`);
104+
}
105+
106+
const parser = parse({
107+
bom: true,
108+
trim: true,
109+
skip_empty_lines: true,
110+
relax_column_count: true,
111+
delimiter: [",", ";", "\t", "|"],
112+
from_line: 1,
113+
to_line: 1,
114+
columns: false
115+
});
116+
117+
const csvStream: NodeJS.ReadableStream = toNodeReadable(res.body);
118+
119+
const cols = await new Promise<string[]>((resolve, reject) => {
120+
let resolved = false;
121+
122+
const handleResolve = (columns: string[]) => {
123+
if (!resolved) {
124+
resolved = true;
125+
resolve(columns);
126+
}
127+
};
128+
129+
const handleReject = (error: Error) => {
130+
if (!resolved) {
131+
resolved = true;
132+
reject(new Error(`CSV parsing error: ${error.message}`));
133+
}
134+
};
135+
136+
csvStream
137+
.pipe(parser)
138+
.on("data", (row: string[]) => {
139+
const columns = row.map(col => col.trim()).filter(Boolean);
140+
handleResolve(columns);
141+
})
142+
.on("error", (error) => {
143+
if (error.name === 'AbortError') {
144+
return;
145+
}
146+
handleReject(error);
147+
})
148+
.on("end", () => {
149+
handleResolve([]);
150+
});
151+
});
152+
return cols;
153+
} catch (error) {
154+
if (error instanceof Error && error.name === 'AbortError') {
155+
throw new Error('Timeout when fetching CSV');
156+
}
157+
throw new Error(`Failed to fetch CSV: ${error instanceof Error ? error.message : String(error)}`);
158+
} finally {
159+
clearTimeout(timeoutId);
160+
}
161+
}
162+
61163
export const createEmbeddingText: CreateEmbeddingText = async ({
62164
record,
63165
format,
@@ -104,6 +206,15 @@ export const createEmbeddingText: CreateEmbeddingText = async ({
104206
if (filePath && existsSync(filePath)) {
105207
const header = readFirstBytes(filePath, 64 * 1024).split(/\r?\n/)[0] ?? "";
106208
columns = header.split(",").map(c => c.trim()).filter(Boolean);
209+
} else if (url) {
210+
try {
211+
columns = await getColumnsFromCSVStream(url);
212+
} catch (e) {
213+
if (e instanceof Error) {
214+
console.warn("Failed to get columns from CSV file", e.message);
215+
}
216+
columns = [];
217+
}
107218
}
108219

109220
const yamlText = toYaml({
@@ -117,9 +228,8 @@ export const createEmbeddingText: CreateEmbeddingText = async ({
117228
Keywords: keywords || undefined,
118229
Languages: languages || undefined,
119230
Description: description || undefined,
120-
Columns: columns || undefined
231+
"Column names": columns || undefined
121232
});
122233

123-
124234
return { text: yamlText };
125235
};

src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import semanticIndexer, {
55
} from "@magda/semantic-indexer-sdk";
66
import { csvSemanticIndexerArgs } from "./csvSemanticIndexerArgs.js";
77
import { createEmbeddingText } from "./createEmbeddingText.js";
8-
import { Chunker } from "./chunker.js";
8+
import { Chunker } from "./Chunker.js";
99

1010
const port = csvSemanticIndexerArgs.port;
1111
const args = commonYargs(port, `http://localhost:${port}`);
@@ -21,7 +21,7 @@ const options: SemanticIndexerOptions = {
2121
id: csvSemanticIndexerArgs.id,
2222
itemType: "storageObject",
2323
formatTypes: ["csv"],
24-
autoDownloadFile: true,
24+
autoDownloadFile: false,
2525
createEmbeddingText: createEmbeddingText,
2626
chunkStrategy: chunkStrategy,
2727
};

src/test/chunker.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { expect } from "chai";
2-
import { Chunker } from "../chunker.js";
2+
import { Chunker } from "../Chunker.js";
33

44
describe("Chunker", () => {
55
it("should chunk yaml text with proper overlapping", async () => {

src/test/createEmbeddingText.spec.ts

Lines changed: 117 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ import { parse as parseYaml } from "yaml";
33
import * as fs from "fs";
44
import { join, basename } from "path";
55
import { tmpdir } from "os";
6-
import { createEmbeddingText, formatTemporal } from "../createEmbeddingText.js";
6+
import { createEmbeddingText, formatTemporal, getColumnsFromCSVStream } from "../createEmbeddingText.js";
7+
import nock from "nock";
78

89
describe("createEmbeddingText", () => {
910
let tempFile: string;
@@ -68,7 +69,7 @@ describe("createEmbeddingText", () => {
6869
Themes: ["Theme1", "Theme2"],
6970
Keywords: ["Keyword1", "Keyword2"],
7071
Languages: ["en", "zh"],
71-
Columns: ["colA", "colB", "colC"]
72+
"Column names": ["colA", "colB", "colC"]
7273
};
7374

7475
expect(yamlObj).to.deep.equal(expected);
@@ -114,7 +115,7 @@ describe("createEmbeddingText", () => {
114115
const expected = {
115116
Title: "Dataset Title",
116117
Format: "CSV",
117-
Columns: ["colA", "colB", "colC"]
118+
"Column names": ["colA", "colB", "colC"]
118119
};
119120

120121
expect(yamlObj).to.deep.equal(expected);
@@ -161,7 +162,7 @@ describe("createEmbeddingText", () => {
161162
Format: "CSV",
162163
"File name": "data.csv",
163164
Description: "Dataset Desc",
164-
Columns: ["colA", "colB", "colC"]
165+
"Column names": ["colA", "colB", "colC"]
165166
};
166167

167168
expect(yamlObj).to.deep.equal(expected);
@@ -241,6 +242,118 @@ describe("createEmbeddingText", () => {
241242
const yamlObj = parseYaml(text);
242243

243244
expect(yamlObj.Description).to.equal("First line\nSecond line");
245+
});
246+
});
247+
248+
describe("getColumnsFromCSVStream", () => {
249+
beforeEach(() => {
250+
nock.cleanAll();
251+
});
252+
253+
afterEach(() => {
254+
nock.cleanAll();
255+
});
256+
257+
it("should extract column names from comma-separated CSV file", async () => {
258+
nock("http://example.com")
259+
.get("/test-csv")
260+
.reply(200, "column1,column2,column3\nvalue1,value2,value3", {
261+
"Content-Type": "text/csv"
262+
});
263+
264+
const columns = await getColumnsFromCSVStream("http://example.com/test-csv");
265+
expect(columns).to.deep.equal(["column1", "column2", "column3"]);
266+
});
267+
268+
it("should handle empty CSV file", async () => {
269+
nock("http://example.com")
270+
.get("/empty-csv")
271+
.reply(200, "", {
272+
"Content-Type": "text/csv"
273+
});
274+
275+
const columns = await getColumnsFromCSVStream("http://example.com/empty-csv");
276+
expect(columns).to.deep.equal([]);
277+
});
278+
279+
it("should throw exception on HTTP 404 error", async () => {
280+
nock("http://example.com")
281+
.get("/not-found")
282+
.reply(404, "Not Found");
283+
284+
try {
285+
await getColumnsFromCSVStream("http://example.com/not-found");
286+
expect.fail("should throw exception");
287+
} catch (error: any) {
288+
expect(error.message).to.include("HTTP 404");
289+
}
290+
});
291+
292+
it("should throw exception on HTTP 500 error", async () => {
293+
nock("http://example.com")
294+
.get("/server-error")
295+
.reply(500, "Internal Server Error");
296+
297+
try {
298+
await getColumnsFromCSVStream("http://example.com/server-error");
299+
expect.fail("should throw exception");
300+
} catch (error: any) {
301+
expect(error.message).to.include("HTTP 500");
302+
}
303+
});
304+
305+
it("should throw exception on network error", async () => {
306+
nock("http://invalid-url-that-does-not-exist.com")
307+
.get("/test.csv")
308+
.replyWithError("Network Error");
309+
310+
try {
311+
await getColumnsFromCSVStream("http://invalid-url-that-does-not-exist.com/test.csv");
312+
expect.fail("should throw exception");
313+
} catch (error: any) {
314+
expect(error).to.be.instanceOf(Error);
315+
}
316+
});
317+
318+
it("should handle CSV file with BOM", async () => {
319+
const bom = Buffer.from([0xEF, 0xBB, 0xBF]);
320+
const csvContent = "column1,column2,column3\nvalue1,value2,value3";
321+
const bomCsv = Buffer.concat([bom, Buffer.from(csvContent)]);
322+
323+
nock("http://example.com")
324+
.get("/bom-csv")
325+
.reply(200, bomCsv, {
326+
"Content-Type": "text/csv"
327+
});
328+
329+
const columns = await getColumnsFromCSVStream("http://example.com/bom-csv");
330+
expect(columns).to.deep.equal(["column1", "column2", "column3"]);
331+
});
332+
333+
it("should handle CSV file with empty columns", async () => {
334+
nock("http://example.com")
335+
.get("/messy-csv")
336+
.reply(200, "col1, ,col3,\n\nval1,val2,val3,val4", {
337+
"Content-Type": "text/csv"
338+
});
339+
340+
const columns = await getColumnsFromCSVStream("http://example.com/messy-csv");
341+
expect(columns).to.deep.equal(["col1", "col3"]);
342+
});
343+
344+
it("should timeout and abort correctly", async () => {
345+
nock("http://example.com")
346+
.get("/slow-csv")
347+
.delay(3000)
348+
.reply(200, "column1,column2,column3\nvalue1,value2,value3", {
349+
"Content-Type": "text/csv"
350+
});
244351

352+
try {
353+
await getColumnsFromCSVStream("http://example.com/slow-csv", 1000);
354+
expect.fail("should throw timeout exception");
355+
} catch (error: any) {
356+
expect(error).to.be.instanceOf(Error);
357+
}
245358
});
246359
});

tsconfig.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
{
22
"compilerOptions": {
33
"target": "ES2020",
4-
"module": "esnext",
5-
"moduleResolution": "node",
4+
"module": "Node16",
5+
"moduleResolution": "node16",
66
"esModuleInterop": true,
77
"strict": true,
88
"outDir": "dist",

0 commit comments

Comments
 (0)