Skip to content

Commit 8f1c3b4

Browse files
authored
Merge pull request #8953 from romayalon/romy-add-total-lines-bytes-offset-to-file-reader
File Reader | Add next_line_file_offset
2 parents 1ccdd48 + 2116887 commit 8f1c3b4

File tree

2 files changed

+94
-7
lines changed

2 files changed

+94
-7
lines changed

src/test/unit_tests/jest_tests/test_newline_reader.test.js

+83-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,26 @@ describe('newline_reader', () => {
5151
expect(result).toStrictEqual(UTF8DATA_ARR);
5252
});
5353

54+
it('read_file_offset - can process utf8 characters when termination with newline character', async () => {
55+
const UTF8DATA_BUF = Buffer.from(UTF8DATA_ARR.join('\n') + '\n', 'utf8');
56+
57+
const reader = new NewlineReader({}, '', { skip_leftover_line: true, read_file_offset: 0 });
58+
// @ts-ignore
59+
reader.fh = mocked_file_handler(UTF8DATA_BUF);
60+
61+
const result = [];
62+
let expected_cur_next_line_file_offset = 0;
63+
const [processed] = await reader.forEach(async entry => {
64+
result.push(entry);
65+
expected_cur_next_line_file_offset += Buffer.byteLength(entry, 'utf8') + 1;
66+
expect(reader.next_line_file_offset).toBe(expected_cur_next_line_file_offset);
67+
return true;
68+
});
69+
70+
expect(processed).toBe(UTF8DATA_ARR.length);
71+
expect(result).toStrictEqual(UTF8DATA_ARR);
72+
});
73+
5474
it('can process utf8 characters when termination not with new line character', async () => {
5575
const UTF8DATA_BUF = Buffer.from(UTF8DATA_ARR.join('\n'), 'utf8');
5676

@@ -68,7 +88,48 @@ describe('newline_reader', () => {
6888
expect(result).toStrictEqual(UTF8DATA_ARR);
6989
});
7090

71-
it('can process utf8 characters when termination not with new line character [bufsize = 4]', async () => {
91+
it('read_file_offset - can process utf8 characters when termination not with new line character', async () => {
92+
const UTF8DATA_BUF = Buffer.from(UTF8DATA_ARR.join('\n'), 'utf8');
93+
94+
const reader = new NewlineReader({}, '', { read_file_offset: 0 });
95+
// @ts-ignore
96+
reader.fh = mocked_file_handler(UTF8DATA_BUF);
97+
98+
const result = [];
99+
let expected_cur_next_line_file_offset = 0;
100+
const [processed] = await reader.forEach(async entry => {
101+
result.push(entry);
102+
expected_cur_next_line_file_offset += Buffer.byteLength(entry, 'utf8') + (reader.eof ? 0 : 1);
103+
expect(reader.next_line_file_offset).toBe(expected_cur_next_line_file_offset);
104+
return true;
105+
});
106+
107+
expect(processed).toBe(UTF8DATA_ARR.length);
108+
expect(result).toStrictEqual(UTF8DATA_ARR);
109+
});
110+
111+
it('read_file_offset starts from the second line - can process utf8 characters when termination not with new line character', async () => {
112+
const UTF8DATA_BUF = Buffer.from(UTF8DATA_ARR.join('\n'), 'utf8');
113+
const expected_to_be_processed_data_array = UTF8DATA_ARR.slice(1);
114+
const initial_next_line_file_offset = Buffer.byteLength(UTF8DATA_ARR[0], 'utf8') + 1;
115+
const reader = new NewlineReader({}, '', { read_file_offset: initial_next_line_file_offset});
116+
// @ts-ignore
117+
reader.fh = mocked_file_handler(UTF8DATA_BUF);
118+
119+
const result = [];
120+
let expected_cur_next_line_file_offset = initial_next_line_file_offset;
121+
const [processed] = await reader.forEach(async entry => {
122+
result.push(entry);
123+
expected_cur_next_line_file_offset += Buffer.byteLength(entry, 'utf8') + (reader.eof ? 0 : 1);
124+
expect(reader.next_line_file_offset).toBe(expected_cur_next_line_file_offset);
125+
return true;
126+
});
127+
128+
expect(processed).toBe(expected_to_be_processed_data_array.length);
129+
expect(result).toStrictEqual(expected_to_be_processed_data_array);
130+
});
131+
132+
it('can process utf8 characters when termination not with new line character [bufsize = 256]', async () => {
72133
const expected = "abc";
73134
const UTF8DATA_ARR_TEMP = [ ...UTF8DATA_ARR, expected ];
74135
const UTF8DATA_BUF = Buffer.from(UTF8DATA_ARR_TEMP.join('\n'), 'utf8');
@@ -86,5 +147,26 @@ describe('newline_reader', () => {
86147
expect(processed).toBe(1);
87148
expect(result).toStrictEqual([expected]);
88149
});
150+
151+
it('read_file_offset - can process utf8 characters when termination not with new line character [bufsize = 256]', async () => {
152+
const expected = "abc";
153+
const UTF8DATA_ARR_TEMP = [ ...UTF8DATA_ARR, expected ];
154+
const UTF8DATA_BUF = Buffer.from(UTF8DATA_ARR_TEMP.join('\n'), 'utf8');
155+
156+
const reader = new NewlineReader({}, '', { bufsize: 256, skip_overflow_lines: true, read_file_offset: 0 });
157+
// @ts-ignore
158+
reader.fh = mocked_file_handler(UTF8DATA_BUF);
159+
160+
const result = [];
161+
const [processed] = await reader.forEach(async entry => {
162+
result.push(entry);
163+
return true;
164+
});
165+
166+
expect(processed).toBe(1);
167+
expect(result).toStrictEqual([expected]);
168+
const expected_cur_next_line_file_offset = UTF8DATA_BUF.length;
169+
expect(reader.next_line_file_offset).toBe(expected_cur_next_line_file_offset);
170+
});
89171
});
90172
});

src/util/file_reader.js

+11-6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class NewlineReader {
3030
* bufsize?: number;
3131
* skip_leftover_line?: boolean;
3232
* skip_overflow_lines?: boolean;
33+
* read_file_offset?: number;
3334
* }} [cfg]
3435
**/
3536
constructor(fs_context, filepath, cfg) {
@@ -41,18 +42,19 @@ class NewlineReader {
4142
this.fs_context = fs_context;
4243
this.fh = null;
4344
this.eof = false;
44-
this.readoffset = 0;
45+
this.read_file_offset = cfg?.read_file_offset || 0;
4546

4647
this.buf = Buffer.alloc(cfg?.bufsize || 64 * 1024);
4748
this.start = 0;
4849
this.end = 0;
4950
this.overflow_state = false;
51+
this.next_line_file_offset = cfg?.read_file_offset || 0;
5052
}
5153

5254
info() {
5355
return {
5456
path: this.path,
55-
read_offset: this.readoffset,
57+
read_offset: this.read_file_offset,
5658
overflow_state: this.overflow_state,
5759
start: this.start,
5860
end: this.end,
@@ -67,6 +69,7 @@ class NewlineReader {
6769
async nextline() {
6870
if (!this.fh) await this.init();
6971

72+
// TODO - in case more data will be appended to the file - after each read the reader must set reader.eof = false if someone will keep on reading from a file while it is being written.
7073
while (!this.eof) {
7174
// extract next line if terminated in current buffer
7275
if (this.start < this.end) {
@@ -78,9 +81,9 @@ class NewlineReader {
7881
this.start += term_idx + 1;
7982
continue;
8083
}
81-
8284
const line = this.buf.toString('utf8', this.start, this.start + term_idx);
8385
this.start += term_idx + 1;
86+
this.next_line_file_offset = this.read_file_offset - (this.end - this.start);
8487
return line;
8588
}
8689
}
@@ -106,7 +109,7 @@ class NewlineReader {
106109

107110
// read from file
108111
const avail = this.buf.length - this.end;
109-
const read = await this.fh.read(this.fs_context, this.buf, this.end, avail, this.readoffset);
112+
const read = await this.fh.read(this.fs_context, this.buf, this.end, avail, this.read_file_offset);
110113
if (!read) {
111114
this.eof = true;
112115

@@ -118,13 +121,15 @@ class NewlineReader {
118121
console.warn('line too long finally terminated at eof:', this.info());
119122
} else {
120123
const line = this.buf.toString('utf8', this.start, this.end);
124+
this.start = this.end;
125+
this.next_line_file_offset = this.read_file_offset;
121126
return line;
122127
}
123128
}
124129

125130
return null;
126131
}
127-
this.readoffset += read;
132+
this.read_file_offset += read;
128133
this.end += read;
129134
}
130135

@@ -169,7 +174,7 @@ class NewlineReader {
169174
// was moved, this will still keep on reading from the previous FD.
170175
reset() {
171176
this.eof = false;
172-
this.readoffset = 0;
177+
this.read_file_offset = 0;
173178
this.start = 0;
174179
this.end = 0;
175180
this.overflow_state = false;

0 commit comments

Comments
 (0)