32
32
// panic(err)
33
33
// }
34
34
//
35
- // ag.Listen (ctx)
35
+ // ag.ListenSSH (ctx)
36
36
// }
37
37
//
38
38
// [ShellHub Agent]: https://github.com/shellhub-io/shellhub/tree/master/agent
@@ -41,24 +41,15 @@ package agent
41
41
import (
42
42
"context"
43
43
"crypto/rsa"
44
- "io"
45
- "math/rand"
46
- "net"
47
- "net/http"
48
44
"net/url"
49
45
"os"
50
46
"runtime"
51
- "strings"
52
- "sync/atomic"
53
- "time"
54
47
55
48
"github.com/Masterminds/semver"
56
- "github.com/labstack/echo/v4"
57
49
"github.com/pkg/errors"
58
50
"github.com/shellhub-io/shellhub/pkg/agent/pkg/keygen"
59
51
"github.com/shellhub-io/shellhub/pkg/agent/pkg/sysinfo"
60
- "github.com/shellhub-io/shellhub/pkg/agent/pkg/tunnel"
61
- "github.com/shellhub-io/shellhub/pkg/agent/server"
52
+ "github.com/shellhub-io/shellhub/pkg/agent/ssh"
62
53
"github.com/shellhub-io/shellhub/pkg/api/client"
63
54
"github.com/shellhub-io/shellhub/pkg/envs"
64
55
"github.com/shellhub-io/shellhub/pkg/models"
@@ -169,12 +160,9 @@ type Agent struct {
169
160
Identity * models.DeviceIdentity
170
161
Info * models.DeviceInfo
171
162
authData * models.DeviceAuthResponse
172
- cli client.Client
173
163
serverInfo * models.Info
174
- server * server.Server
175
- tunnel * tunnel.Tunnel
176
- listening chan bool
177
- closed atomic.Bool
164
+ cli client.Client
165
+ ssh * ssh.SSH
178
166
mode Mode
179
167
}
180
168
@@ -264,8 +252,6 @@ func (a *Agent) Initialize() error {
264
252
return errors .Wrap (err , "failed to authorize device" )
265
253
}
266
254
267
- a .closed .Store (false )
268
-
269
255
return nil
270
256
}
271
257
@@ -356,263 +342,19 @@ func (a *Agent) authorize() error {
356
342
return err
357
343
}
358
344
359
- func (a * Agent ) isClosed () bool {
360
- return a .closed .Load ()
361
- }
362
-
363
345
// Close closes the ShellHub Agent's listening, stoping it from receive new connection requests.
364
346
func (a * Agent ) Close () error {
365
- a .closed .Store (true )
366
-
367
- return a .tunnel .Close ()
368
- }
369
-
370
- func connHandler (serv * server.Server ) func (c echo.Context ) error {
371
- return func (c echo.Context ) error {
372
- hj , ok := c .Response ().Writer .(http.Hijacker )
373
- if ! ok {
374
- return c .String (http .StatusInternalServerError , "webserver doesn't support hijacking" )
375
- }
376
-
377
- conn , _ , err := hj .Hijack ()
378
- if err != nil {
379
- return c .String (http .StatusInternalServerError , "failed to hijack connection" )
380
- }
381
-
382
- id := c .Param ("id" )
383
- httpConn := c .Request ().Context ().Value ("http-conn" ).(net.Conn )
384
- serv .Sessions .Store (id , httpConn )
385
- serv .HandleConn (httpConn )
386
-
387
- conn .Close ()
388
-
389
- return nil
390
- }
347
+ return a .ssh .Close ()
391
348
}
392
349
393
- func httpHandler () func (c echo.Context ) error {
394
- return func (c echo.Context ) error {
395
- replyError := func (err error , msg string , code int ) error {
396
- log .WithError (err ).WithFields (log.Fields {
397
- "remote" : c .Request ().RemoteAddr ,
398
- "namespace" : c .Request ().Header .Get ("X-Namespace" ),
399
- "path" : c .Request ().Header .Get ("X-Path" ),
400
- "version" : AgentVersion ,
401
- }).Error (msg )
402
-
403
- return c .String (code , msg )
404
- }
405
-
406
- in , err := net .Dial ("tcp" , ":80" )
407
- if err != nil {
408
- return replyError (err , "failed to connect to HTTP server on device" , http .StatusInternalServerError )
409
- }
410
-
411
- defer in .Close ()
412
-
413
- url , err := url .Parse (c .Request ().Header .Get ("X-Path" ))
414
- if err != nil {
415
- return replyError (err , "failed to parse URL" , http .StatusInternalServerError )
416
- }
417
-
418
- c .Request ().URL .Scheme = "http"
419
- c .Request ().URL = url
420
-
421
- if err := c .Request ().Write (in ); err != nil {
422
- return replyError (err , "failed to write request to the server on device" , http .StatusInternalServerError )
423
- }
424
-
425
- out , _ , err := c .Response ().Hijack ()
426
- if err != nil {
427
- return replyError (err , "failed to hijack connection" , http .StatusInternalServerError )
428
- }
429
-
430
- defer out .Close () // nolint:errcheck
350
+ // ListenSSH creates the SSH server and listening for connections.
351
+ func (a * Agent ) ListenSSH (ctx context.Context ) error {
352
+ a .ssh = ssh .NewSSH (a .cli , a .authData .Token )
431
353
432
- if _ , err := io .Copy (out , in ); err != nil {
433
- return replyError (err , "failed to copy response from device service to client" , http .StatusInternalServerError )
434
- }
435
-
436
- return nil
437
- }
438
- }
439
-
440
- func closeHandler (a * Agent , serv * server.Server ) func (c echo.Context ) error {
441
- return func (c echo.Context ) error {
442
- id := c .Param ("id" )
443
- serv .CloseSession (id )
444
-
445
- log .WithFields (
446
- log.Fields {
447
- "id" : id ,
448
- "version" : AgentVersion ,
449
- "tenant_id" : a .authData .Namespace ,
450
- "server_address" : a .config .ServerAddress ,
451
- },
452
- ).Info ("A tunnel connection was closed" )
453
-
454
- return nil
455
- }
456
- }
457
-
458
- // Listen creates the SSH server and listening for connections.
459
- func (a * Agent ) Listen (ctx context.Context ) error {
354
+ // TODO: Don't create the SSH server from this function, as it seems to be out its own context.
460
355
a .mode .Serve (a )
461
356
462
- a .tunnel = tunnel .NewBuilder ().
463
- WithConnHandler (connHandler (a .server )).
464
- WithCloseHandler (closeHandler (a , a .server )).
465
- WithHTTPHandler (httpHandler ()).
466
- Build ()
467
-
468
- go a .ping (ctx , AgentPingDefaultInterval ) //nolint:errcheck
469
-
470
- ctx , cancel := context .WithCancel (ctx )
471
- go func () {
472
- for {
473
- if a .isClosed () {
474
- log .WithFields (log.Fields {
475
- "version" : AgentVersion ,
476
- "tenant_id" : a .authData .Namespace ,
477
- "server_address" : a .config .ServerAddress ,
478
- }).Info ("Stopped listening for connections" )
479
-
480
- cancel ()
481
-
482
- return
483
- }
484
-
485
- namespace := a .authData .Namespace
486
- tenantName := a .authData .Name
487
- sshEndpoint := a .serverInfo .Endpoints .SSH
488
-
489
- sshid := strings .NewReplacer (
490
- "{namespace}" , namespace ,
491
- "{tenantName}" , tenantName ,
492
- "{sshEndpoint}" , strings .Split (sshEndpoint , ":" )[0 ],
493
- ).Replace ("{namespace}.{tenantName}@{sshEndpoint}" )
494
-
495
- listener , err := a .cli .NewReverseListener (ctx , a .authData .Token , "/ssh/connection" )
496
- if err != nil {
497
- log .WithError (err ).WithFields (log.Fields {
498
- "version" : AgentVersion ,
499
- "tenant_id" : a .authData .Namespace ,
500
- "server_address" : a .config .ServerAddress ,
501
- "ssh_server" : sshEndpoint ,
502
- "sshid" : sshid ,
503
- }).Error ("Failed to connect to server through reverse tunnel. Retry in 10 seconds" )
504
- time .Sleep (time .Second * 10 )
505
-
506
- continue
507
- }
508
-
509
- log .WithFields (log.Fields {
510
- "namespace" : namespace ,
511
- "hostname" : tenantName ,
512
- "server_address" : a .config .ServerAddress ,
513
- "ssh_server" : sshEndpoint ,
514
- "sshid" : sshid ,
515
- }).Info ("Server connection established" )
516
-
517
- a .listening <- true
518
-
519
- {
520
- // NOTE: Tunnel'll only realize that it lost its connection to the ShellHub SSH when the next
521
- // "keep-alive" connection fails. As a result, it will take this interval to reconnect to its server.
522
- err := a .tunnel .Listen (listener )
523
-
524
- log .WithError (err ).WithFields (log.Fields {
525
- "namespace" : namespace ,
526
- "hostname" : tenantName ,
527
- "server_address" : a .config .ServerAddress ,
528
- "ssh_server" : sshEndpoint ,
529
- "sshid" : sshid ,
530
- }).Info ("Tunnel listener closed" )
531
-
532
- listener .Close () // nolint:errcheck
533
- }
534
-
535
- a .listening <- false
536
- }
537
- }()
538
-
539
- <- ctx .Done ()
540
-
541
- return a .Close ()
542
- }
543
-
544
- // AgentPingDefaultInterval is the default time interval between ping on agent.
545
- const AgentPingDefaultInterval = 10 * time .Minute
546
-
547
- // ping sends an authorization request to the ShellHub server at each interval.
548
- // A random value between 10 and [config.MaxRetryConnectionTimeout] seconds is added to the interval
549
- // each time the ticker is executed.
550
- //
551
- // Ping only sends requests to the server if the agent is listening for connections. If the agent is not
552
- // listening, the ping process will be stopped. When the interval is 0, the default value is 10 minutes.
553
- func (a * Agent ) ping (ctx context.Context , interval time.Duration ) error {
554
- a .listening = make (chan bool )
555
-
556
- if interval == 0 {
557
- interval = AgentPingDefaultInterval
558
- }
559
-
560
- <- a .listening // NOTE: wait for the first connection to start to ping the server.
561
- ticker := time .NewTicker (interval )
562
-
563
- for {
564
- if a .isClosed () {
565
- return nil
566
- }
567
-
568
- select {
569
- case <- ctx .Done ():
570
- log .WithFields (log.Fields {
571
- "version" : AgentVersion ,
572
- "tenant_id" : a .authData .Namespace ,
573
- "server_address" : a .config .ServerAddress ,
574
- }).Debug ("stopped pinging server due to context cancellation" )
575
-
576
- return nil
577
- case ok := <- a .listening :
578
- if ok {
579
- log .WithFields (log.Fields {
580
- "version" : AgentVersion ,
581
- "tenant_id" : a .authData .Namespace ,
582
- "server_address" : a .config .ServerAddress ,
583
- "timestamp" : time .Now (),
584
- }).Debug ("Starting the ping interval to server" )
585
-
586
- ticker .Reset (interval )
587
- } else {
588
- log .WithFields (log.Fields {
589
- "version" : AgentVersion ,
590
- "tenant_id" : a .authData .Namespace ,
591
- "server_address" : a .config .ServerAddress ,
592
- "timestamp" : time .Now (),
593
- }).Debug ("Stopped pinging server due listener status" )
594
-
595
- ticker .Stop ()
596
- }
597
- case <- ticker .C :
598
- if err := a .authorize (); err != nil {
599
- a .server .SetDeviceName (a .authData .Name )
600
- }
601
-
602
- log .WithFields (log.Fields {
603
- "version" : AgentVersion ,
604
- "tenant_id" : a .authData .Namespace ,
605
- "server_address" : a .config .ServerAddress ,
606
- "name" : a .authData .Name ,
607
- "hostname" : a .config .PreferredHostname ,
608
- "identity" : a .config .PreferredIdentity ,
609
- "timestamp" : time .Now (),
610
- }).Info ("Ping" )
611
-
612
- randTimeout := time .Duration (rand .Intn (a .config .MaxRetryConnectionTimeout - 10 )+ 10 ) * time .Second
613
- ticker .Reset (interval + randTimeout )
614
- }
615
- }
357
+ return a .ssh .Listen (ctx )
616
358
}
617
359
618
360
// CheckUpdate gets the ShellHub's server version.
0 commit comments