-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathperiph.py
328 lines (264 loc) · 11.2 KB
/
periph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
from nmigen import *
from nmigen import tracer
from .bus import *
__all__ = ["Peripheral", "IRQLine", "Event", "PeripheralBridge"]
class Peripheral:
    """Base class for peripherals that are controlled through CSR registers.

    The class collects the CSR registers and event sources requested by a concrete
    peripheral, and can hand them to a :class:`PeripheralBridge` which exposes them on a
    CSR bus and drives an interrupt request line towards the CPU.

    ``Peripheral`` is intended to be subclassed by actual peripherals rather than
    instantiated directly.

    Usage example
    -------------

    ```
    class ExamplePeripheral(csr.Peripheral, Elaboratable):
        def __init__(self):
            super().__init__()
            self._data   = self.csr(8, "w")
            self._rdy    = self.event(mode="rise")

            self._bridge = self.csr_bridge()
            self.csr_bus = self._bridge.bus
            self.irq     = self._bridge.irq

        def elaborate(self, platform):
            m = Module()
            m.submodules.bridge = self._bridge
            # ...
            return m
    ```

    Parameters
    ----------
    name : str
        Name of this peripheral. If ``None`` (default) or empty, the name is inferred from
        the variable name this peripheral is assigned to.
    src_loc_at : int
        Extra stack depth added to the frame lookup used for name inference. The default
        of 1 presumably skips the subclass ``__init__`` that calls ``super().__init__()``
        (TODO confirm against ``tracer.get_var_name``).

    Attributes
    ----------
    name : str
        Name of the peripheral.
    """

    def __init__(self, name=None, src_loc_at=1):
        if not (name is None or isinstance(name, str)):
            raise TypeError("Name must be a string, not {!r}".format(name))

        if name:
            self.name = name
        else:
            # The tracer counts stack frames, so this call has to stay in this method.
            self.name = tracer.get_var_name(depth=2 + src_loc_at)

        self._csr_regs = []
        self._csr_bus = None
        self._events = []
        self._irq = None

    @property
    def csr_bus(self):
        """CSR bus through which the registers of the peripheral are accessed.

        Return value
        ------------
        An instance of :class:`Interface`.

        Exceptions
        ----------
        Raises ``NotImplementedError`` if no CSR bus has been assigned to the peripheral.
        """
        bus = self._csr_bus
        if bus is not None:
            return bus
        raise NotImplementedError("Peripheral {!r} does not have a CSR bus interface"
                                  .format(self))

    @csr_bus.setter
    def csr_bus(self, csr_bus):
        # Only genuine CSR interfaces are accepted; anything else is rejected up front.
        if isinstance(csr_bus, Interface):
            self._csr_bus = csr_bus
            return
        raise TypeError("CSR bus interface must be an instance of csr.Interface, not {!r}"
                        .format(csr_bus))

    @property
    def irq(self):
        """Interrupt request line of the peripheral.

        Return value
        ------------
        An instance of :class:`IRQLine`.

        Exceptions
        ----------
        Raises ``NotImplementedError`` if no IRQ line has been assigned to the peripheral.
        """
        line = self._irq
        if line is not None:
            return line
        raise NotImplementedError("Peripheral {!r} does not have an IRQ line"
                                  .format(self))

    @irq.setter
    def irq(self, irq):
        if isinstance(irq, IRQLine):
            self._irq = irq
            return
        raise TypeError("IRQ line must be an instance of IRQLine, not {!r}"
                        .format(irq))

    def csr(self, width, access, *, addr=None, alignment=0, name=None, src_loc_at=0):
        """Request a CSR register.

        The register is recorded together with its address and alignment so that it can
        later be enumerated with :meth:`csr_registers`.

        Parameters
        ----------
        width : int
            Width of the register. See :class:`Element`.
        access : :class:`Access`
            Register access mode. See :class:`Element`.
        addr : int
            Address of the register. See :meth:`Multiplexer.add`.
        alignment : int
            Register alignment. See :class:`Multiplexer`.
        name : str
            Name of the register. If ``None`` (default) or empty, the name is inferred from
            the variable name this register is assigned to. The element itself is named
            ``"<peripheral name>_<register name>"``.
        src_loc_at : int
            Extra stack depth added to the frame lookup used for name inference.

        Return value
        ------------
        An instance of :class:`Element`.
        """
        if not (name is None or isinstance(name, str)):
            raise TypeError("Name must be a string, not {!r}".format(name))

        if name:
            reg_name = name
        else:
            # Frame-depth sensitive: keep this call directly inside ``csr``.
            reg_name = tracer.get_var_name(depth=2 + src_loc_at)

        full_name = "{}_{}".format(self.name, reg_name)
        reg = Element(width, access, name=full_name)
        self._csr_regs.append((reg, addr, alignment))
        return reg

    def event(self, *, mode="level", name=None, src_loc_at=0):
        """Request an event source.

        The event is appended to the list enumerated by :meth:`events`.
        See :class:`Event` for the meaning of the arguments.

        Return value
        ------------
        An instance of :class:`Event`.
        """
        # One extra frame (this method) sits between the caller and ``Event.__init__``.
        source = Event(mode=mode, name=name, src_loc_at=1 + src_loc_at)
        self._events.append(source)
        return source

    def csr_bridge(self, *, data_width=8, alignment=0):
        """Request a bridge to the resources of the peripheral.

        See :class:`PeripheralBridge` for details.

        Return value
        ------------
        A new :class:`PeripheralBridge` built from this peripheral with the given
        ``data_width`` and ``alignment``.
        """
        bridge = PeripheralBridge(self, data_width=data_width, alignment=alignment)
        return bridge

    def csr_registers(self):
        """Iterate requested CSR registers and their parameters.

        Registers are yielded in the order in which they were requested.

        Yield values
        ------------
        A tuple ``elem, addr, alignment`` describing the register and its parameters.
        """
        yield from self._csr_regs

    def events(self):
        """Iterate requested event sources.

        Event sources are yielded in the order in which they were requested.

        Yield values
        ------------
        An instance of :class:`Event`.
        """
        yield from self._events
