DEFAULT_DUMMY_QUAL = "I"
DEFAULT_COLUMNS = {
    k: k
    for k in [
        "sequence_id",
        "sequence",
        "sequence_quality",
        "sequence_description",
    ]
}

# Mapping from (lowercase) file extensions to the file format names used here.
# Several extensions may share one format name.
FMT_EXT_MAP = {
    ".csv": "csv",
    ".tsv": "tsv",
    ".tab": "tsv",
    ".fa": "fa",
    ".fasta": "fa",
    ".afa": "fa",
    ".fq": "fq",
    ".fastq": "fq",
}

# Mapping from file format names to functions that take file handles and give
# format-specific record iterators.  The lambdas resolve csv/SeqIO only when
# called, so building this table does not touch either module.
READERS = {
    "csv": lambda hndl: csv.DictReader(hndl),
    # The csv module only accepts a single-character delimiter, so this must
    # be exactly one tab with no surrounding whitespace.
    "tsv": lambda hndl: csv.DictReader(hndl, delimiter="\t"),
    "fa": lambda hndl: SeqIO.parse(hndl, "fasta"),
    "fq": lambda hndl: SeqIO.parse(hndl, "fastq"),
}
38+
2039class RecordHandler :
2140 """Abstract class for shared generic record reading and writing
2241 functionality. This behaves as a context manager to handle file opening
@@ -63,28 +82,7 @@ def close(self):
6382
6483 def infer_fmt (self , fmt = None ):
6584 """Guess file format from our path, with optional default."""
66- try :
67- path = Path (self .pathlike )
68- except TypeError :
69- path = Path (str (self .pathlike .name ))
70- fmt_inferred = None
71- if path .suffix .lower () in [".csv" ]:
72- fmt_inferred = "csv"
73- elif path .suffix .lower () in [".tsv" , ".tab" ]:
74- fmt_inferred = "tsv"
75- elif path .suffix .lower () in [".fa" , ".fasta" ]:
76- fmt_inferred = "fa"
77- elif path .suffix .lower () in [".fq" , ".fastq" ]:
78- fmt_inferred = "fq"
79- elif path .suffix .lower () == ".gz" :
80- if Path (path .stem ).suffix .lower () in [".fa" , ".fasta" ]:
81- fmt_inferred = "fagz"
82- elif Path (path .stem ).suffix .lower () in [".fq" , ".fastq" ]:
83- fmt_inferred = "fqgz"
84- elif Path (path .stem ).suffix .lower () in [".csv" ]:
85- fmt_inferred = "csvgz"
86- elif Path (path .stem ).suffix .lower () in [".tsv" , ".tab" ]:
87- fmt_inferred = "tsvgz"
85+ fmt_inferred = self ._infer_fmt (self .pathlike )
8886 LOGGER .info ("given format: %s" , fmt )
8987 LOGGER .info ("inferred format: %s" , fmt_inferred )
9088 if not fmt :
@@ -95,6 +93,20 @@ def infer_fmt(self, fmt=None):
9593 fmt = fmt_inferred
9694 return fmt
9795
@staticmethod
def _infer_fmt(path):
    """Guess a file format name from a path or a named file object.

    Args:
        path: Anything accepted by ``pathlib.Path``, or an object whose
            ``name`` attribute holds the file name (used when ``Path``
            rejects the object itself with ``TypeError``).

    Returns:
        The format name from ``FMT_EXT_MAP`` for the final extension; that
        name with ``"gz"`` appended when the final extension is ``.gz`` and
        the extension before it is recognized; or ``None`` when nothing
        matches or no file name is available.
    """
    try:
        path = Path(path)
    except TypeError:
        # Not path-like: fall back to the object's file name.  Objects with
        # no name (e.g. in-memory buffers) simply have no inferable format,
        # which must not prevent callers from using an explicit format.
        name = getattr(path, "name", None)
        if name is None:
            return None
        path = Path(str(name))
    ext = path.suffix.lower()
    if ext == ".gz":
        # Look one extension deeper, e.g. "reads.fastq.gz" -> "fq" + "gz".
        inner_fmt = FMT_EXT_MAP.get(Path(path.stem).suffix.lower())
        if inner_fmt:
            return inner_fmt + "gz"
    return FMT_EXT_MAP.get(ext)
109+
98110 def encode_record (self , record ):
99111 """Convert record dictionary into a SeqRecord object."""
100112 seq = record [self .colmap ["sequence" ]]
@@ -160,14 +172,7 @@ def open(self):
160172 else :
161173 LOGGER .info ("reading from text" )
162174 self .handle = open (self .pathlike , "rt" , encoding = "ascii" )
163- if self .fmt in ["csv" , "csvgz" ]:
164- self .reader = csv .DictReader (self .handle )
165- elif self .fmt in ["tsv" , "tsvgz" ]:
166- self .reader = csv .DictReader (self .handle , delimiter = "\t " )
167- elif self .fmt in ["fa" , "fagz" ]:
168- self .reader = SeqIO .parse (self .handle , "fasta" )
169- elif self .fmt in ["fq" , "fqgz" ]:
170- self .reader = SeqIO .parse (self .handle , "fastq" )
175+ self .reader = READERS [self .fmt .removesuffix ("gz" )](self .handle )
171176
def __iter__ (self ):
    """Return this object itself as the iterator."""
    return self
0 commit comments