Skip to content

Commit 89b0dd9

Browse files
author
Jean THOMAS
committed
cores.mech: Add Endless Potentiometer Decoder (WIP)
1 parent e7032ed commit 89b0dd9

File tree

2 files changed

+364
-0
lines changed

2 files changed

+364
-0
lines changed
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
# Endless potentiometer decoding into relative rotation
2+
# 2025 - LambdaConcept <[email protected]>
3+
from amaranth import *
4+
from ...interface import stream
5+
6+
__all__ = ["EndlessPotentiometerDecoder"]
7+
8+
9+
class _ThresholdDetector(Elaboratable):
10+
"""Detects when a value changes above/below a threshold"""
11+
def __init__(self, width, threshold):
12+
self._width = width
13+
self._threshold = threshold
14+
15+
self.readout = stream.Endpoint([
16+
("value", width),
17+
("previous_value", width),
18+
])
19+
20+
self.detection = stream.Endpoint([
21+
("up", 1),
22+
("down", 1),
23+
("value", width), # Readout value passthrough
24+
("delta", signed(width + 1)), # value - previous_value
25+
])
26+
27+
def elaborate(self, platform):
28+
m = Module()
29+
30+
low_threshold = Signal(signed(self._width + 1))
31+
high_threshold = Signal(signed(self._width + 1))
32+
m.d.comb += [
33+
low_threshold.eq(self.readout.previous_value - self._threshold),
34+
high_threshold.eq(self.readout.previous_value + self._threshold),
35+
]
36+
37+
with m.If(self.detection.ready | ~self.detection.valid):
38+
m.d.sync += [
39+
self.detection.valid.eq(self.readout.valid),
40+
self.detection.up.eq(self.readout.value > high_threshold),
41+
self.detection.down.eq(self.readout.value < low_threshold),
42+
self.detection.value.eq(self.readout.value),
43+
self.detection.delta.eq(self.readout.value - self.readout.previous_value),
44+
]
45+
m.d.comb += self.readout.ready.eq(self.detection.ready | ~self.detection.valid)
46+
47+
return m
48+
49+
50+
class _DirectionDecoding(Elaboratable):
51+
def __init__(self, width):
52+
self._width = width
53+
54+
self.dir_a = stream.Endpoint([
55+
("up", 1),
56+
("down", 1),
57+
("value", width),
58+
("delta", signed(width + 1)),
59+
])
60+
self.dir_b = stream.Endpoint([
61+
("up", 1),
62+
("down", 1),
63+
("value", width),
64+
("delta", signed(width + 1)),
65+
])
66+
67+
self.direction = stream.Endpoint([
68+
("clockwise", 1),
69+
("counterclockwise", 1),
70+
("value_a", width),
71+
("delta_a", signed(width + 1)),
72+
("value_b", width),
73+
("delta_b", signed(width + 1)),
74+
])
75+
76+
def elaborate(self, platform):
77+
m = Module()
78+
79+
m.d.comb += [
80+
self.dir_a.ready.eq(self.direction.ready & self.dir_b.valid),
81+
self.dir_b.ready.eq(self.direction.ready & self.dir_a.valid),
82+
]
83+
84+
m.d.sync += [
85+
self.direction.valid.eq(self.dir_a.valid & self.dir_b.valid),
86+
self.direction.value_a.eq(self.dir_a.value),
87+
self.direction.value_b.eq(self.dir_b.value),
88+
self.direction.delta_a.eq(self.dir_a.delta),
89+
self.direction.delta_b.eq(self.dir_b.delta),
90+
]
91+
92+
a_above_b = Signal()
93+
a_above_mid = Signal()
94+
b_above_mid = Signal()
95+
m.d.comb += [
96+
a_above_b.eq(self.dir_a.value > self.dir_b.value),
97+
a_above_mid.eq(self.dir_a.value > (1 << self._width) // 2),
98+
b_above_mid.eq(self.dir_b.value > (1 << self._width) // 2),
99+
]
100+
101+
with m.If(self.dir_a.down & self.dir_b.down):
102+
with m.If(a_above_b):
103+
m.d.sync += self.direction.clockwise.eq(1)
104+
with m.Else():
105+
m.d.sync += self.direction.counterclockwise.eq(1)
106+
with m.Elif(self.dir_a.up & self.dir_b.up):
107+
with m.If(~a_above_b):
108+
m.d.sync += self.direction.clockwise.eq(1)
109+
with m.Else():
110+
m.d.sync += self.direction.counterclockwise.eq(1)
111+
with m.Elif(self.dir_a.up & self.dir_b.down):
112+
with m.If(a_above_mid | b_above_mid):
113+
m.d.sync += self.direction.clockwise.eq(1)
114+
with m.Else():
115+
m.d.sync += self.direction.counterclockwise.eq(1)
116+
with m.Elif(self.dir_a.down & self.dir_b.up):
117+
with m.If(~a_above_mid | ~b_above_mid):
118+
m.d.sync += self.direction.clockwise.eq(1)
119+
with m.Else():
120+
m.d.sync += self.direction.counterclockwise.eq(1)
121+
with m.Else():
122+
m.d.sync += [
123+
self.direction.clockwise.eq(0),
124+
self.direction.counterclockwise.eq(0),
125+
]
126+
127+
return m
128+
129+
130+
class _ReadoutDeadzoneMuxer(Elaboratable):
131+
def __init__(self, width, deadzone=0.8):
132+
self._width = width
133+
self._deadzone = deadzone
134+
135+
self.direction = stream.Endpoint([
136+
("clockwise", 1),
137+
("counterclockwise", 1),
138+
("value_a", width),
139+
("delta_a", signed(width + 1)),
140+
("value_b", width),
141+
("delta_b", signed(width + 1)),
142+
])
143+
144+
self.position = stream.Endpoint([
145+
("diff", signed(width + 1)),
146+
])
147+
148+
def elaborate(self, platform):
149+
m = Module()
150+
151+
deadzone_max = int((1 << self._width) * self._deadzone)
152+
deadzone_min = int((1 << self._width) * (1 - self._deadzone))
153+
154+
value = Signal(signed(self._width + 1))
155+
with m.If((self.direction.value_a < deadzone_max) & (self.direction.value_a > deadzone_min)):
156+
with m.If(self.direction.clockwise):
157+
m.d.comb += value.eq(abs(self.direction.delta_a))
158+
with m.Elif(self.direction.counterclockwise):
159+
m.d.comb += value.eq(-abs(self.direction.delta_a))
160+
with m.Else():
161+
m.d.comb += value.eq(0)
162+
with m.Else():
163+
with m.If(self.direction.clockwise):
164+
m.d.comb += value.eq(abs(self.direction.delta_b))
165+
with m.Elif(self.direction.counterclockwise):
166+
m.d.comb += value.eq(-abs(self.direction.delta_b))
167+
with m.Else():
168+
m.d.comb += value.eq(0)
169+
170+
with m.If(self.position.ready | ~self.position.valid):
171+
m.d.sync += [
172+
self.position.valid.eq(self.direction.valid),
173+
self.position.diff.eq(value),
174+
]
175+
m.d.comb += self.direction.ready.eq(self.position.ready | ~self.position.valid)
176+
177+
return m
178+
179+
180+
class EndlessPotentiometerDecoder(Elaboratable):
181+
def __init__(self, width, threshold, deadzone):
182+
self._width = width
183+
self._threshold = threshold
184+
self._deadzone = deadzone
185+
186+
self.ch_a = stream.Endpoint([
187+
("value", width),
188+
("previous_value", width),
189+
])
190+
self.ch_b = stream.Endpoint([
191+
("value", width),
192+
("previous_value", width),
193+
])
194+
195+
self.position = stream.Endpoint([
196+
("diff", signed(width + 1)),
197+
])
198+
199+
def elaborate(self, platform):
200+
m = Module()
201+
202+
m.submodules.thres_det_a = thres_det_a = _ThresholdDetector(self._width, self._threshold)
203+
m.submodules.thres_det_b = thres_det_b = _ThresholdDetector(self._width, self._threshold)
204+
m.submodules.dir_decoding = dir_decoding = _DirectionDecoding(self._width)
205+
m.submodules.deadzone_mux = deadzone_mux = _ReadoutDeadzoneMuxer(self._width, self._deadzone)
206+
m.d.comb += [
207+
self.ch_a.connect(thres_det_a.readout),
208+
self.ch_b.connect(thres_det_b.readout),
209+
210+
thres_det_a.detection.connect(dir_decoding.dir_a),
211+
thres_det_b.detection.connect(dir_decoding.dir_b),
212+
213+
dir_decoding.direction.connect(deadzone_mux.direction),
214+
deadzone_mux.position.connect(self.position),
215+
]
216+
217+
return m
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
from amaranth.sim import *
2+
from lambdalib.interface.stream_sim import *
3+
from lambdalib.cores.mech.endless_potentiometer import (
4+
EndlessPotentiometerDecoder,
5+
_ThresholdDetector,
6+
_DirectionDecoding,
7+
)
8+
9+
10+
def _wiper_to_adc(angular_position, phase_shift, adc_resolution):
11+
"""Emulate sampled wiper output
12+
13+
:param angular_position: Wiper absolute position in deg
14+
:param phase_shift: Phase shift in deg
15+
:param adc_resolution: ADC resolution in bits"""
16+
adc_max = (1 << adc_resolution) - 1
17+
18+
angular_position += phase_shift
19+
20+
# Make it 0-180-0
21+
periodic_angle = angular_position % 180
22+
if (angular_position // 180) & 1:
23+
periodic_angle = 180 - periodic_angle
24+
25+
return int(adc_max * (periodic_angle / 180))
26+
27+
28+
29+
def test_wiper_to_adc():
30+
# W/o phase quadrature
31+
assert _wiper_to_adc(0, 0, 10) == 0
32+
assert _wiper_to_adc(90, 0, 10) == 1023//2
33+
assert _wiper_to_adc(180, 0, 10) == 1023
34+
assert _wiper_to_adc(270, 0, 10) == 1023//2
35+
assert _wiper_to_adc(360, 0, 10) == 0
36+
37+
# W/ phase quadrature
38+
assert _wiper_to_adc(0, 90, 10) == 1023//2
39+
assert _wiper_to_adc(90, 90, 10) == 1023
40+
assert _wiper_to_adc(180, 90, 10) == 1023//2
41+
assert _wiper_to_adc(270, 90, 10) == 0
42+
assert _wiper_to_adc(360, 90, 10) == 1023//2
43+
44+
45+
def test_threshold_detector():
46+
dut = _ThresholdDetector(width=8, threshold=16)
47+
sim = Simulator(dut)
48+
49+
data = {
50+
"value": [
51+
0, 32, 0,
52+
],
53+
"previous_value": [
54+
0, 0, 32,
55+
]
56+
}
57+
58+
tx = StreamSimSender(dut.readout, data, speed=0.3)
59+
rx = StreamSimReceiver(dut.detection, length=len(data["value"]), speed=0.8, verbose=True)
60+
61+
sim.add_clock(1e-6)
62+
sim.add_sync_process(tx.sync_process)
63+
sim.add_sync_process(rx.sync_process)
64+
with sim.write_vcd("tests/test_threshold_detector.vcd"):
65+
sim.run()
66+
67+
rx.verify({
68+
"up": [0, 1, 0],
69+
"down": [0, 0, 1],
70+
"value": [0, 32, 0],
71+
"delta": [0, 32, -32],
72+
})
73+
74+
75+
def test_direction_decoding():
76+
dut = _DirectionDecoding(width=8)
77+
sim = Simulator(dut)
78+
79+
ch_a = {
80+
"up": [
81+
1,
82+
],
83+
"down": [
84+
0,
85+
],
86+
"value": [
87+
0,
88+
],
89+
"delta": [
90+
0,
91+
],
92+
}
93+
ch_b = {
94+
"up": [
95+
1,
96+
],
97+
"down": [
98+
0,
99+
],
100+
"value": [
101+
0,
102+
],
103+
"delta": [
104+
0,
105+
],
106+
}
107+
108+
tx_a = StreamSimSender(dut.dir_a, ch_a, speed=0.3)
109+
tx_b = StreamSimSender(dut.dir_b, ch_b, speed=0.3)
110+
rx = StreamSimReceiver(dut.direction, length=len(ch_a["value"]), speed=0.8, verbose=True)
111+
112+
sim.add_clock(1e-6)
113+
sim.add_sync_process(tx_a.sync_process)
114+
sim.add_sync_process(tx_b.sync_process)
115+
sim.add_sync_process(rx.sync_process)
116+
with sim.write_vcd("tests/test_direction_decoding.vcd"):
117+
sim.run()
118+
119+
120+
def test_endless_potentiometer_decoder():
121+
adc_resolution = 10 # bits
122+
123+
dut = EndlessPotentiometerDecoder(10, 5, 0.8)
124+
sim = Simulator(dut)
125+
126+
wiper_a = [_wiper_to_adc(x, 0, adc_resolution) for x in range(720)]
127+
wiper_b = [_wiper_to_adc(x, 90, adc_resolution) for x in range(720)]
128+
129+
ch_a = {
130+
"value": wiper_a[:-1],
131+
"previous_value": wiper_a[1:],
132+
}
133+
ch_b = {
134+
"value": wiper_b[:-1],
135+
"previous_value": wiper_b[1:],
136+
}
137+
138+
tx_a = StreamSimSender(dut.ch_a, ch_a, speed=0.3)
139+
tx_b = StreamSimSender(dut.ch_a, ch_b, speed=0.3)
140+
rx = StreamSimReceiver(dut.position, length=len(ch_a["value"]), speed=0.8, verbose=True)
141+
142+
sim.add_clock(1e-6)
143+
sim.add_sync_process(tx_a.sync_process)
144+
sim.add_sync_process(tx_b.sync_process)
145+
sim.add_sync_process(rx.sync_process)
146+
with sim.write_vcd("tests/test_endless_potentiometer_decoder.vcd"):
147+
sim.run()

0 commit comments

Comments
 (0)