class IRQLine(Signal):
    """Interrupt request line.

    A :class:`Signal` subclass constructed without an explicit shape; it exists so that
    IRQ lines can be told apart from ordinary signals with ``isinstance``.

    Parameters
    ----------
    name : str
        Name of the signal, forwarded to :class:`Signal`.
    src_loc_at : int
        Extra stack depth, forwarded to :class:`Signal` after being incremented by one.
    """

    def __init__(self, *, name=None, src_loc_at=0):
        # Add one level for this constructor (presumably so that source-location tracking
        # points at the code creating the IRQLine -- confirm against Signal).
        caller_depth = 1 + src_loc_at
        super().__init__(name=name, src_loc_at=caller_depth)
class Event:
    """Event source.

    Parameters
    ----------
    mode : ``"level"``, ``"rise"``, ``"fall"``
        Trigger mode. If ``"level"``, a notification is raised when the ``stb`` signal is
        high. If ``"rise"`` (or ``"fall"``) a notification is raised on a rising (or
        falling) edge of ``stb``.
    name : str
        Name of the event. If ``None`` (default) or empty, the name is inferred from the
        variable name this event source is assigned to.
    src_loc_at : int
        Extra stack depth added to the frame lookup used for name inference.

    Attributes
    ----------
    name : str
        Name of the event.
    mode : ``"level"``, ``"rise"``, ``"fall"``
        Trigger mode.
    stb : Signal, in
        Event strobe, named ``"<name>_stb"``.
    """

    def __init__(self, *, mode, name=None, src_loc_at=0):
        # The name is validated before the mode, so a bad name wins when both are wrong.
        if not (name is None or isinstance(name, str)):
            raise TypeError("Name must be a string, not {!r}".format(name))

        valid_modes = ("level", "rise", "fall")
        if mode not in valid_modes:
            listing = ", ".join(valid_modes)
            raise ValueError("Invalid trigger mode {!r}; must be one of {}"
                             .format(mode, listing))

        if name:
            self.name = name
        else:
            # Frame-depth sensitive: keep this call directly inside ``__init__``.
            self.name = tracer.get_var_name(depth=2 + src_loc_at)

        self.mode = mode
        self.stb = Signal(name="{}_stb".format(self.name))
