-
Notifications
You must be signed in to change notification settings - Fork 77
/
Copy pathcparser_util.py
152 lines (125 loc) · 4 KB
/
cparser_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# -*- coding: utf-8 -*-
"""
Python utility functions that wrap the C parser.
"""
import io
from typing import Any
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
from .cparser import Error as ParserError
from .cparser import Parser
from .dialect import SimpleDialect
from .exceptions import Error
# Module-wide maximum field size handed to the C parser (128 KiB by default).
_FIELD_SIZE_LIMIT: int = 128 * 1024


def field_size_limit(*args: Any, **kwargs: Any) -> int:
    """Query or replace the module-wide field size limit.

    Called without arguments, the current limit is returned unchanged. Called
    with exactly one argument (positional or keyword), that value becomes the
    new limit and the previous limit is returned. The interface is modelled
    on the function of the same name in the Python CSV module.

    Returns
    -------
    int
        The limit that was in effect before the call.

    Raises
    ------
    TypeError
        If more than one argument is supplied, or if the supplied value is
        not an integer.

    """
    global _FIELD_SIZE_LIMIT

    previous = _FIELD_SIZE_LIMIT

    # Positional and keyword values are treated alike; keyword names are
    # not inspected.
    supplied = [*args, *kwargs.values()]
    count = len(supplied)
    if count > 1:
        raise TypeError(
            "field_size_limit expected at most 1 arguments, got %i" % count
        )

    # Pure query: nothing to update.
    if not supplied:
        return previous

    (candidate,) = supplied
    if not isinstance(candidate, int):
        raise TypeError("limit must be an integer")

    # int() normalises int subclasses (e.g. bool) to a plain int.
    _FIELD_SIZE_LIMIT = int(candidate)
    return previous
def _parse_data(
    data: Iterable[str],
    delimiter: str,
    quotechar: str,
    escapechar: str,
    strict: bool,
    return_quoted: bool = False,
) -> Iterator[Union[List[str], List[Tuple[str, bool]]]]:
    """Yield the rows that the C parser produces for ``data``.

    This is a generator: the ``Parser`` object is only constructed once
    iteration starts. The field limit passed to the parser is the value
    returned by ``field_size_limit()`` at that moment.

    Parameters
    ----------
    data : iterable
        The data of the CSV file as an iterable of strings.
    delimiter : str
        The delimiter passed to the parser.
    quotechar : str
        The quote character passed to the parser.
    escapechar : str
        The escape character passed to the parser.
    strict : bool
        The strict-mode flag passed to the parser.
    return_quoted : bool
        Passed through to the parser's ``return_quoted`` option.

    Yields
    ------
    row : list
        Each row exactly as produced by the parser.

    Raises
    ------
    Error : clevercsv.exceptions.Error
        When the C parser raises its own error type. The message is
        preserved and the original exception is attached as ``__cause__``.

    """
    parser = Parser(
        data,
        delimiter=delimiter,
        quotechar=quotechar,
        escapechar=escapechar,
        field_limit=field_size_limit(),
        strict=strict,
        return_quoted=return_quoted,
    )
    try:
        for row in parser:
            yield row
    except ParserError as e:
        # Translate the C-level error into the package's public exception,
        # chaining explicitly so the original error stays available.
        raise Error(str(e)) from e
def parse_data(
    data: Iterable[str],
    dialect: Optional[SimpleDialect] = None,
    delimiter: Optional[str] = None,
    quotechar: Optional[str] = None,
    escapechar: Optional[str] = None,
    strict: Optional[bool] = None,
    return_quoted: bool = False,
) -> Iterator[Union[List[str], List[Tuple[str, bool]]]]:
    """Parse CSV data with the C parser according to a dialect.

    Each explicitly supplied component (``delimiter``, ``quotechar``,
    ``escapechar``, ``strict``) takes precedence over the corresponding
    attribute of ``dialect``.

    Parameters
    ----------
    data : iterable
        The contents of the CSV file as an iterable.

    dialect : SimpleDialect
        Dialect that provides the defaults for parsing. When None, a dialect
        whose delimiter, quote character, and escape character are all the
        empty string is used.

    delimiter : str
        Delimiter to use instead of the dialect's delimiter, unless None.

    quotechar : str
        Quote character to use instead of the dialect's quote character,
        unless None.

    escapechar : str
        Escape character to use instead of the dialect's escape character,
        unless None.

    strict : bool
        Strict-mode setting to use instead of the dialect's setting, unless
        None.

    return_quoted : bool
        When enabled, every cell is returned as a tuple "(field, is_quoted)"
        whose second element tells whether the cell was quoted.

    Yields
    ------
    rows : list
        The rows of the file, each a list of cells.

    Raises
    ------
    Error : clevercsv.exceptions.Error
        When an error occurs during parsing.

    """
    base = SimpleDialect("", "", "") if dialect is None else dialect

    # An explicit argument wins; the dialect attribute is only consulted
    # when the argument was left as None.
    effective_delimiter = base.delimiter if delimiter is None else delimiter
    effective_quotechar = base.quotechar if quotechar is None else quotechar
    effective_escapechar = (
        base.escapechar if escapechar is None else escapechar
    )
    effective_strict = base.strict if strict is None else strict

    rows = _parse_data(
        data,
        delimiter=effective_delimiter,
        quotechar=effective_quotechar,
        escapechar=effective_escapechar,
        strict=effective_strict,
        return_quoted=return_quoted,
    )
    yield from rows
def parse_string(
    data: str,
    dialect: SimpleDialect,
    return_quoted: bool = False,
) -> Iterator[Union[List[str], List[Tuple[str, bool]]]]:
    """Parse a CSV file whose entire contents are held in one string.

    The string is wrapped in an in-memory text buffer and the buffer's line
    iterator is handed to ``parse_data`` together with ``dialect`` and
    ``return_quoted``.

    Parameters
    ----------
    data : str
        The complete CSV content.

    dialect : SimpleDialect
        The dialect to parse with.

    return_quoted : bool
        Forwarded to ``parse_data``.

    Returns
    -------
    rows : iterator
        The iterator returned by ``parse_data``.

    """
    # newline="" turns off newline translation in StringIO, so the original
    # line endings are passed on to the parser.
    line_source = iter(io.StringIO(data, newline=""))
    return parse_data(
        line_source,
        dialect=dialect,
        return_quoted=return_quoted,
    )