diff --git a/amaranth_orchard/io/rfc_uart.py b/amaranth_orchard/io/rfc_uart.py new file mode 100644 index 0000000..48e1deb --- /dev/null +++ b/amaranth_orchard/io/rfc_uart.py @@ -0,0 +1,592 @@ +""" +The Amaranth SoC RFC UART from https://github.com/ChipFlow/amaranth-orchard +""" + +from amaranth import * +from amaranth.lib import stream, wiring +from amaranth.lib.wiring import In, Out, flipped, connect + +from amaranth_soc import csr + + +__all__ = ["RxPhySignature", "TxPhySignature", "RxPeripheral", "TxPeripheral", "Peripheral"] + + +class RxPhySignature(wiring.Signature): + """Receiver PHY signature. + + Parameters + ---------- + phy_config_shape : :ref:`shape-like ` + Shape of the `config` member of this interface. + symbol_shape : :ref:`shape-like ` + Shape of the `symbols.payload` member of this interface. + + Members + ------- + rst : :py:`Out(1)` + PHY reset. + config : :py:`Out(phy_config_shape)` + PHY configuration word. Its shape is given by the `phy_config_shape` parameter. Its value + must remain constant unless `rst` is high. + symbols : :py:`In(stream.Signature(symbol_shape))` + Symbol stream. The shape of its payload is given by the `symbol_shape` parameter. + overflow : :py:`In(1)` + Overflow flag. Pulsed for one clock cycle if a symbol was received before the previous one + is acknowledged (i.e. before `symbols.ready` is high). + error : :py:`In(1)` + Receiver error flag. Pulsed for one clock cycle in case of an implementation-specific error + (e.g. wrong parity bit). + """ + def __init__(self, phy_config_shape, symbol_shape): + super().__init__({ + "rst": Out(1), + "config": Out(phy_config_shape), + "symbols": In(stream.Signature(symbol_shape)), + "overflow": In(1), + "error": In(1), + }) + + +class TxPhySignature(wiring.Signature): + """Transmitter PHY signature. + + Parameters + ---------- + phy_config_shape : :ref:`shape-like ` + Shape of the `config` member of this interface. + symbol_shape : :ref:`shape-like ` + Shape of the `symbols.payload` member of this interface. + + Members + ------- + rst : :py:`Out(1)` + PHY reset. + config : :py:`Out(phy_config_shape)` + PHY configuration. Its shape is given by the `phy_config_shape` parameter. Its value must + remain constant unless `rst` is high. + symbols : :py:`Out(stream.Signature(symbol_shape))` + Symbol stream. The shape of its payload is given by the `symbol_shape` parameter. + """ + def __init__(self, phy_config_shape, symbol_shape): + super().__init__({ + "rst": Out(1), + "config": Out(phy_config_shape), + "symbols": Out(stream.Signature(symbol_shape)), + }) + + +class _PhyConfigFieldAction(csr.FieldAction): + def __init__(self, shape, *, init=0): + super().__init__(shape, access="rw", members=( + ("data", Out(shape)), + ("w_en", In(unsigned(1))), + )) + self._storage = Signal(shape, init=init) + + def elaborate(self, platform): + m = Module() + + with m.If(self.w_en & self.port.w_stb): + m.d.sync += self._storage.eq(self.port.w_data) + + m.d.comb += [ + self.port.r_data.eq(self._storage), + self.data.eq(self._storage), + ] + + return m + + +class RxPeripheral(wiring.Component): + class Config(csr.Register, access="rw"): + """Peripheral configuration register. + + This :class:`Register` has the following fields: + + .. bitfield:: + :bits: 8 + + [ + { "name": "enable", "bits": 1, "attr": "RW" }, + { "bits": 7, "attr": "ResR0W0" }, + ] + + - If the ``enable`` field is 0, the PHY is held in reset state. + - If the ``enable`` field is 1, the PHY operates normally. + """ + enable: csr.Field(csr.action.RW, 1) + _unimp: csr.Field(csr.action.ResR0W0, 7) + + class PhyConfig(csr.Register, access="rw"): + """PHY configuration register. + + This :class:`Register` is writable if the ``enable`` field of :class:`RxPeripheral.Config` + is 0, and read-only otherwise. It has a single field with an implementation-specific shape + given by the ``phy_config_shape`` parameter. + + If ``phy_config_shape`` is ``unsigned(16)``, then the register has the following field: + + .. bitfield:: + :bits: 16 + + [ + { "name": "phy_config", "bits": 16, "attr": "RW" }, + ] + + Parameters + ---------- + phy_config_shape : :ref:`shape-like ` + Shape of the PHY configuration word. + phy_config_init : :class:`int` + Initial value of the PHY configuration word. + """ + def __init__(self, phy_config_shape, phy_config_init): + super().__init__(csr.Field(_PhyConfigFieldAction, phy_config_shape, + init=phy_config_init)) + + class Status(csr.Register, access="rw"): + """Status register. + + This :class:`Register` is read to be informed of available data or error conditions. + + It has the following fields: + + .. bitfield:: + :bits: 8 + + [ + { "name": "ready", "bits": 1, "attr": "R" }, + { "name": "overflow", "bits": 1, "attr": "RW1C" }, + { "name": "error", "bits": 1, "attr": "RW1C" }, + { "bits": 5, "attr": "ResR0W0" }, + ] + + - The ``ready`` field is set if the receiver buffer is non-empty. + - The ``overflow`` field is set and latched if a symbol is received while the receiver + buffer is full. + - The ``error`` field is set and latched if any implementation-specific error occured. + """ + ready: csr.Field(csr.action.R, 1) + overflow: csr.Field(csr.action.RW1C, 1) + error: csr.Field(csr.action.RW1C, 1) + _unimp: csr.Field(csr.action.ResR0W0, 5) + + class Data(csr.Register, access="r"): + """Data register. + + This :class:`Register` is read to consume data from the receiver buffer. It has a single + field with an implementation-specific shape given by the ``symbol_shape`` parameter. + + If ``symbol_shape`` is ``unsigned(8)``, then the register has the following field: + + .. bitfield:: + :bits: 8 + + [ + { "name": "data", "bits": 8, "attr": "R" }, + ] + + - If either the ``enable`` field of :class:`RxPeripheral.Config` or the ``ready`` field of + :class:`RxPeripheral.Status` are 0, reading from this register has no side-effect and + returns an unspecified value. + - If both the ``enable`` field of :class:`RxPeripheral.Config` and the ``ready`` field of + :class:`RxPeripheral.Status` are 1, reading from this register consumes one symbol from + the receiver buffer and returns it. + + Parameters + ---------- + symbol_shape : :ref:`shape-like ` + Shape of a symbol. + """ + def __init__(self, symbol_shape): + super().__init__(csr.Field(csr.action.R, symbol_shape)) + + """UART receiver peripheral. + + Parameters + ---------- + addr_width : :class:`int` + CSR bus address width. + data_width : :class:`int` + CSR bus data width. + phy_config_shape : :ref:`shape-like ` + Shape of the PHY configuration word. Optional. Defaults to ``unsigned(16)``. + phy_config_init : :class:`int` + Initial value of the PHY configuration word. Optional. Defaults to ``0``. + symbol_shape : :ref:`shape-like ` + Shape of a symbol. Optional. Defaults to ``unsigned(8)``. + + Members + ------- + bus : :py:`In(csr.Signature(addr_width, data_width))` + CSR bus interface providing access to registers. + phy : :py:`Out(RxPhySignature(phy_config_shape, symbol_shape))` + Interface between the peripheral and its PHY. + """ + def __init__(self, *, addr_width, data_width, phy_config_shape=unsigned(16), + phy_config_init=0, symbol_shape=unsigned(8)): + regs = csr.Builder(addr_width=addr_width, data_width=data_width) + + self._config = regs.add("Config", self.Config()) + self._phy_config = regs.add("PhyConfig", self.PhyConfig(phy_config_shape, phy_config_init)) + self._status = regs.add("Status", self.Status()) + self._data = regs.add("Data", self.Data(symbol_shape)) + + self._bridge = csr.Bridge(regs.as_memory_map()) + + super().__init__({ + "bus": In(csr.Signature(addr_width=addr_width, data_width=data_width)), + "phy": Out(RxPhySignature(phy_config_shape, symbol_shape)), + }) + self.bus.memory_map = self._bridge.bus.memory_map + + self._phy_config_shape = phy_config_shape + self._phy_config_init = phy_config_init + self._symbol_shape = symbol_shape + + @property + def phy_config_shape(self): + """Shape of the PHY configuration word. + + Returns + ------- + :ref:`shape-like ` + """ + return self._phy_config_shape + + @property + def phy_config_init(self): + """Initial value of the PHY configuration word. + + Returns + ------- + :class:`int` + """ + return self._phy_config_init + + @property + def symbol_shape(self): + """Shape of a symbol. + + Returns + ------- + :ref:`shape-like ` + """ + return self._symbol_shape + + def elaborate(self, platform): + m = Module() + m.submodules.bridge = self._bridge + + connect(m, flipped(self.bus), self._bridge.bus) + + m.d.comb += [ + self.phy.rst.eq(~self._config.f.enable.data), + + self._phy_config.f.w_en.eq(self.phy.rst), + self.phy.config.eq(self._phy_config.f.data), + + self._status.f.ready.r_data.eq(self.phy.symbols.valid), + self._data.f.r_data.eq(self.phy.symbols.payload), + self.phy.symbols.ready.eq(self._data.f.r_stb), + + self._status.f.overflow.set.eq(self.phy.overflow), + self._status.f.error.set.eq(self.phy.error), + ] + + return m + + +class TxPeripheral(wiring.Component): + class Config(csr.Register, access="rw"): + """Peripheral configuration register. + + This :class:`Register` has the following fields: + + .. bitfield:: + :bits: 8 + + [ + { "name": "enable", "bits": 1, "attr": "RW" }, + { "bits": 7, "attr": "ResR0W0" }, + ] + + - If the ``enable`` bit is 0, the PHY is held in reset state. + - If the ``enable`` bit is 1, the PHY operates normally. + """ + enable: csr.Field(csr.action.RW, 1) + _unimp: csr.Field(csr.action.ResR0W0, 7) + + class PhyConfig(csr.Register, access="rw"): + """PHY configuration register. + + This :class:`Register` is writable if the ``enable`` field of :class:`TxPeripheral.Config` + is 0, and read-only otherwise. It has a single field with an implementation-specific shape + given by the ``phy_config_shape`` parameter. + + If ``phy_config_shape`` is ``unsigned(16)``, then the register has the following field: + + .. bitfield:: + :bits: 16 + + [ + { "name": "phy_config", "bits": 16, "attr": "RW" }, + ] + + Parameters + ---------- + phy_config_shape : :ref:`shape-like ` + Shape of the PHY configuration word. + phy_config_init : :class:`int` + Initial value of the PHY configuration word. + """ + def __init__(self, phy_config_shape, phy_config_init): + super().__init__(csr.Field(_PhyConfigFieldAction, phy_config_shape, + init=phy_config_init)) + + class Status(csr.Register, access="r"): + """Status register. + + This :class:`Register` is read to be informed of when the transmitter can accept data. + + It has the following fields: + + .. bitfield:: + :bits: 8 + + [ + { "name": "ready", "bits": 1, "attr": "R" }, + { "bits": 7, "attr": "ResR0W0" }, + ] + + - The ``ready`` field is set if the transmitter buffer is non-full. + """ + ready: csr.Field(csr.action.R, 1) + _unimp: csr.Field(csr.action.ResR0W0, 7) + + class Data(csr.Register, access="w"): + """Data register. + + This :class:`Register` is written to append data to the transmitter buffer. It has a single + field with an implementation-specific shape given by the ``symbol_shape`` parameter. + + If ``symbol_shape`` is ``unsigned(8)``, then the register has the following field: + + .. bitfield:: + :bits: 8 + + [ + { "name": "data", "bits": 8, "attr": "W" }, + ] + + - If either the ``enable`` field of :class:`TxPeripheral.Config` or the ``ready`` field of + :class:`TxPeripheral.Status` are 0, writing to this register has no side-effect. + - If both the ``enable`` field of :class:`TxPeripheral.Config` and the ``ready`` field of + :class:`TxPeripheral.Status` are 1, writing to this register appends one symbol to the + transmitter buffer. + + Parameters + ---------- + symbol_shape : :ref:`shape-like ` + Shape of a symbol. + """ + def __init__(self, symbol_shape): + super().__init__(csr.Field(csr.action.W, symbol_shape)) + + """UART transmitter peripheral. + + Parameters + ---------- + addr_width : :class:`int` + CSR bus address width. + data_width : :class:`int` + CSR bus data width. + phy_config_shape : :ref:`shape-like ` + Shape of the PHY configuration word. Optional. Defaults to ``unsigned(16)``. + phy_config_init : :class:`int` + Initial value of the PHY configuration word. Optional. Defaults to ``0``. + symbol_shape : :ref:`shape-like ` + Shape of a symbol. Optional. Defaults to ``unsigned(8)``. + + Members + ------- + bus : :py:`In(csr.Signature(addr_width, data_width))` + CSR bus interface providing access to registers. + phy : :py:`Out(TxPhySignature(phy_config_shape, symbol_shape))` + Interface between the peripheral and its PHY. + """ + def __init__(self, *, addr_width, data_width=8, phy_config_shape=unsigned(16), + phy_config_init=0, symbol_shape=unsigned(8)): + regs = csr.Builder(addr_width=addr_width, data_width=data_width) + + self._config = regs.add("Config", self.Config()) + self._phy_config = regs.add("PhyConfig", self.PhyConfig(phy_config_shape, phy_config_init)) + self._status = regs.add("Status", self.Status()) + self._data = regs.add("Data", self.Data(symbol_shape)) + + self._bridge = csr.Bridge(regs.as_memory_map()) + + super().__init__({ + "bus": In(csr.Signature(addr_width=addr_width, data_width=data_width)), + "phy": Out(TxPhySignature(phy_config_shape, symbol_shape)), + }) + self.bus.memory_map = self._bridge.bus.memory_map + + self._phy_config_shape = phy_config_shape + self._phy_config_init = phy_config_init + self._symbol_shape = symbol_shape + + @property + def phy_config_shape(self): + """Shape of the PHY configuration word. + + Returns + ------- + :ref:`shape-like ` + """ + return self._phy_config_shape + + @property + def phy_config_init(self): + """Initial value of the PHY configuration word. + + Returns + ------- + :class:`int` + """ + return self._phy_config_init + + @property + def symbol_shape(self): + """Shape of a symbol. + + Returns + ------- + :ref:`shape-like ` + """ + return self._symbol_shape + + def elaborate(self, platform): + m = Module() + m.submodules.bridge = self._bridge + + connect(m, flipped(self.bus), self._bridge.bus) + + m.d.comb += [ + self.phy.rst.eq(~self._config.f.enable.data), + + self._phy_config.f.w_en.eq(self.phy.rst), + self.phy.config.eq(self._phy_config.f.data), + + self._status.f.ready.r_data.eq(self.phy.symbols.ready), + self.phy.symbols.payload.eq(self._data.f.w_data), + self.phy.symbols.valid.eq(self._data.f.w_stb), + ] + + return m + + +class Peripheral(wiring.Component): + """UART transceiver peripheral. + + This peripheral is composed of two subordinate peripherals. A :class:`RxPeripheral` occupies + the bottom half of the address space, and a :class:`TxPeripheral` occupies the top half. + Both subordinates have the same parameters as this peripheral, except ``addr_width`` (which + becomes ``addr_width - 1``). + + Parameters + ---------- + addr_width : :class:`int` + CSR bus address width. + data_width : :class:`int` + CSR bus data width. + phy_config_shape : :ref:`shape-like ` + Shape of the PHY configuration word. Optional. Defaults to ``unsigned(16)``. + phy_config_init : :class:`int` + Initial value of the PHY configuration word. Optional. Defaults to ``0``. + symbol_shape : :ref:`shape-like ` + Shape of a symbol. Optional. Defaults to ``unsigned(8)``. + + Members + ------- + bus : :py:`In(csr.Signature(addr_width, data_width))` + CSR bus interface providing access to registers. + rx : :py:`Out(RxPhySignature(phy_config_shape, symbol_shape))` + Interface between the peripheral and its PHY receiver. + tx : :py:`Out(TxPhySignature(phy_config_shape, symbol_shape))` + Interface between the peripheral and its PHY transmitter. + + Raises + ------ + :exc:`TypeError` + If ``addr_width`` is not a positive integer. + """ + def __init__(self, *, addr_width, data_width=8, phy_config_shape=unsigned(16), + phy_config_init=0, symbol_shape=unsigned(8)): + if not isinstance(addr_width, int) or addr_width <= 0: + raise TypeError(f"Address width must be a positive integer, not {addr_width!r}") + + self._rx = RxPeripheral(addr_width=addr_width - 1, data_width=data_width, + phy_config_shape=phy_config_shape, phy_config_init=phy_config_init, + symbol_shape=symbol_shape) + self._tx = TxPeripheral(addr_width=addr_width - 1, data_width=data_width, + phy_config_shape=phy_config_shape, phy_config_init=phy_config_init, + symbol_shape=symbol_shape) + + self._decoder = csr.Decoder(addr_width=addr_width, data_width=data_width) + self._decoder.add(self._rx.bus, name="rx") + self._decoder.add(self._tx.bus, name="tx") + + super().__init__({ + "bus": In(csr.Signature(addr_width=addr_width, data_width=data_width)), + "rx": Out(RxPhySignature(phy_config_shape, symbol_shape)), + "tx": Out(TxPhySignature(phy_config_shape, symbol_shape)), + }) + self.bus.memory_map = self._decoder.bus.memory_map + + self._phy_config_shape = phy_config_shape + self._phy_config_init = phy_config_init + self._symbol_shape = symbol_shape + + @property + def phy_config_shape(self): + """Shape of the PHY configuration word. + + Returns + ------- + :ref:`shape-like ` + """ + return self._phy_config_shape + + @property + def phy_config_init(self): + """Initial value of the PHY configuration word. + + Returns + ------- + :class:`int` + """ + return self._phy_config_init + + @property + def symbol_shape(self): + """Shape of a symbol. + + Returns + ------- + :ref:`shape-like ` + """ + return self._symbol_shape + + def elaborate(self, platform): + m = Module() + m.submodules.decoder = self._decoder + m.submodules.rx = self._rx + m.submodules.tx = self._tx + + connect(m, flipped(self.bus), self._decoder.bus) + connect(m, self._rx.phy, flipped(self.rx)) + connect(m, self._tx.phy, flipped(self.tx)) + + return m diff --git a/amaranth_orchard/io/uart.py b/amaranth_orchard/io/uart.py index 789d9a8..d799270 100644 --- a/amaranth_orchard/io/uart.py +++ b/amaranth_orchard/io/uart.py @@ -1,5 +1,5 @@ -from amaranth import Module, Signal, unsigned -from amaranth.lib import wiring +from amaranth import Module, Signal, unsigned, ResetInserter +from amaranth.lib import wiring, data, stream, io from amaranth.lib.wiring import In, Out, flipped, connect from amaranth_soc import csr @@ -7,103 +7,170 @@ from chipflow_lib.platforms import OutputPinSignature, InputPinSignature +from . import rfc_uart __all__ = ["UARTPeripheral"] - -class UARTPeripheral(wiring.Component): +class UARTPhyRx(wiring.Component): class Signature(wiring.Signature): def __init__(self): super().__init__({ - "tx": Out(OutputPinSignature(1)), - "rx": Out(InputPinSignature(1)), + "rst": Out(1), + "config": Out(data.StructLayout({"divisor": unsigned(24)})), + "symbols": In(stream.Signature(unsigned(8))), + "overflow": In(1), + "error": In(1), }) - def __init__(self, *, path=(), src_loc_at=0): - super().__init__(self.Signature(), path=path, src_loc_at=1 + src_loc_at) + def __init__(self, port, init_divisor): + super().__init__(self.Signature().flip()) + self._port = port + self._init_divisor = init_divisor - class TxData(csr.Register, access="w"): - """valid to write to when tx_rdy is 1, will trigger a transmit""" - val: csr.Field(csr.action.W, unsigned(8)) + def elaborate(self, platform): + m = Module() - class RxData(csr.Register, access="r"): - """valid to read from when rx_avail is 1, last received byte""" - val: csr.Field(csr.action.R, unsigned(8)) + lower = AsyncSerialRX(divisor=self._init_divisor, divisor_bits=24) + lower = ResetInserter(self.rst)(lower) + m.submodules.lower = lower - class TxReady(csr.Register, access="r"): - """is '1' when 1-byte transmit buffer is empty""" - val: csr.Field(csr.action.R, unsigned(1)) + m.d.sync += self.overflow.eq(0) + with m.If(lower.rdy): + with m.If(self.symbols.valid): + m.d.sync += self.overflow.eq(1) + with m.Else(): + m.d.sync += [ + self.symbols.payload.eq(lower.data), + self.symbols.valid.eq(1), + ] - class RxAvail(csr.Register, access="r"): - """is '1' when 1-byte receive buffer is full; reset by a read from rx_data""" - val: csr.Field(csr.action.R, unsigned(1)) + with m.If(self.symbols.ready): + m.d.sync += self.symbols.valid.eq(0) - class Divisor(csr.Register, access="rw"): - """baud divider, defaults to init_divisor""" - def __init__(self, init_divisor): - super().__init__({ - "val": csr.Field(csr.action.RW, unsigned(24), init=init_divisor), - }) + m.d.comb += [ + lower.i.eq(self._port.i), - """ - A custom, minimal UART. + lower.divisor.eq(self.config.divisor), + lower.ack.eq(1), - TODO: Interrupts support, perhaps mimic something with upstream Linux kernel support... - """ - def __init__(self, *, init_divisor): - self.init_divisor = init_divisor + self.error.eq(lower.err.frame), + ] - regs = csr.Builder(addr_width=5, data_width=8) + return m - self._tx_data = regs.add("tx_data", self.TxData(), offset=0x00) - self._rx_data = regs.add("rx_data", self.RxData(), offset=0x04) - self._tx_rdy = regs.add("tx_rdy", self.TxReady(), offset=0x08) - self._rx_avail = regs.add("rx_avail", self.RxAvail(), offset=0x0c) - self._divisor = regs.add("divisor", self.Divisor(init_divisor), offset=0x10) - self._bridge = csr.Bridge(regs.as_memory_map()) +class UARTPhyTx(wiring.Component): + class Signature(wiring.Signature): + def __init__(self): + super().__init__({ + "rst": Out(1), + "config": Out(data.StructLayout({"divisor": unsigned(24)})), + "symbols": Out(stream.Signature(unsigned(8))) + }) - super().__init__({ - "bus": In(csr.Signature(addr_width=regs.addr_width, data_width=regs.data_width)), - "pins": Out(self.Signature()), - }) - self.bus.memory_map = self._bridge.bus.memory_map + def __init__(self, port, init_divisor): + super().__init__(self.Signature().flip()) + self._port = port + self._init_divisor = init_divisor def elaborate(self, platform): m = Module() - m.submodules.bridge = self._bridge - connect(m, flipped(self.bus), self._bridge.bus) + lower = AsyncSerialTX(divisor=self._init_divisor, divisor_bits=24) + lower = ResetInserter(self.rst)(lower) + m.submodules.lower = lower - m.submodules.tx = tx = AsyncSerialTX(divisor=self.init_divisor, divisor_bits=24) m.d.comb += [ - self.pins.tx.o.eq(tx.o), - tx.data.eq(self._tx_data.f.val.w_data), - tx.ack.eq(self._tx_data.f.val.w_stb), - self._tx_rdy.f.val.r_data.eq(tx.rdy), - tx.divisor.eq(self._divisor.f.val.data) + self._port.o.eq(lower.o), + + lower.divisor.eq(self.config.divisor), + + lower.data.eq(self.symbols.payload), + lower.ack.eq(self.symbols.valid), + self.symbols.ready.eq(lower.rdy), ] - rx_buf = Signal(unsigned(8)) - rx_avail = Signal() + return m - m.submodules.rx = rx = AsyncSerialRX(divisor=self.init_divisor, divisor_bits=24) - with m.If(self._rx_data.f.val.r_stb): - m.d.sync += rx_avail.eq(0) +class UARTPhy(wiring.Component): + class Signature(wiring.Signature): + def __init__(self): + super().__init__({ + "rx": Out(UARTPhyRx.Signature()), + "tx": Out(UARTPhyTx.Signature()), + }) - with m.If(rx.rdy): - m.d.sync += [ - rx_buf.eq(rx.data), - rx_avail.eq(1) - ] + def __init__(self, ports, init_divisor): + super().__init__(self.Signature().flip()) + self._rx = UARTPhyRx(ports.rx, init_divisor) + self._tx = UARTPhyTx(ports.tx, init_divisor) - m.d.comb += [ - rx.i.eq(self.pins.rx.i), - rx.ack.eq(~rx_avail), - rx.divisor.eq(self._divisor.f.val.data), - self._rx_data.f.val.r_data.eq(rx_buf), - self._rx_avail.f.val.r_data.eq(rx_avail) - ] + def elaborate(self, platform): + m = Module() + + m.submodules.rx = self._rx + m.submodules.tx = self._tx + + connect(m, self._rx, flipped(self.rx)) + connect(m, self._tx, flipped(self.tx)) + + return m + + +class UARTPeripheral(wiring.Component): + + class Signature(wiring.Signature): + def __init__(self): + super().__init__({ + "tx": Out(OutputPinSignature(1)), + "rx": Out(InputPinSignature(1)), + }) + + + """Wrapper for amaranth_soc RFC UART with PHY and chipflow_lib.PinSignature support + + Parameters + ---------- + addr_width : :class:`int` + CSR bus address width. Defaults to ``5``. + data_width : :class:`int` + CSR bus data width. Defaults to ``8``. + init_divisor : :class:`int` + Initial divisor value + + Attributes + ---------- + bus : :class:`csr.Interface` + CSR bus interface providing access to registers. + pins : :class:`list` of :class:`wiring.PureInterface` of :class:`PinSignature` + UART pin interfaces. + + """ + + def __init__(self, *, addr_width=5, data_width=8, init_divisor=0): + phy_config_shape = data.StructLayout({"divisor": unsigned(24)}) + self._uart = rfc_uart.Peripheral( + addr_width=addr_width, + data_width=data_width, + phy_config_shape=phy_config_shape, + phy_config_init=phy_config_shape.const({"divisor": init_divisor}), + ) + + super().__init__({ + "bus": In(csr.Signature(addr_width=addr_width, data_width=data_width)), + "pins": Out(self.Signature()), + }) + self.bus.memory_map = self._uart.bus.memory_map + self._phy = UARTPhy(ports=self.pins, init_divisor=init_divisor) + + def elaborate(self, platform): + m = Module() + m.submodules._uart = uart = self._uart + m.submodules._phy = phy = self._phy + + connect(m, flipped(self.bus), self._uart.bus) + connect(m, uart.tx, phy.tx) + connect(m, uart.rx, phy.rx) return m diff --git a/pyproject.toml b/pyproject.toml index 240704a..2504984 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ dependencies = [ "chipflow-lib @ git+https://github.com/ChipFlow/chipflow-lib.git", "amaranth-soc @ git+https://github.com/amaranth-lang/amaranth-soc", "amaranth-stdio @ git+https://github.com/amaranth-lang/amaranth-stdio", + "pytest>=7.2.0", ] # Build system configuration diff --git a/tests/test_uart.py b/tests/test_uart.py new file mode 100644 index 0000000..7c56bea --- /dev/null +++ b/tests/test_uart.py @@ -0,0 +1,125 @@ +# amaranth: UnusedElaboratable=no + +import unittest +from amaranth import * +from amaranth.sim import * + +from amaranth_orchard.io import UARTPeripheral + +class PeripheralTestCase(unittest.TestCase): + + async def _csr_access(self, ctx, dut, addr, r_stb=0, w_stb=0, w_data=0, r_data=0): + ctx.set(dut.bus.addr, addr) + ctx.set(dut.bus.r_stb, r_stb) + ctx.set(dut.bus.w_stb, w_stb) + ctx.set(dut.bus.w_data, w_data) + await ctx.tick() + if r_stb: + self.assertEqual(ctx.get(dut.bus.r_data), r_data) + ctx.set(dut.bus.r_stb, 0) + ctx.set(dut.bus.w_stb, 0) + + def test_smoke_test(self): + """ + Smoke test GPIO. We assume that amaranth-soc GPIO tests are fully testing functionality. + """ + rx_addr = 0x00 + tx_addr = 0x10 + config_addr = 0x00 + phy_config_addr = 0x04 + status_addr = 0x08 + data_addr = 0x09 + + sim_divisor = 8 + + dut = UARTPeripheral(init_divisor=sim_divisor, addr_width=5) + + # simulated UART receiver + last_sent_byte = None + async def uart_rx_proc(ctx): + nonlocal last_sent_byte + counter = 0 + tx_last = 0 + sr = 0 + async for clk_edge, rst, tx in ctx.tick().sample(dut.pins.tx.o): + if rst: + pass + elif clk_edge: + if counter == 0: # waiting for start transition + if tx_last and not tx: # falling edge + counter = 1 + else: + counter += 1 + if (counter > (sim_divisor // 2)) and ((counter - (sim_divisor // 2)) % sim_divisor) == 0: # in middle of bit + bit = ((counter - (sim_divisor // 2)) // sim_divisor) + if bit >= 1 and bit <= 8: + sr = (tx << 7) | (sr >> 1) + if bit == 8: + last_sent_byte = sr + elif bit == 9: # stop + counter = 0 + tx_last = tx + + to_send_byte = None + async def uart_tx_proc(ctx): + nonlocal to_send_byte + counter = 0 + ctx.set(dut.pins.rx.i, 1) + async for clk_edge, rst in ctx.tick().sample(): + if rst: + pass + elif clk_edge: + if to_send_byte is not None: + bit = counter // sim_divisor + if bit == 0: + ctx.set(dut.pins.rx.i, 0) # start + elif bit >= 1 and bit <= 8: + ctx.set(dut.pins.rx.i, (to_send_byte >> (bit - 1)) & 0x1) + if bit > 8: + ctx.set(dut.pins.rx.i, 1) # stop + to_send_byte = None + counter = 0 + else: + counter += 1 + + async def testbench(ctx): + nonlocal to_send_byte + # enable tx + await self._csr_access(ctx, dut, tx_addr|config_addr, w_stb=1, w_data=1) + # check tx ready + await self._csr_access(ctx, dut, tx_addr|status_addr, r_stb=1, r_data=1) + # write byte + await self._csr_access(ctx, dut, tx_addr|data_addr, w_stb=1, w_data=0xA5) + await ctx.tick() + # check tx not ready + await self._csr_access(ctx, dut, tx_addr|status_addr, r_stb=1, r_data=0) + # wait for UART to do its thing + for i in range(sim_divisor * 12): + await ctx.tick() + # check tx ready + await self._csr_access(ctx, dut, tx_addr|status_addr, r_stb=1, r_data=1) + # check the byte was sent correctly + self.assertEqual(last_sent_byte, 0xA5) + + # enable rx + await self._csr_access(ctx, dut, rx_addr|config_addr, w_stb=1, w_data=1) + # check rx not ready + await self._csr_access(ctx, dut, rx_addr|status_addr, r_stb=1, r_data=0) + for i in range(sim_divisor): + await ctx.tick() + # send a byte to the UART + to_send_byte = 0x73 + for i in range(sim_divisor * 12): + await ctx.tick() + # check rx ready + await self._csr_access(ctx, dut, rx_addr|status_addr, r_stb=1, r_data=1) + # check the byte was received correctly + await self._csr_access(ctx, dut, rx_addr|data_addr, r_stb=1, r_data=0x73) + + sim = Simulator(dut) + sim.add_clock(1e-6) + sim.add_process(uart_rx_proc) + sim.add_process(uart_tx_proc) + sim.add_testbench(testbench) + with sim.write_vcd(vcd_file="test_uart.vcd"): + sim.run()