class PeripheralBridge(Elaboratable):
    """Peripheral bridge.

    Gathers the CSR registers of a peripheral behind a single CSR bus and, when the
    peripheral has event sources, adds three event registers and an interrupt request
    (IRQ) line driven from them.

    CSR registers
    -------------
    ev_status : read-only
        Event status. Each bit displays the value of the ``stb`` signal of an event source.
        The register width is ``len(list(periph.events()))`` bits. Event sources are
        ordered by request order.
    ev_pending : read/write
        Event pending. Each bit displays whether an event source has a pending notification.
        Writing 1 to a bit clears the notification.
        The register width is ``len(list(periph.events()))`` bits. Event sources are
        ordered by request order.
    ev_enable : read/write
        Event enable. Writing 1 to a bit enables an event source. Writing 0 disables it.
        The register width is ``len(list(periph.events()))`` bits. Event sources are
        ordered by request order.

    These three registers only exist when the peripheral has at least one event source.

    Parameters
    ----------
    periph : :class:`Peripheral`
        The peripheral whose resources are exposed by this bridge.
    data_width : int
        Data width of the CSR bus. See :class:`Multiplexer`.
    alignment : int
        Register alignment. See :class:`Multiplexer`.

    Attributes
    ----------
    bus : :class:`Interface`
        CSR bus providing access to the registers of the peripheral.
    irq : :class:`IRQLine` or None
        IRQ line providing notifications from local events to the CPU. It is raised if any
        event source is both enabled and has a pending notification. If the peripheral has
        no event sources, it is set to ``None``.
    """

    def __init__(self, periph, *, data_width, alignment):
        if not isinstance(periph, Peripheral):
            raise TypeError("Peripheral must be an instance of Peripheral, not {!r}"
                            .format(periph))

        mux = Multiplexer(addr_width=1, data_width=data_width, alignment=alignment)
        self._mux = mux

        # Registers requested by the peripheral come first, in request order.
        for reg, reg_addr, reg_alignment in periph.csr_registers():
            mux.add(reg, addr=reg_addr, alignment=reg_alignment, extend=True)

        sources = list(periph.events())
        self._events = sources

        if not sources:
            self.irq = None
        else:
            # One bit per event source in each of the event registers.
            n_sources = len(sources)
            self._ev_status = Element(n_sources, "r",
                                      name="{}_ev_status".format(periph.name))
            self._ev_pending = Element(n_sources, "rw",
                                       name="{}_ev_pending".format(periph.name))
            self._ev_enable = Element(n_sources, "rw",
                                      name="{}_ev_enable".format(periph.name))

            for ev_reg in (self._ev_status, self._ev_pending, self._ev_enable):
                mux.add(ev_reg, extend=True)

            self.irq = IRQLine(name="{}_irq".format(periph.name))

        # Read the bus only after every register has been added to the multiplexer.
        self.bus = mux.bus

    def elaborate(self, platform):
        """Build the bridge logic.

        The multiplexer is always added as a submodule. The event logic (status sampling,
        pending set/clear, enable and IRQ generation) is only generated when the bridge has
        an IRQ line, i.e. when the peripheral has event sources.
        """
        m = Module()
        m.submodules.mux = self._mux

        if self.irq is None:
            return m

        status = self._ev_status
        pending = self._ev_pending
        enable = self._ev_enable

        # Writing 1 to a pending bit clears it; bits written as 0 keep their value.
        with m.If(pending.w_stb):
            m.d.sync += pending.r_data.eq(pending.r_data & ~pending.w_data)

        with m.If(enable.w_stb):
            m.d.sync += enable.r_data.eq(enable.w_data)

        for index, source in enumerate(self._events):
            m.d.sync += status.r_data[index].eq(source.stb)

            # Edge-triggered events need the previous value of the strobe.
            if source.mode in ("rise", "fall"):
                stb_prev = Signal.like(source.stb, name_suffix="_r")
                m.d.sync += stb_prev.eq(source.stb)

            trigger = Signal(name="{}_trigger".format(source.name))
            if source.mode == "level":
                fired = source.stb
            elif source.mode == "rise":
                fired = ~stb_prev & source.stb
            elif source.mode == "fall":
                fired = stb_prev & ~source.stb
            else:
                assert False # :nocov:
            m.d.comb += trigger.eq(fired)

            # Emitted after the clear-on-write statement above; keep this ordering.
            with m.If(trigger):
                m.d.sync += pending.r_data[index].eq(1)

        m.d.comb += self.irq.eq((pending.r_data & enable.r_data).any())

        return m