-
Notifications
You must be signed in to change notification settings - Fork 25
Preprocessor for ULP/RTC macros #43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
79db90f
84d734d
ec81ecc
c184924
25d34b0
76a81ac
56f4530
9907b10
27ab850
54b117e
feb42dc
87507c9
99352a3
d76fd26
4dded94
219f939
5c3eeb8
3e8c0d5
ac1de99
8d88fd1
46f1442
2f6ee78
69ae946
4f90f76
d44384f
254adf9
c3bd101
2a0a39a
47d5e8a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
import os | ||
import btree | ||
from .util import file_exists | ||
|
||
DBNAME = 'defines.db' | ||
|
||
|
||
class DefinesDB: | ||
def __init__(self): | ||
self._file = None | ||
self._db = None | ||
self._db_exists = None | ||
|
||
def clear(self): | ||
self.close() | ||
try: | ||
os.remove(DBNAME) | ||
self._db_exists = False | ||
except OSError: | ||
pass | ||
|
||
def open(self): | ||
if self._db: | ||
return | ||
try: | ||
self._file = open(DBNAME, 'r+b') | ||
except OSError: | ||
self._file = open(DBNAME, 'w+b') | ||
self._db = btree.open(self._file) | ||
self._db_exists = True | ||
|
||
def close(self): | ||
if not self._db: | ||
return | ||
self._db.close() | ||
self._db = None | ||
self._file.close() | ||
self._file = None | ||
|
||
def db_exists(self): | ||
if self._db_exists is None: | ||
self._db_exists = file_exists(DBNAME) | ||
return self._db_exists | ||
|
||
def update(self, dictionary): | ||
for k, v in dictionary.items(): | ||
self.__setitem__(k, v) | ||
|
||
def get(self, key, default): | ||
try: | ||
result = self.__getitem__(key) | ||
except KeyError: | ||
result = default | ||
return result | ||
|
||
def keys(self): | ||
if not self.db_exists(): | ||
return [] | ||
|
||
self.open() | ||
return [k.decode() for k in self._db.keys()] | ||
|
||
def __getitem__(self, key): | ||
if not self.db_exists(): | ||
raise KeyError | ||
|
||
self.open() | ||
return self._db[key.encode()].decode() | ||
|
||
def __setitem__(self, key, value): | ||
self.open() | ||
self._db[key.encode()] = str(value).encode() | ||
|
||
def __iter__(self): | ||
return iter(self.keys()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
from . import nocomment | ||
from .util import split_tokens | ||
from .definesdb import DefinesDB | ||
|
||
|
||
class RTC_Macros: | ||
|
@@ -56,6 +57,7 @@ def parse_define_line(self, line): | |
def parse_defines(self, content): | ||
for line in content.splitlines(): | ||
wnienhaus marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self._defines.update(self.parse_define_line(line)) | ||
|
||
return self._defines | ||
|
||
def expand_defines(self, line): | ||
|
@@ -66,21 +68,22 @@ def expand_defines(self, line): | |
line = "" | ||
for t in tokens: | ||
lu = self._defines.get(t, t) | ||
if lu == t and self._defines_db: | ||
lu = self._defines_db.get(t, t) | ||
if lu != t: | ||
found = True | ||
line += lu | ||
|
||
return line | ||
|
||
def process_include_file(self, filename): | ||
defines = self._defines | ||
|
||
with open(filename, 'r') as f: | ||
for line in f: | ||
result = self.parse_defines(line) | ||
defines.update(result) | ||
with self.open_db() as db: | ||
with open(filename, 'r') as f: | ||
for line in f: | ||
result = self.parse_define_line(line) | ||
db.update(result) | ||
|
||
return defines | ||
return db | ||
|
||
def expand_rtc_macros(self, line): | ||
clean_line = line.strip() | ||
|
@@ -103,17 +106,43 @@ def expand_rtc_macros(self, line): | |
|
||
return macro_fn(*macro_args) | ||
|
||
def use_db(self, defines_db): | ||
self._defines_db = defines_db | ||
|
||
def open_db(self): | ||
class ctx: | ||
def __init__(self, db): | ||
self._db = db | ||
|
||
def __enter__(self): | ||
# not opening DefinesDB - it opens itself when needed | ||
return self._db | ||
|
||
def __exit__(self, type, value, traceback): | ||
if isinstance(self._db, DefinesDB): | ||
self._db.close() | ||
|
||
if self._defines_db: | ||
return ctx(self._defines_db) | ||
|
||
return ctx(self._defines) | ||
Comment on lines
+121
to
+136
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. did you try having the contextmanager inside DefinesDB class? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if it works, also call the context manager from the tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have tried to put the context manager into the DefinesDB class, but because it handles both the "with DefinesDB" and the "without a DefinesDB" case, this doesn't fit into the DefinesDB class. I could find an elegant way to add a condition to the "with" statement - ie. only use the context manager from DefinesDB when we're actually using the DefinesDB, otherwise work with the class-internal dict. I'll mull over this a bit more and see if I can find a way. Regarding testing: in effect this was tested - because the existing tests exercise the code which uses the context manager, but I added another test now to check specifically that the db is actually closed too. |
||
|
||
def preprocess(self, content): | ||
self.parse_defines(content) | ||
lines = nocomment.remove_comments(content) | ||
result = [] | ||
for line in lines: | ||
line = self.expand_defines(line) | ||
line = self.expand_rtc_macros(line) | ||
result.append(line) | ||
result = "\n".join(result) | ||
|
||
with self.open_db(): | ||
lines = nocomment.remove_comments(content) | ||
result = [] | ||
for line in lines: | ||
line = self.expand_defines(line) | ||
line = self.expand_rtc_macros(line) | ||
result.append(line) | ||
result = "\n".join(result) | ||
|
||
return result | ||
|
||
|
||
def preprocess(content): | ||
return Preprocessor().preprocess(content) | ||
def preprocess(content, use_defines_db=True): | ||
preprocessor = Preprocessor() | ||
preprocessor.use_db(DefinesDB()) | ||
return preprocessor.preprocess(content) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import os | ||
|
||
from esp32_ulp.definesdb import DefinesDB, DBNAME | ||
from esp32_ulp.util import file_exists | ||
|
||
tests = [] | ||
|
||
|
||
def test(param): | ||
tests.append(param) | ||
|
||
|
||
@test | ||
def test_definesdb_clear_removes_all_keys(): | ||
db = DefinesDB() | ||
db.open() | ||
db.update({'KEY1': 'VALUE1'}) | ||
|
||
db.clear() | ||
|
||
assert 'KEY1' not in db | ||
|
||
db.close() | ||
|
||
|
||
@test | ||
def test_definesdb_persists_data_across_instantiations(): | ||
db = DefinesDB() | ||
db.open() | ||
db.clear() | ||
|
||
db.update({'KEY1': 'VALUE1'}) | ||
|
||
assert 'KEY1' in db | ||
|
||
db.close() | ||
del db | ||
db = DefinesDB() | ||
db.open() | ||
|
||
assert db.get('KEY1', None) == 'VALUE1' | ||
|
||
db.close() | ||
|
||
|
||
@test | ||
def test_definesdb_should_not_create_a_db_file_when_only_reading(): | ||
db = DefinesDB() | ||
|
||
db.clear() | ||
assert not file_exists(DBNAME) | ||
|
||
assert db.get('some-key', None) is None | ||
assert not file_exists(DBNAME) | ||
|
||
|
||
if __name__ == '__main__': | ||
# run all methods marked with @test | ||
for t in tests: | ||
t() |
Uh oh!
There was an error while loading. Please reload this page.