@@ -13,6 +13,7 @@ import { getNextPowerOfTwo } from '../../common/Util';
1313import ScanningRequest from '../../common/model/ScanningRequest' ;
1414import winston from 'winston' ;
1515import GenerationRequest from '../../common/model/GenerationRequest' ;
16+ import stream from 'stream' ;
1617
1718interface GenerateIpldCarOutput {
1819 DataCid :string
@@ -37,33 +38,36 @@ export async function generateDag (logger: winston.Logger, found: ScanningReques
3738 encoding : 'utf8' ,
3839 maxBuffer : config . getOrDefault ( 'deal_preparation_worker.max_buffer' , 1024 * 1024 * 1024 )
3940 } ) ;
40- // Start streaming all the files to generate-ipld-car
41- for ( const generationRequest of await Datastore . GenerationRequestModel . find (
42- { datasetId : found . id , status : 'completed' } ,
43- null ,
44- { sort : { index : 1 } } ) ) {
45- const generationId = generationRequest . id ;
46- for ( const outputFileList of await Datastore . OutputFileListModel . find (
47- { generationId } ,
41+ const stdinGenerator = async function * ( ) {
42+ // Start streaming all the files to generate-ipld-car
43+ for ( const generationRequest of await Datastore . GenerationRequestModel . find (
44+ { datasetId : found . id , status : 'completed' } ,
4845 null ,
4946 { sort : { index : 1 } } ) ) {
50- for ( const fileInfo of outputFileList . generatedFileList ) {
51- if ( fileInfo . dir ) {
52- continue ;
47+ const generationId = generationRequest . id ;
48+ for ( const outputFileList of await Datastore . OutputFileListModel . find (
49+ { generationId } ,
50+ null ,
51+ { sort : { index : 1 } } ) ) {
52+ for ( const fileInfo of outputFileList . generatedFileList ) {
53+ if ( fileInfo . dir ) {
54+ continue ;
55+ }
56+ const row = {
57+ Path : fileInfo . path ,
58+ Size : fileInfo . size ,
59+ Start : fileInfo . start || 0 ,
60+ End : fileInfo . end || fileInfo . size ,
61+ Cid : fileInfo . cid
62+ } ;
63+ const rowString = JSON . stringify ( row ) + '\n' ;
64+ yield rowString ;
5365 }
54- const row = {
55- Path : fileInfo . path ,
56- Size : fileInfo . size ,
57- Start : fileInfo . start || 0 ,
58- End : fileInfo . end || fileInfo . size ,
59- Cid : fileInfo . cid
60- } ;
61- const rowString = JSON . stringify ( row ) + '\n' ;
62- child . stdin ! . write ( rowString ) ;
6366 }
6467 }
65- }
66- child . stdin ! . end ( ) ;
68+ } ;
69+ const stdinReadable = stream . Readable . from ( stdinGenerator ( ) , { encoding : 'utf8' } ) ;
70+ stdinReadable . pipe ( child . stdin ! ) ;
6771
6872 let output : Output ;
6973 try {
0 commit comments