@@ -68,28 +68,66 @@ def supports_cpp_node(self) -> bool:
68
68
"""Indicates whether this stage supports a C++ node."""
69
69
return True
70
70
71
- def _cast_control_message (self , message : ControlMessage , * , cpp_messages_lib : types .ModuleType ) -> ControlMessage :
71
+ def _store_payload (self , message : ControlMessage ) -> ControlMessage :
72
+ """
73
+ Store the MessageMeta in the ControlMessage's metadata.
74
+
75
+ In CPU-only allows the ControlMessage to hold an instance of a Python MessageMeta containing a pandas DataFrame.
76
+ """
77
+ message .set_metadata ("llm_message_meta" , message .payload ())
78
+ return message
79
+
80
+ def _cast_to_cpp_control_message (self , message : ControlMessage , * ,
81
+ cpp_messages_lib : types .ModuleType ) -> ControlMessage :
72
82
"""
73
83
LLMEngineStage does not contain a Python implementation, however it is capable of running in cpu-only mode.
74
- This method is needed to cast the Python ControlMessage to a C++ ControlMessage.
84
+ This method is needed to create an instance of a C++ ControlMessage.
75
85
76
86
This is different than casting from the Python bindings for the C++ ControlMessage to a C++ ControlMessage.
77
87
"""
78
- return cpp_messages_lib .ControlMessage (message , no_cast = True )
88
+ cm = cpp_messages_lib .ControlMessage ()
89
+ metadata = message .get_metadata ()
90
+ for (key , value ) in metadata .items ():
91
+ cm .set_metadata (key , value )
92
+
93
+ return cm
94
+
95
+ def _restore_payload (self , message : ControlMessage ) -> ControlMessage :
96
+ """
97
+ Pop llm_message_meta from the metadata and set it as the payload.
98
+
99
+ In CPU-only mode this has the effect of converting the C++ ControlMessage back to a Python ControlMessage.
100
+ """
101
+ metadata = message .get_metadata ()
102
+ message_meta = metadata .pop ("llm_message_meta" )
103
+
104
+ out_message = ControlMessage ()
105
+ out_message .payload (message_meta )
106
+ for (key , value ) in metadata .items ():
107
+ out_message .set_metadata (key , value )
108
+
109
+ return out_message
79
110
80
111
def _build_single (self , builder : mrc .Builder , input_node : mrc .SegmentObject ) -> mrc .SegmentObject :
81
112
import morpheus_llm ._lib .llm as _llm
113
+
114
+ store_payload_node = builder .make_node (f"{ self .unique_name } -store-payload" , ops .map (self ._store_payload ))
115
+ builder .make_edge (input_node , store_payload_node )
116
+
82
117
node = _llm .LLMEngineStage (builder , self .unique_name , self ._engine )
83
118
node .launch_options .pe_count = 1
84
119
85
120
if self ._config .execution_mode == ExecutionMode .CPU :
86
121
import morpheus ._lib .messages as _messages
87
- cast_fn = functools .partial (self ._cast_control_message , cpp_messages_lib = _messages )
88
- pre_node = builder .make_node (f"{ self .unique_name } -pre-cast" , ops .map (cast_fn ))
89
- builder .make_edge (input_node , pre_node )
122
+ cast_to_cpp_fn = functools .partial (self ._cast_to_cpp_control_message , cpp_messages_lib = _messages )
123
+ cast_to_cpp_node = builder .make_node (f"{ self .unique_name } -pre-msg-cast" , ops .map (cast_to_cpp_fn ))
124
+ builder .make_edge (store_payload_node , cast_to_cpp_node )
125
+ builder .make_edge (cast_to_cpp_node , node )
90
126
91
- input_node = pre_node
127
+ else :
128
+ builder .make_edge (store_payload_node , node )
92
129
93
- builder .make_edge (input_node , node )
130
+ restore_payload_node = builder .make_node (f"{ self .unique_name } -restore-payload" , ops .map (self ._restore_payload ))
131
+ builder .make_edge (node , restore_payload_node )
94
132
95
- return node
133
+ return restore_payload_node
0 commit comments