8 changes: 8 additions & 0 deletions .env.sample
@@ -0,0 +1,8 @@
# Copy this file to `.env` and edit. Docker Compose loads `.env` automatically.

# PostgreSQL memory tuning. Defaults below suit a small host; size up to your
# host RAM — see the table in POSTGRES.md.
PG_SHARED_BUFFERS=256MB
PG_WORK_MEM=8MB
PG_MAINTENANCE_WORK_MEM=64MB
PG_EFFECTIVE_CACHE_SIZE=1GB
6 changes: 4 additions & 2 deletions API.md
@@ -14,16 +14,18 @@ directly for integrations. Responses are JSON unless noted.
| GET | `/v1/nodes/{id}` | Single node. `id` may be hex (`abcd1234`, `!abcd1234`) or decimal. |
| GET | `/v1/nodes/{id}/telemetry` | Telemetry history. |
| GET | `/v1/nodes/{id}/texts` | Chat messages from this node. |
| GET | `/v1/nodes/{id}/packets` | Raw MQTT messages. Query param: `limit` (1–200, default 50). |
| GET | `/v1/nodes/{id}/packets` | Raw MQTT messages for this node, keyset-paginated. Query params: `limit` (1–200, default 50), `start`/`end` (unix-epoch seconds), `before` (cursor). Returns `{"packets": [...], "next_cursor": str \| null}`. |
| GET | `/v1/nodes/{id}/traceroutes` | Traceroutes involving this node. |

### Chat / Messages

| Method | Path | Notes |
|---|---|---|
| GET | `/v1/chat` | Chat in a channel. Query params: `channel` (default `"0"`), `range` (`1h`/`24h`/`7d`/`all`, default `24h`). |
| GET | `/v1/messages` | Raw MQTT messages. Query params: `q` (search), `range`, `limit` (1–50000, default 5000). |
| GET | `/v1/messages` | Raw MQTT messages, newest window only. Query params: `q` (search), `range`, `limit` (1–50000, default 5000). |
| GET | `/v1/mqtt_messages` | Same as `/v1/messages` without search. |
| GET | `/v1/packets` | Keyset-paginated packet archive — reaches the full history, not just the newest window. Query params: `q` (search), `topic` (substring filter on topic only), `range`, `start`/`end` (unix-epoch seconds, absolute window on ingest time), `before` (cursor from a prior page), `limit` (1–50000, default 1000). Returns `{"messages": [...], "next_cursor": str \| null}`; pass `next_cursor` back as `before` for the next page. Each message carries `mqtt_row_id` (stable DB id). |
| GET | `/v1/packets/{id}` | Single packet by `mqtt_row_id` — backs per-packet deeplinks. Returns `{"packet": {...}}`, or 404 if not found. |
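
A minimal sketch of how a client might walk the full archive via `/v1/packets`, following the `next_cursor` / `before` contract described in the table above. The `fetch_page` callable is a hypothetical transport (not part of this PR): it takes the query parameters and returns the decoded JSON body, so any HTTP library can be plugged in.

```python
from typing import Callable, Iterator, Optional

# Hypothetical transport: receives query params, returns the decoded JSON body
# of GET /v1/packets, i.e. {"messages": [...], "next_cursor": str | None}.
FetchPage = Callable[[dict], dict]


def iter_packets(fetch_page: FetchPage, limit: int = 1000, **filters) -> Iterator[dict]:
    """Yield every packet matching `filters`, page by page."""
    before: Optional[str] = None
    while True:
        params = {"limit": limit, **filters}
        if before is not None:
            # Pass the previous page's next_cursor back as `before`.
            params["before"] = before
        page = fetch_page(params)
        yield from page["messages"]
        before = page.get("next_cursor")
        if not before:
            # A null cursor means there are no older pages.
            return
```

Because the function is a generator, a caller can stop early (for example after the first match) without fetching the remaining pages.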

### Telemetry / Traceroutes / Stats

93 changes: 93 additions & 0 deletions POSTGRES.md
@@ -65,6 +65,44 @@ The schema is automatically created on first run. Key tables:

## Performance Considerations

### Memory tuning

The `postgres` service in both compose files sets a few core memory
parameters via its `command:`. The defaults are deliberately conservative so
the stack runs on a small host, but they are already well above the stock
`postgres` image defaults (`shared_buffers` 128 MB, `work_mem` 4 MB).

Size them to your host: copy `.env.sample` to `.env` next to the compose file
(Docker Compose reads it automatically) and set any of:

| Variable | Default | What it does |
|----------------------------|---------|-----------------------------------------------|
| `PG_SHARED_BUFFERS` | `256MB` | Dedicated PG page cache (real RAM allocation). |
| `PG_WORK_MEM` | `8MB` | Per-sort/hash memory; multiplied by concurrency. |
| `PG_MAINTENANCE_WORK_MEM` | `64MB` | VACUUM / CREATE INDEX working memory. |
| `PG_EFFECTIVE_CACHE_SIZE` | `1GB` | Planner hint for OS+PG cache (not an allocation). |

Suggested starting points by **total host RAM** (assuming Postgres shares the
box with the rest of the MeshInfo stack — give it less than a dedicated DB
host would get):

| Host RAM | `PG_SHARED_BUFFERS` | `PG_WORK_MEM` | `PG_MAINTENANCE_WORK_MEM` | `PG_EFFECTIVE_CACHE_SIZE` |
|----------|---------------------|---------------|---------------------------|---------------------------|
| 4 GB | `512MB` | `16MB` | `128MB` | `2GB` |
| 8 GB | `1GB` | `32MB` | `256MB` | `4GB` |
| 16 GB | `2GB` | `64MB` | `512MB` | `10GB` |

Example `.env`:

```
PG_SHARED_BUFFERS=512MB
PG_WORK_MEM=16MB
PG_MAINTENANCE_WORK_MEM=128MB
PG_EFFECTIVE_CACHE_SIZE=2GB
```

Recreate the container to apply: `docker compose up -d postgres`.
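
As an illustration only, the sizing table above can be turned into a small lookup that prints a ready-to-paste `.env`. The selection rule (use the largest table row that does not exceed the host's RAM, and fall back to the shipped defaults below 4 GB) is an assumption of this sketch, as are the function names.

```python
# Shipped defaults from .env.sample.
DEFAULTS = {
    "PG_SHARED_BUFFERS": "256MB",
    "PG_WORK_MEM": "8MB",
    "PG_MAINTENANCE_WORK_MEM": "64MB",
    "PG_EFFECTIVE_CACHE_SIZE": "1GB",
}

# Rows copied from the "suggested starting points" table: (host RAM in GB, settings).
SUGGESTED = [
    (4, {"PG_SHARED_BUFFERS": "512MB", "PG_WORK_MEM": "16MB",
         "PG_MAINTENANCE_WORK_MEM": "128MB", "PG_EFFECTIVE_CACHE_SIZE": "2GB"}),
    (8, {"PG_SHARED_BUFFERS": "1GB", "PG_WORK_MEM": "32MB",
         "PG_MAINTENANCE_WORK_MEM": "256MB", "PG_EFFECTIVE_CACHE_SIZE": "4GB"}),
    (16, {"PG_SHARED_BUFFERS": "2GB", "PG_WORK_MEM": "64MB",
          "PG_MAINTENANCE_WORK_MEM": "512MB", "PG_EFFECTIVE_CACHE_SIZE": "10GB"}),
]


def suggest_settings(host_ram_gb: float) -> dict[str, str]:
    """Pick the largest table row whose RAM figure fits the host (assumed rule)."""
    chosen = DEFAULTS
    for ram_gb, settings in SUGGESTED:
        if host_ram_gb >= ram_gb:
            chosen = settings
    return dict(chosen)


def render_env(settings: dict[str, str]) -> str:
    """Format settings as KEY=VALUE lines suitable for a .env file."""
    return "\n".join(f"{key}={value}" for key, value in settings.items())


if __name__ == "__main__":
    print(render_env(suggest_settings(8)))
```

Rounding down keeps the suggestion conservative for hosts that sit between rows, which matches the note that Postgres shares the box with the rest of the stack.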

### Real-time Writes

All writes happen in real-time as data arrives from MQTT, ensuring minimal data loss on crash.
@@ -171,6 +209,61 @@ Notes:
re-initialises empty, `docker compose up -d postgres`, then restore —
`gzip -dc backups/<dump>.sql.gz | docker compose exec -T postgres psql -U postgres -d meshinfo`.

## Partitioning the mqtt_messages archive

`mqtt_messages` is the raw packet firehose and dominates the database — it
typically holds 98%+ of all rows. Fresh installs since this change get a
**partitioned table** (RANGE partitioned by month on `created_at`), so the
table stays operationally manageable as it grows: each date-range query
prunes to the relevant month(s) instead of scanning everything, and
maintenance runs at partition scale, not table scale. The app
(`ensure_mqtt_partitions`) keeps next month's partition created ahead of
the rollover.

Databases created before partitioning landed need a **one-time conversion**:

```sh
bash scripts/migrate-mqtt-partitioning.sh
# dev stack: COMPOSE_FILE=docker-compose-dev.yml bash scripts/migrate-mqtt-partitioning.sh
```

What it does, inside a **single atomic transaction**:

1. Renames the existing table to `mqtt_messages_old`.
2. Creates a new month-partitioned `mqtt_messages` (with the `payload`
column `lz4`-compressed).
3. Creates one partition per month spanned by the data, copies every row,
and verifies the row count matches before committing.
4. Rebuilds indexes, re-homes the `id` sequence, reinstalls the trigger.

Any failure rolls the whole thing back — `mqtt_messages` is untouched.
The script stops `meshinfo` for the duration and restarts it on success.
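
To make step 3 concrete, here is a rough sketch of how the set of monthly partitions spanned by the data could be computed from the oldest and newest `created_at` values. It is not the migration script's code: the function names and the `mqtt_messages_YYYY_MM` naming scheme are hypothetical, and the bounds are half-open (`lower` inclusive, `upper` exclusive), which is how RANGE partitions behave in PostgreSQL.

```python
import datetime


def month_start(ts: datetime.datetime) -> datetime.datetime:
    """Truncate a timestamp to the first instant of its month."""
    return ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0)


def next_month(start: datetime.datetime) -> datetime.datetime:
    """Return the first instant of the following month (start must be day 1)."""
    year = start.year + start.month // 12
    month = start.month % 12 + 1
    return start.replace(year=year, month=month)


def month_partitions(oldest: datetime.datetime, newest: datetime.datetime):
    """List (name, lower, upper) for every month between oldest and newest."""
    partitions = []
    lower = month_start(oldest)
    while lower <= newest:
        upper = next_month(lower)
        # Hypothetical naming scheme; the real script may name partitions differently.
        partitions.append((f"mqtt_messages_{lower:%Y_%m}", lower, upper))
        lower = upper
    return partitions
```

The same helper covers the "keep next month's partition created ahead of the rollover" case: calling it with `newest` set to a date in the coming month yields the extra partition.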

Notes:

- **Requires lz4:** the partitioned `mqtt_messages` `payload` column uses
`COMPRESSION lz4`, so Postgres must be built with lz4 support (PG 14+ — the
official `postgres:18` image this stack ships with includes it). A self-hosted
Postgres compiled without lz4 will reject both the fresh-install schema and the
migration with `compression method lz4 not supported`.
- **Disk:** the migration needs ~3× the current `mqtt_messages` size free
on the data volume transiently (new copy + WAL + headroom). The pre-flight
check aborts with a clear message if there's not enough. Expand the data
volume first if you're short.
- **Rollback safety:** the original data is kept as `mqtt_messages_old`
until you drop it. Disk is not reclaimed until then. Once you've
confirmed the app + Logs page look right, run:
```sh
docker compose exec postgres psql -U postgres -d meshinfo \
-c 'DROP TABLE mqtt_messages_old;'
```
- **Idempotent**: re-running the script when the table is already
partitioned (or doesn't exist yet) is a no-op.
- **On Windows**, run from Git Bash, like `migrate-postgres.sh`.
- Partitioning does not shrink disk — it makes a huge table manageable.
Complete-history retention still implies the data volume must be
allowed to grow.
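
The disk note above amounts to a simple headroom check. The sketch below shows the arithmetic under the stated ~3× rule; the function names are hypothetical and the script's actual pre-flight check may measure sizes differently.

```python
import shutil


def has_migration_headroom(table_bytes: int, free_bytes: int, factor: float = 3.0) -> bool:
    """True when free space covers `factor` times the current table size."""
    return free_bytes >= factor * table_bytes


def free_bytes_on(path: str) -> int:
    """Free bytes on the filesystem that holds `path`."""
    return shutil.disk_usage(path).free
```

For example, a 40 GB `mqtt_messages` table would need roughly 120 GB free on the data volume before the migration is allowed to start.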

## Security

- Use strong passwords for PostgreSQL
63 changes: 59 additions & 4 deletions api/api.py
@@ -73,6 +73,17 @@ def _parse_range(value: str | None) -> int | None:
            return DEFAULT_RANGE
        return DEFAULT_RANGE

    @staticmethod
    def _parse_epoch(value: str | None) -> datetime.datetime | None:
        """Parse a unix-epoch-seconds query param into an aware UTC datetime.
        Returns None when absent or unparseable (treated as no bound)."""
        if not value:
            return None
        try:
            return datetime.datetime.fromtimestamp(int(value), tz=datetime.timezone.utc)
        except (TypeError, ValueError, OSError, OverflowError):
            return None

    async def serve(self):
        @app.get("/")
        async def root():
@@ -165,8 +176,16 @@ async def node_packets(request: Request, id: str) -> JSONResponse:
                limit = 50
            limit = max(1, min(limit, 200))

            packets = await self.data.pg_storage.query_node_mqtt_messages(node_id, limit=limit)
            return jsonable_encoder({"packets": packets})
            result = await self.data.pg_storage.query_node_mqtt_messages(
                node_id,
                limit=limit,
                start=self._parse_epoch(request.query_params.get("start")),
                end=self._parse_epoch(request.query_params.get("end")),
                before=request.query_params.get("before"),
            )
            return jsonable_encoder(
                {"packets": result["messages"], "next_cursor": result["next_cursor"]}
            )

        @app.get("/v1/nodes/{id}/traceroutes")
        async def node_traceroutes(request: Request, id: str) -> JSONResponse:
Expand Down Expand Up @@ -216,7 +235,7 @@ async def messages(request: Request) -> JSONResponse:
results = await self.data.pg_storage.query_mqtt_messages(
limit=limit, search=search, range_seconds=range_seconds,
)
return jsonable_encoder(results)
return jsonable_encoder(results["messages"])

@app.get("/v1/mqtt_messages")
async def mqtt_messages(request: Request) -> JSONResponse:
Expand All @@ -229,7 +248,43 @@ async def mqtt_messages(request: Request) -> JSONResponse:
results = await self.data.pg_storage.query_mqtt_messages(
limit=limit, range_seconds=range_seconds,
)
return jsonable_encoder(results)
return jsonable_encoder(results["messages"])

        @app.get("/v1/packets")
        async def packets(request: Request) -> JSONResponse:
            """Keyset-paginated packet archive. Unlike /v1/mqtt_messages (which
            returns only the newest window), this reaches the full history via
            absolute start/end (unix-epoch seconds) and a `before` cursor.
            Response: {"messages": [...], "next_cursor": str | null}."""
            search = request.query_params.get("q")
            range_seconds = self._parse_range(request.query_params.get("range"))
            try:
                limit = int(request.query_params.get("limit", 1000))
            except (TypeError, ValueError):
                return JSONResponse({"error": "limit must be an integer"}, status_code=400)
            limit = max(1, min(limit, 50000))
            result = await self.data.pg_storage.query_mqtt_messages(
                limit=limit,
                search=search,
                topic=request.query_params.get("topic"),
                range_seconds=range_seconds,
                start=self._parse_epoch(request.query_params.get("start")),
                end=self._parse_epoch(request.query_params.get("end")),
                before=request.query_params.get("before"),
            )
            return jsonable_encoder(result)

        @app.get("/v1/packets/{packet_id}")
        async def packet_by_id(request: Request, packet_id: str) -> JSONResponse:
            """Single packet by `mqtt_messages` row id — backs per-packet deeplinks."""
            try:
                row_id = int(packet_id)
            except (TypeError, ValueError):
                return JSONResponse({"error": "packet id must be an integer"}, status_code=400)
            packet = await self.data.pg_storage.query_mqtt_message_by_id(row_id)
            if packet is None:
                return JSONResponse({"error": "packet not found"}, status_code=404)
            return jsonable_encoder({"packet": packet})

        @app.get("/v1/stats")
        async def stats(request: Request) -> JSONResponse:
6 changes: 2 additions & 4 deletions config.py
@@ -82,10 +82,8 @@
"enrich": {
"enabled": False,
"interval": 900,
# List of upstream MeshInfo-compatible name lookups. Each entry is either
# a named preset (currently: "bayme") or a URL template like
# "https://other.meshinfo.example/api/v1/nodes?ids={ids}".
"providers": ["bayme"],
# Meshview bulk dicts or MeshInfo URL templates; see config.toml.sample.
"providers": [],
},
"graph": {
"enabled": True,
17 changes: 9 additions & 8 deletions config.toml.sample
@@ -35,17 +35,18 @@ timezone = "America/Los_Angeles" # IANA timezone (e.g. "UTC", "Europe/Londo
[server.intervals]
data_save = 300 # seconds between graph rebuilds

# Node enrichment: ask other MeshInfo deployments for the names of nodes we
# heard via position/telemetry but never received a local NODEINFO for. Useful
# when a node's NODEINFO travelled over a different region/broker than the one
# this instance subscribes to.
# Node enrichment: ask other MeshInfo / Meshview deployments for the names of
# nodes we have positions/telemetry for but no local NODEINFO.
#
# Provider entry forms:
# { kind = "meshview", url = "...", name = "...", days_active = 7 } # Meshview bulk
# "https://other.meshinfo.example/api/v1/nodes?ids={ids}" # MeshInfo URL template
[server.enrich]
enabled = true
interval = 900 # seconds between enrichment runs (±10% jitter applied)
# Named presets currently shipped: "bayme" (https://data.bayme.sh)
# To add another MeshInfo instance, drop in a URL template with {ids} placeholder:
# providers = ["bayme", "https://other.meshinfo.example/api/v1/nodes?ids={ids}"]
providers = ["bayme"]
providers = [
{ kind = "meshview", url = "https://meshview.bayme.sh/api/nodes", name = "baymesh", days_active = 7 },
]

# Network graph generation
[server.graph]