|
| 1 | +# piker: trading gear for hackers |
| 2 | +# Copyright (C) Tyler Goodlet (in stewardship for pikers) |
| 3 | + |
| 4 | +# This program is free software: you can redistribute it and/or modify |
| 5 | +# it under the terms of the GNU Affero General Public License as published by |
| 6 | +# the Free Software Foundation, either version 3 of the License, or |
| 7 | +# (at your option) any later version. |
| 8 | + |
| 9 | +# This program is distributed in the hope that it will be useful, |
| 10 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 12 | +# GNU Affero General Public License for more details. |
| 13 | + |
| 14 | +# You should have received a copy of the GNU Affero General Public License |
| 15 | +# along with this program. If not, see <https://www.gnu.org/licenses/>. |
| 16 | + |
| 17 | +''' |
| 18 | +Broker-daemon-actor "endpoint-hooks": the service task entry points for |
| 19 | +``brokerd``. |
| 20 | +
|
| 21 | +''' |
| 22 | +from contextlib import ( |
| 23 | + asynccontextmanager as acm, |
| 24 | +) |
| 25 | + |
| 26 | +import tractor |
| 27 | +import trio |
| 28 | + |
| 29 | +from . import _util |
| 30 | +from . import get_brokermod |
| 31 | + |
| 32 | +# `brokerd` enabled modules |
| 33 | +# TODO: move this def to the `.data` subpkg.. |
| 34 | +# NOTE: keeping this list as small as possible is part of our caps-sec |
| 35 | +# model and should be treated with utmost care! |
| 36 | +_data_mods = [ |
| 37 | + 'piker.brokers.core', |
| 38 | + 'piker.brokers.data', |
| 39 | + 'piker.brokers._daemon', |
| 40 | + 'piker.data', |
| 41 | + 'piker.data.feed', |
| 42 | + 'piker.data._sampling' |
| 43 | +] |
| 44 | + |
| 45 | + |
| 46 | +# TODO: we should rename the daemon to datad prolly once we split up |
| 47 | +# broker vs. data tasks into separate actors? |
| 48 | +@tractor.context |
| 49 | +async def _setup_persistent_brokerd( |
| 50 | + ctx: tractor.Context, |
| 51 | + brokername: str, |
| 52 | + loglevel: str | None = None, |
| 53 | + |
| 54 | +) -> None: |
| 55 | + ''' |
| 56 | + Allocate a actor-wide service nursery in ``brokerd`` |
| 57 | + such that feeds can be run in the background persistently by |
| 58 | + the broker backend as needed. |
| 59 | +
|
| 60 | + ''' |
| 61 | + log = _util.get_console_log( |
| 62 | + loglevel or tractor.current_actor().loglevel, |
| 63 | + name=f'{_util.subsys}.{brokername}', |
| 64 | + ) |
| 65 | + # set global for this actor to this new process-wide instance B) |
| 66 | + _util.log = log |
| 67 | + |
| 68 | + from piker.data.feed import ( |
| 69 | + _bus, |
| 70 | + get_feed_bus, |
| 71 | + ) |
| 72 | + global _bus |
| 73 | + assert not _bus |
| 74 | + |
| 75 | + async with trio.open_nursery() as service_nursery: |
| 76 | + # assign a nursery to the feeds bus for spawning |
| 77 | + # background tasks from clients |
| 78 | + get_feed_bus(brokername, service_nursery) |
| 79 | + |
| 80 | + # unblock caller |
| 81 | + await ctx.started() |
| 82 | + |
| 83 | + # we pin this task to keep the feeds manager active until the |
| 84 | + # parent actor decides to tear it down |
| 85 | + await trio.sleep_forever() |
| 86 | + |
| 87 | + |
| 88 | +async def spawn_brokerd( |
| 89 | + |
| 90 | + brokername: str, |
| 91 | + loglevel: str | None = None, |
| 92 | + |
| 93 | + **tractor_kwargs, |
| 94 | + |
| 95 | +) -> bool: |
| 96 | + |
| 97 | + from piker.service import Services |
| 98 | + from piker.service._util import log # use service mngr log |
| 99 | + |
| 100 | + log.info(f'Spawning {brokername} broker daemon') |
| 101 | + |
| 102 | + brokermod = get_brokermod(brokername) |
| 103 | + dname = f'brokerd.{brokername}' |
| 104 | + |
| 105 | + extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) |
| 106 | + tractor_kwargs.update(extra_tractor_kwargs) |
| 107 | + |
| 108 | + # ask `pikerd` to spawn a new sub-actor and manage it under its |
| 109 | + # actor nursery |
| 110 | + modpath = brokermod.__name__ |
| 111 | + broker_enable = [modpath] |
| 112 | + for submodname in getattr( |
| 113 | + brokermod, |
| 114 | + '__enable_modules__', |
| 115 | + [], |
| 116 | + ): |
| 117 | + subpath = f'{modpath}.{submodname}' |
| 118 | + broker_enable.append(subpath) |
| 119 | + |
| 120 | + portal = await Services.actor_n.start_actor( |
| 121 | + dname, |
| 122 | + enable_modules=_data_mods + broker_enable, |
| 123 | + loglevel=loglevel, |
| 124 | + debug_mode=Services.debug_mode, |
| 125 | + **tractor_kwargs |
| 126 | + ) |
| 127 | + |
| 128 | + # non-blocking setup of brokerd service nursery |
| 129 | + await Services.start_service_task( |
| 130 | + dname, |
| 131 | + portal, |
| 132 | + |
| 133 | + # signature of target root-task endpoint |
| 134 | + _setup_persistent_brokerd, |
| 135 | + brokername=brokername, |
| 136 | + loglevel=loglevel, |
| 137 | + ) |
| 138 | + return True |
| 139 | + |
| 140 | + |
| 141 | +@acm |
| 142 | +async def maybe_spawn_brokerd( |
| 143 | + |
| 144 | + brokername: str, |
| 145 | + loglevel: str | None = None, |
| 146 | + |
| 147 | + **pikerd_kwargs, |
| 148 | + |
| 149 | +) -> tractor.Portal: |
| 150 | + ''' |
| 151 | + Helper to spawn a brokerd service *from* a client |
| 152 | + who wishes to use the sub-actor-daemon. |
| 153 | +
|
| 154 | + ''' |
| 155 | + from piker.service import maybe_spawn_daemon |
| 156 | + |
| 157 | + async with maybe_spawn_daemon( |
| 158 | + |
| 159 | + f'brokerd.{brokername}', |
| 160 | + service_task_target=spawn_brokerd, |
| 161 | + spawn_args={ |
| 162 | + 'brokername': brokername, |
| 163 | + }, |
| 164 | + loglevel=loglevel, |
| 165 | + |
| 166 | + **pikerd_kwargs, |
| 167 | + |
| 168 | + ) as portal: |
| 169 | + yield portal |
0 commit comments