From 5fcb2af82c01199540ceb93bcfe74bb2418b0d3a Mon Sep 17 00:00:00 2001 From: litian han Date: Wed, 19 Nov 2025 19:29:03 +0800 Subject: [PATCH 1/2] Enable zstd compression format --- .gitignore | 4 +- Makefile | 6 +-- README.md | 2 + src/fastqreader.cpp | 121 +++++++++++++++++++++++++++++++++++++++++--- src/fastqreader.h | 7 +++ testdata/R1.fq.zst | Bin 0 -> 346 bytes testdata/R2.fq.zst | Bin 0 -> 415 bytes 7 files changed, 130 insertions(+), 10 deletions(-) create mode 100644 testdata/R1.fq.zst create mode 100644 testdata/R2.fq.zst diff --git a/.gitignore b/.gitignore index 79bdd61e..b6184e1a 100644 --- a/.gitignore +++ b/.gitignore @@ -39,4 +39,6 @@ fastp # Test Output *.json -*.html \ No newline at end of file +*.html + +out/ \ No newline at end of file diff --git a/Makefile b/Makefile index 3896861a..98689a25 100644 --- a/Makefile +++ b/Makefile @@ -4,8 +4,8 @@ DIR_OBJ := ./obj PREFIX ?= /usr/local BINDIR ?= $(PREFIX)/bin -INCLUDE_DIRS ?= -LIBRARY_DIRS ?= +INCLUDE_DIRS ?= /home/zhangkaiLab/hanlitian/micromamba/envs/py311/include +LIBRARY_DIRS ?= /home/zhangkaiLab/hanlitian/micromamba/envs/py311/lib SRC := $(wildcard ${DIR_SRC}/*.cpp) OBJ := $(patsubst %.cpp,${DIR_OBJ}/%.o,$(notdir ${SRC})) @@ -16,7 +16,7 @@ BIN_TARGET := ${TARGET} CXX ?= g++ CXXFLAGS := -std=c++11 -pthread -g -O3 -MD -MP -I${DIR_INC} $(foreach includedir,$(INCLUDE_DIRS),-I$(includedir)) ${CXXFLAGS} -LIBS := -lisal -ldeflate -lpthread +LIBS := -lisal -ldeflate -lzstd -lpthread STATIC_FLAGS := -static -Wl,--no-as-needed -pthread LD_FLAGS := $(foreach librarydir,$(LIBRARY_DIRS),-L$(librarydir)) $(LIBS) $(LD_FLAGS) STATIC_LD_FLAGS := $(foreach librarydir,$(LIBRARY_DIRS),-L$(librarydir)) $(STATIC_FLAGS) $(LIBS) $(STATIC_LD_FLAGS) diff --git a/README.md b/README.md index 39bc6820..ce2e04b2 100644 --- a/README.md +++ b/README.md @@ -73,6 +73,7 @@ fastp supports batch processing of multiple FASTQ files in a folder, see - [batc 11. support long reads (data from PacBio / Nanopore devices). 12. support reading from STDIN and writing to STDOUT 13. support interleaved input +14. support reading Zstandard-compressed FASTQ/FASTA files (`.zst` / `.zstd`) 14. support ultra-fast FASTQ-level deduplication 15. ... @@ -158,6 +159,7 @@ sudo make install * for PE data, you should also specify read2 input by `-I` or `--in2`, and specify read2 output by `-O` or `--out2`. * if you don't specify the output file names, no output files will be written, but the QC will still be done for both data before and after filtering. * the output will be gzip-compressed if its file name ends with `.gz` +* the input can be gzip-compressed (`.gz`) or Zstandard-compressed (`.zst`, `.zstd`); compression is auto-detected from the extension ## output to STDOUT `fastp` supports streaming the passing-filter reads to STDOUT, so that it can be passed to other compressors like `bzip2`, or be passed to aligners like `bwa` and `bowtie2`. * specify `--stdout` to enable this mode to stream output to STDOUT diff --git a/src/fastqreader.cpp b/src/fastqreader.cpp index 142be7fb..9af55761 100644 --- a/src/fastqreader.cpp +++ b/src/fastqreader.cpp @@ -50,6 +50,13 @@ FastqReader::FastqReader(string filename, bool hasQuality, bool phred64){ mHasNoLineBreakAtEnd = false; mGzipInputUsedBytes = 0; mReadPool = NULL; + mUseZstd = false; + mZstdFinished = false; + mZstdStream = NULL; + mZstdInput.src = NULL; + mZstdInput.size = 0; + mZstdInput.pos = 0; + mZstdInputUsedBytes = 0; init(); } @@ -57,6 +64,10 @@ FastqReader::~FastqReader(){ close(); delete[] mFastqBuf; delete[] mGzipInputBuffer; + if(mZstdStream){ + ZSTD_freeDStream(mZstdStream); + mZstdStream = NULL; + } } bool FastqReader::hasNoLineBreakAtEnd() { @@ -69,11 +80,13 @@ void FastqReader::setReadPool(ReadPool* rp) { bool FastqReader::bufferFinished() { - if(mZipped) { - return eof() && mGzipState.avail_in == 0; - } else { + if(!mZipped) return eof(); - } + + if(mUseZstd) + return mZstdFinished && mZstdInput.pos == mZstdInput.size; + + return eof() && mGzipState.avail_in == 0; } void FastqReader::readToBufIgzip(){ @@ -139,10 +152,67 @@ void FastqReader::readToBufIgzip(){ } } +void FastqReader::readToBufZstd(){ + mBufDataLen = 0; + if(mZstdFinished) + return; + + ZSTD_outBuffer outBuf; + outBuf.dst = mFastqBuf; + outBuf.size = mGzipOutputBufferSize; + outBuf.pos = 0; + + while(outBuf.pos == 0){ + if(mZstdInput.pos == mZstdInput.size){ + size_t readBytes = fread(mGzipInputBuffer, 1, mGzipInputBufferSize, mFile); + if(readBytes == 0){ + if(eof()){ + mZstdFinished = true; + break; + } else { + error_exit("zstd: read error on file: " + mFilename); + } + } + mZstdInput.src = mGzipInputBuffer; + mZstdInput.size = readBytes; + mZstdInput.pos = 0; + mZstdInputUsedBytes += readBytes; + } + + size_t ret = ZSTD_decompressStream(mZstdStream, &outBuf, &mZstdInput); + if(ZSTD_isError(ret)){ + error_exit("zstd: decompression error for file: " + mFilename + ", error: " + string(ZSTD_getErrorName(ret))); + } + + if(ret == 0){ + if(mZstdInput.pos < mZstdInput.size || !eof()){ + size_t resetRet = ZSTD_initDStream(mZstdStream); + if(ZSTD_isError(resetRet)){ + error_exit("zstd: failed to reset stream for file: " + mFilename + ", error: " + string(ZSTD_getErrorName(resetRet))); + } + } else { + mZstdFinished = true; + } + } + + if(eof() && mZstdInput.pos == mZstdInput.size && ret != 0){ + error_exit("zstd: unexpected eof found in file: " + mFilename); + } + + if(mZstdFinished || outBuf.pos > 0) + break; + } + + mBufDataLen = outBuf.pos; +} + void FastqReader::readToBuf() { mBufDataLen = 0; if(mZipped) { - readToBufIgzip(); + if(mUseZstd) + readToBufZstd(); + else + readToBufIgzip(); } else { if(!eof()) mBufDataLen = fread(mFastqBuf, 1, FQ_BUF_SIZE, mFile); @@ -173,6 +243,26 @@ void FastqReader::init(){ } mZipped = true; } + else if (ends_with(mFilename, ".zst") || ends_with(mFilename, ".zstd")){ + mFile = fopen(mFilename.c_str(), "rb"); + if(mFile == NULL) { + error_exit("Failed to open file: " + mFilename); + } + mZstdStream = ZSTD_createDStream(); + if(mZstdStream == NULL) { + error_exit("zstd: failed to allocate decompressor for file: " + mFilename); + } + size_t ret = ZSTD_initDStream(mZstdStream); + if(ZSTD_isError(ret)){ + error_exit("zstd: failed to init decompressor for file: " + mFilename + ", error: " + string(ZSTD_getErrorName(ret))); + } + mZipped = true; + mUseZstd = true; + mZstdFinished = false; + mZstdInput.src = mGzipInputBuffer; + mZstdInput.size = 0; + mZstdInput.pos = 0; + } else { if(mFilename == "/dev/stdin") { mFile = stdin; @@ -189,7 +279,10 @@ void FastqReader::init(){ void FastqReader::getBytes(size_t& bytesRead, size_t& bytesTotal) { if(mZipped) { - bytesRead = mGzipInputUsedBytes - mGzipState.avail_in; + if(mUseZstd) + bytesRead = mZstdInputUsedBytes - (mZstdInput.size - mZstdInput.pos); + else + bytesRead = mGzipInputUsedBytes - mGzipState.avail_in; } else { bytesRead = ftell(mFile);//mFile.tellg(); } @@ -362,6 +455,22 @@ bool FastqReader::isZipFastq(string filename) { return true; else if (ends_with(filename, ".fa.gz")) return true; + else if (ends_with(filename, ".fastq.zst")) + return true; + else if (ends_with(filename, ".fq.zst")) + return true; + else if (ends_with(filename, ".fastq.zstd")) + return true; + else if (ends_with(filename, ".fq.zstd")) + return true; + else if (ends_with(filename, ".fasta.zst")) + return true; + else if (ends_with(filename, ".fa.zst")) + return true; + else if (ends_with(filename, ".fasta.zstd")) + return true; + else if (ends_with(filename, ".fa.zstd")) + return true; else return false; } diff --git a/src/fastqreader.h b/src/fastqreader.h index fe65dca3..866430f2 100644 --- a/src/fastqreader.h +++ b/src/fastqreader.h @@ -32,6 +32,7 @@ SOFTWARE. #include #include #include "igzip_lib.h" +#include #include "readpool.h" class FastqReader{ @@ -61,6 +62,7 @@ class FastqReader{ void clearLineBreaks(char* line); void readToBuf(); void readToBufIgzip(); + void readToBufZstd(); bool bufferFinished(); private: @@ -83,6 +85,11 @@ class FastqReader{ bool mHasQuality; bool mPhred64; ReadPool* mReadPool; + bool mUseZstd; + bool mZstdFinished; + ZSTD_DStream* mZstdStream; + ZSTD_inBuffer mZstdInput; + size_t mZstdInputUsedBytes; }; diff --git a/testdata/R1.fq.zst b/testdata/R1.fq.zst new file mode 100644 index 0000000000000000000000000000000000000000..7f92096ce4f40b0e6a0e04299b716001a30f95fe GIT binary patch literal 346 zcmV-g0j2&ZwJ-f-;RNa^mY3D2fCj2SSuPEY00O7#anDL69(rq9}~QXkeH@LW05pf-ztWfg*uG5G+^_ z+pelD>9#8Msnp81ma6WXZd=n`)w}K`)qOoq>sL-mzAe3^q`9ZAtK@TgZEj91zMDgfK6 zTHF0n^7mi0bt|WO9a_l08NLI7Y{v%8*G&W)hPqj)|EWqADT)p*8nbx3uIW{i^=ko^DE8 zD(^JiRPVl~d`|gxQuvOLzgs*?VFboifk|9&dl$p^|NU@YNqhyL&h%?9P zQ>m42E!}k6nmU)%w@uxeR%)xt4TNeLi^OtR#wldxQlRlbrkFr15D`*PP?&TuN Date: Wed, 19 Nov 2025 20:01:12 +0800 Subject: [PATCH 2/2] minor --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 98689a25..ecfe7bd9 100644 --- a/Makefile +++ b/Makefile @@ -4,8 +4,8 @@ DIR_OBJ := ./obj PREFIX ?= /usr/local BINDIR ?= $(PREFIX)/bin -INCLUDE_DIRS ?= /home/zhangkaiLab/hanlitian/micromamba/envs/py311/include -LIBRARY_DIRS ?= /home/zhangkaiLab/hanlitian/micromamba/envs/py311/lib +INCLUDE_DIRS ?= +LIBRARY_DIRS ?= SRC := $(wildcard ${DIR_SRC}/*.cpp) OBJ := $(patsubst %.cpp,${DIR_OBJ}/%.o,$(notdir ${SRC}))