238238 ComputerContext ,
239239 KernelResourceSpec ,
240240 Mount ,
241+ ResourcePartitioner ,
241242 align_memory ,
242243 allocate ,
243244 known_slot_types ,
@@ -765,7 +766,10 @@ class AbstractAgent(
765766 etcd : AsyncEtcd
766767 local_instance_id : str
767768 kernel_registry : MutableMapping [KernelId , AbstractKernel ]
769+ resource_partitioner : ResourcePartitioner
768770 computers : MutableMapping [DeviceName , ComputerContext ]
771+ total_slots : Mapping [SlotName , Decimal ]
772+ reserved_slots : Mapping [SlotName , Decimal ]
769773 images : Mapping [ImageCanonical , ScannedImage ]
770774 port_pool : set [int ]
771775
@@ -836,6 +840,7 @@ def __init__(
836840 error_monitor : ErrorPluginContext ,
837841 skip_initial_scan : bool = False ,
838842 agent_public_key : Optional [PublicKey ],
843+ resource_partitioner : ResourcePartitioner ,
839844 ) -> None :
840845 self ._skip_initial_scan = skip_initial_scan
841846 self .loop = current_loop ()
@@ -845,7 +850,10 @@ def __init__(
845850 self .local_instance_id = generate_local_instance_id (__file__ )
846851 self .agent_public_key = agent_public_key
847852 self .kernel_registry = {}
853+ self .resource_partitioner = resource_partitioner
848854 self .computers = {}
855+ self .total_slots = {}
856+ self .reserved_slots = {}
849857 self .images = {}
850858 self .restarting_kernels = {}
851859 self .stat_ctx = StatContext (
@@ -941,6 +949,12 @@ async def __ainit__(self) -> None:
941949 self .computers [name ] = ComputerContext (computer , devices , alloc_map )
942950 metadatas .append (computer .get_metadata ())
943951
952+ self .total_slots = self .resource_partitioner .calculate_total_slots (
953+ self .computers , self .local_config .resource_common
954+ )
955+ self .reserved_slots = self .resource_partitioner .restrict_computer_resources (
956+ self .computers , self .total_slots
957+ )
944958 self .slots = await self .update_slots ()
945959 log .info ("Resource slots: {!r}" , self .slots )
946960 log .info ("Slot types: {!r}" , known_slot_types )
@@ -1947,6 +1961,7 @@ async def load_resources(
19471961 """
19481962 Detect available resources attached on the system and load corresponding device plugin.
19491963 """
1964+ raise NotImplementedError
19501965
19511966 @abstractmethod
19521967 async def scan_available_resources (
@@ -1955,6 +1970,7 @@ async def scan_available_resources(
19551970 """
19561971 Scan and define the amount of available resource slots in this node.
19571972 """
1973+ raise NotImplementedError
19581974
19591975 async def update_slots (
19601976 self ,
@@ -1965,14 +1981,9 @@ async def update_slots(
19651981 """
19661982 scanned_slots = await self .scan_available_resources ()
19671983 usable_slots : dict [SlotName , Decimal ] = {}
1968- reserved_slots = {
1969- SlotName ("cpu" ): Decimal (self .local_config .resource .reserved_cpu ),
1970- SlotName ("mem" ): Decimal (self .local_config .resource .reserved_mem ),
1971- SlotName ("disk" ): Decimal (self .local_config .resource .reserved_disk ),
1972- }
19731984 for slot_name , slot_capacity in scanned_slots .items ():
19741985 if slot_name == SlotName ("mem" ):
1975- mem_reserved = int (reserved_slots .get (slot_name , 0 ))
1986+ mem_reserved = int (self . reserved_slots .get (slot_name , 0 ))
19761987 mem_align = int (self .local_config .resource .memory_align_size )
19771988 mem_usable , mem_reserved = align_memory (
19781989 int (slot_capacity ), mem_reserved , align = mem_align
@@ -1986,7 +1997,7 @@ async def update_slots(
19861997 )
19871998 else :
19881999 usable_capacity = max (
1989- Decimal (0 ), slot_capacity - reserved_slots .get (slot_name , Decimal (0 ))
2000+ Decimal (0 ), slot_capacity - self . reserved_slots .get (slot_name , Decimal (0 ))
19902001 )
19912002 usable_slots [slot_name ] = usable_capacity
19922003 return usable_slots
@@ -2098,6 +2109,7 @@ async def scan_images(self) -> ScanImagesResult:
20982109 This is called periodically to keep the image list up-to-date and allow
20992110 manual image addition and deletions by admins.
21002111 """
2112+ raise NotImplementedError
21012113
21022114 async def _scan_images_wrapper (self , interval : float ) -> None :
21032115 result = await self .scan_images ()
@@ -2118,6 +2130,7 @@ async def push_image(
21182130 """
21192131 Push the given image to the given registry.
21202132 """
2133+ raise NotImplementedError
21212134
21222135 @abstractmethod
21232136 async def pull_image (
@@ -2130,12 +2143,14 @@ async def pull_image(
21302143 """
21312144 Pull the given image from the given registry.
21322145 """
2146+ raise NotImplementedError
21332147
21342148 @abstractmethod
21352149 async def purge_images (self , request : PurgeImagesReq ) -> PurgeImagesResp :
21362150 """
21372151 Purge the given images from the agent.
21382152 """
2153+ raise NotImplementedError
21392154
21402155 async def check_and_pull (
21412156 self ,
@@ -2267,7 +2282,7 @@ async def check_image(
22672282 Check the availability of the image and return a boolean flag that indicates whether
22682283 the agent should try pulling the image from a registry.
22692284 """
2270- return False
2285+ raise NotImplementedError
22712286
22722287 async def scan_running_kernels (self ) -> None :
22732288 """
@@ -3489,6 +3504,7 @@ async def destroy_kernel(
34893504 * Send SIGTERM to the kernel's main process.
34903505 * Send SIGKILL if it's not terminated within a few seconds.
34913506 """
3507+ raise NotImplementedError
34923508
34933509 @abstractmethod
34943510 async def clean_kernel (
@@ -3512,6 +3528,7 @@ async def clean_kernel(
35123528 The ``container_id`` may be ``None`` if the container has already gone away.
35133529 In such cases, skip container-specific cleanups.
35143530 """
3531+ raise NotImplementedError
35153532
35163533 @abstractmethod
35173534 async def create_local_network (self , network_name : str ) -> None :
@@ -3523,6 +3540,7 @@ async def create_local_network(self, network_name: str) -> None:
35233540 It may raise :exc:`NotImplementedError` and then the manager
35243541 will cancel creation of the session.
35253542 """
3543+ raise NotImplementedError
35263544
35273545 @abstractmethod
35283546 async def destroy_local_network (self , network_name : str ) -> None :
@@ -3531,6 +3549,7 @@ async def destroy_local_network(self, network_name: str) -> None:
35313549
35323550 This is called by the manager after kernel destruction.
35333551 """
3552+ raise NotImplementedError
35343553
35353554 @abstractmethod
35363555 async def restart_kernel__load_config (
@@ -3541,7 +3560,7 @@ async def restart_kernel__load_config(
35413560 """
35423561 Restore the cluster config from a previous launch of the kernel.
35433562 """
3544- pass
3563+ raise NotImplementedError
35453564
35463565 @abstractmethod
35473566 async def restart_kernel__store_config (
@@ -3554,7 +3573,7 @@ async def restart_kernel__store_config(
35543573 Store the cluster config to a kernel-related storage (e.g., scratch space),
35553574 so that restarts of this kernel can reuse the configuration.
35563575 """
3557- pass
3576+ raise NotImplementedError
35583577
35593578 async def restart_kernel (
35603579 self ,
0 commit comments