@@ -21,16 +21,21 @@ use std::env;
2121use std:: io:: { stdout, Write } ;
2222use std:: time:: Duration ;
2323
24- use bollard:: container:: { Config , CreateContainerOptions , InspectContainerOptions , KillContainerOptions , LogOutput , LogsOptions , RemoveContainerOptions , StartContainerOptions , StopContainerOptions , WaitContainerOptions } ;
2524use bollard:: Docker ;
26- use bollard:: errors :: Error ;
25+ use bollard:: container :: { Config , CreateContainerOptions , InspectContainerOptions , KillContainerOptions , LogOutput , LogsOptions , RemoveContainerOptions , StartContainerOptions , StopContainerOptions , WaitContainerOptions } ;
2726use bollard:: errors:: Error :: DockerResponseServerError ;
27+ use bollard:: errors:: Error ;
28+ use bollard:: exec:: { CreateExecOptions , CreateExecResults , StartExecResults } ;
2829use bollard:: image:: { CreateImageOptions , ListImagesOptions } ;
2930use bollard:: models:: { ContainerCreateResponse , ContainerWaitResponse , CreateImageInfo , HostConfig , NetworkCreateResponse , PortBinding } ;
3031use bollard:: network:: { CreateNetworkOptions , InspectNetworkOptions } ;
31- use futures:: { FutureExt , pin_mut, select, Stream , stream, stream_select} ;
3232use futures:: StreamExt ;
3333use futures:: TryStreamExt ;
34+ use futures:: stream:: { Fuse } ;
35+ use futures:: { FutureExt , pin_mut, select, Stream , stream, stream_select} ;
36+ use tokio:: task:: JoinHandle ;
37+ use speculoos:: assert_that;
38+ use speculoos:: prelude:: StrAssertions ;
3439
3540// https://stackoverflow.com/questions/27582739/how-do-i-create-a-hashmap-literal
3641macro_rules! collection {
@@ -50,6 +55,66 @@ const H2SPEC_IMAGE: &str = "summerwind/h2spec:2.6.0"; // do not prefix with `do
5055const NGHTTP2_IMAGE : & str = "nixery.dev/nghttp2:latest" ;
5156const NETCAT_IMAGE : & str = "nixery.dev/netcat:latest" ;
5257
58+ /**
59+ * Starts skrouterd and queries it with skmanage from inside the same container.
60+ *
61+ * Issue #925: ModuleNotFoundError: No module named 'proton'
62+ */
63+ #[ tokio:: test( flavor = "multi_thread" ) ]
64+ async fn test_skrouterd_sanity ( ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
65+ let skupper_router_image = & env:: var ( "QDROUTERD_IMAGE" ) . unwrap_or ( String :: from ( "quay.io/skupper/skupper-router:latest" ) ) ;
66+ println ! ( "Using router image: {}" , skupper_router_image) ;
67+
68+ let hostconfig: HostConfig = HostConfig {
69+ cap_add : Some ( vec ! [ "SYS_PTRACE" . to_string( ) ] ) ,
70+ port_bindings : Some ( collection ! {
71+ "5672/tcp" . to_string( ) => Some ( vec![ PortBinding { host_ip: None , host_port: None } ] ) ,
72+ } ) ,
73+ ..Default :: default ( )
74+ } ;
75+
76+ let docker = Docker :: connect_with_local_defaults ( ) . unwrap ( ) ;
77+
78+ // prefetch all images before creating containers
79+ find_or_pull_image ( & docker, skupper_router_image) . await ;
80+ find_or_pull_image ( & docker, NETCAT_IMAGE ) . await ;
81+
82+ let skrouterd = create_and_start_container (
83+ & docker, skupper_router_image, "skrouterd_sanity" ,
84+ Config {
85+ host_config : Some ( hostconfig. clone ( ) ) ,
86+ env : Some ( vec ! [
87+ format!( "QDROUTERD_DEBUG={}" , "asan" ) . as_str( ) ,
88+ ] ) ,
89+ ..Default :: default ( )
90+ } ) . await ;
91+
92+ let mut log_printer = LogPrinter :: new ( ) ;
93+ let logs_skrouterd = stream_container_logs ( & docker, "skrouterd" , & skrouterd) ;
94+
95+ log_printer. print ( logs_skrouterd. fuse ( ) ) ;
96+
97+ let gateway = get_container_gateway ( & docker, & skrouterd) . await ;
98+ let ipport = get_container_exposed_ports ( & docker, & skrouterd, "5672/tcp" ) . await ;
99+ wait_for_port_using_nc ( & docker, hostconfig, & * gateway, & * ipport. host_port . unwrap ( ) ) . await ?;
100+
101+ let skstat_command = vec ! [ "skstat" , "-l" ] ;
102+ let ( exec, result) = docker_exec ( & docker, & skrouterd, skstat_command) . await ;
103+ let ( stdout, stderr) = tokio:: time:: timeout ( Duration :: from_secs ( 10 ) , docker_read_output ( result) ) . await . unwrap ( ) ;
104+ println ! ( "{}" , stdout) ;
105+ println ! ( "{}" , stderr) ;
106+
107+ assert_that ( & stdout) . contains ( "Router Links" ) ;
108+ assert_that ( & stdout) . contains ( "$management" ) ;
109+
110+ let exec_result = docker. inspect_exec ( & * exec. id ) . await ?;
111+ assert_eq ! ( exec_result. exit_code, Some ( 0 ) ) ;
112+
113+ docker. stop_container ( & * skrouterd. id , None ) . await . unwrap ( ) ;
114+
115+ return Ok ( ( ) ) ;
116+ }
117+
53118const ROUTER_CONFIG : & str = r#"
54119router {
55120 mode: standalone
@@ -163,11 +228,9 @@ async fn test_h2spec() {
163228 } ) . await ;
164229 let logs_skrouterd = stream_container_logs ( & docker, "skrouterd" , & container_skrouterd) ;
165230
166- let inspection_skrouterd = docker. inspect_container ( & * container_skrouterd. id , Some ( InspectContainerOptions { size : false } ) ) . await . unwrap ( ) ;
167- let network_settings_skrouterd = inspection_skrouterd. network_settings . unwrap ( ) ;
168- let hostname = network_settings_skrouterd. networks . unwrap ( ) . values ( ) . take ( 1 ) . next ( ) . unwrap ( ) . ip_address . as_ref ( ) . unwrap ( ) . clone ( ) ;
231+ let hostname = get_container_hostname ( & docker, & container_skrouterd) . await ;
169232
170- wait_for_port ( & docker, hostconfig. clone ( ) , & hostname, "24162" ) . await . expect ( "Port was not open in time" ) ;
233+ wait_for_port_using_nc ( & docker, hostconfig. clone ( ) , & hostname, "24162" ) . await . expect ( "Port was not open in time" ) ;
171234
172235 // container_h2spec
173236
@@ -225,8 +288,69 @@ async fn test_h2spec() {
225288 assert_eq ! ( 0 , router_exit_code. unwrap( ) ) ;
226289}
227290
291+ struct LogPrinter {
292+ tasks : Vec < JoinHandle < ( ) > > ,
293+ }
294+
295+ impl Drop for LogPrinter {
296+ fn drop ( & mut self ) {
297+ for task in self . tasks . drain ( ..) . rev ( ) {
298+ let result = futures:: executor:: block_on ( task) ;
299+ println ! ( "Finalized LogPrinter {:?}" , result) ;
300+ }
301+ }
302+ }
303+
304+ impl LogPrinter {
305+ fn new ( ) -> Self {
306+ Self {
307+ tasks : vec ! [ ] ,
308+ }
309+ }
310+
311+ fn print ( & mut self , log_stream : Fuse < ( impl Stream < Item =String > + Unpin + Sized + Send + ' static ) > ) {
312+ let mut s = log_stream;
313+ let handle = tokio:: spawn ( async move {
314+ while select ! {
315+ msg = s. next( ) => match msg {
316+ Some ( msg) => {
317+ let msg: String = msg;
318+ println!( "{}" , msg) ;
319+ true
320+ } ,
321+ None => false
322+ } ,
323+ } { }
324+ } ) ;
325+ self . tasks . push ( handle) ;
326+ }
327+ }
328+
329+ async fn get_container_gateway ( docker : & Docker , container : & ContainerCreateResponse ) -> String {
330+ let inspection_skrouterd = docker. inspect_container ( & * container. id , Some ( InspectContainerOptions { size : false } ) ) . await . unwrap ( ) ;
331+ let network_settings_skrouterd = inspection_skrouterd. network_settings . unwrap ( ) ;
332+ let gateway = network_settings_skrouterd. gateway . unwrap ( ) ;
333+ gateway
334+ }
335+
336+ async fn get_container_exposed_ports ( docker : & Docker , container : & ContainerCreateResponse , port : & str ) -> PortBinding {
337+ let inspection_skrouterd = docker. inspect_container ( & * container. id , Some ( InspectContainerOptions { size : false } ) ) . await . unwrap ( ) ;
338+ let network_settings_skrouterd = inspection_skrouterd. network_settings . unwrap ( ) ;
339+ let ports = network_settings_skrouterd. ports . unwrap ( ) ;
340+ let amqp = ports[ port] . clone ( ) . unwrap ( ) ;
341+ let ipport = amqp. first ( ) . unwrap ( ) ;
342+ return ipport. clone ( )
343+ }
344+
345+ async fn get_container_hostname ( docker : & Docker , container : & ContainerCreateResponse ) -> String {
346+ let inspection_skrouterd = docker. inspect_container ( & * container. id , Some ( InspectContainerOptions { size : false } ) ) . await . unwrap ( ) ;
347+ let network_settings_skrouterd = inspection_skrouterd. network_settings . unwrap ( ) ;
348+ let hostname = network_settings_skrouterd. networks . unwrap ( ) . values ( ) . take ( 1 ) . next ( ) . unwrap ( ) . ip_address . as_ref ( ) . unwrap ( ) . clone ( ) ;
349+ hostname
350+ }
351+
228352/// Runs a nc container which tries to connect to the provided hostname and port.
229- async fn wait_for_port ( docker : & Docker , hostconfig : HostConfig , hostname : & str , port : & str ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
353+ async fn wait_for_port_using_nc ( docker : & Docker , hostconfig : HostConfig , hostname : & str , port : & str ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
230354 let retry_policy = again:: RetryPolicy :: exponential ( Duration :: from_secs ( 1 ) )
231355 . with_jitter ( true )
232356 . with_max_delay ( Duration :: from_secs ( 3 ) )
@@ -382,3 +506,34 @@ async fn get_container_exit_code(docker: &Docker, container: &ContainerCreateRes
382506 } ;
383507 exit_code
384508}
509+
510+ async fn docker_read_output ( result : StartExecResults ) -> ( String , String ) {
511+ let mut stdout = String :: new ( ) ;
512+ let mut stderr = String :: new ( ) ;
513+ match result {
514+ StartExecResults :: Attached { mut output, .. } => {
515+ while let Some ( msg) = output. next ( ) . await {
516+ match msg. unwrap ( ) {
517+ LogOutput :: StdOut { message } => { stdout. push_str ( std:: str:: from_utf8 ( & * message) . unwrap ( ) ) }
518+ LogOutput :: StdErr { message } => { stderr. push_str ( std:: str:: from_utf8 ( & * message) . unwrap ( ) ) }
519+ _ => { }
520+ }
521+ }
522+ }
523+ StartExecResults :: Detached => {
524+ println ! ( "detached" ) ;
525+ }
526+ }
527+ ( stdout, stderr)
528+ }
529+
530+ async fn docker_exec ( docker : & Docker , container : & ContainerCreateResponse , command : Vec < & str > ) -> ( CreateExecResults , StartExecResults ) {
531+ let exec = docker. create_exec ( & * container. id , CreateExecOptions {
532+ cmd : Some ( command) ,
533+ attach_stdout : Some ( true ) ,
534+ attach_stderr : Some ( true ) ,
535+ ..Default :: default ( )
536+ } ) . await . unwrap ( ) ;
537+ let result = docker. start_exec ( & * exec. id , None ) . await . unwrap ( ) ;
538+ ( exec, result)
539+ }
0 commit comments