Skip to content

Commit d42c479

Browse files
committed
refactor(ssh,pkg): convert recorded session to events
1 parent bbaa709 commit d42c479

File tree

5 files changed

+85
-91
lines changed

5 files changed

+85
-91
lines changed

api/routes/session.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func (h *Handler) EventSession(c gateway.Context) error {
133133

134134
return h.service.EventSession(c.Ctx(), models.UID(req.UID), &models.SessionEvent{
135135
Session: req.UID,
136-
Type: req.Type,
136+
Type: models.SessionEventType(req.Type),
137137
Timestamp: req.Timestamp,
138138
Data: req.Data,
139139
Seat: req.Seat,

pkg/models/session.go

+30-7
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,48 @@ type Status struct {
5252
}
5353

5454
type SessionRecorded struct {
55-
UID string `json:"uid"`
56-
Seat int `json:"seat"`
57-
Namespace string `json:"namespace" bson:"namespace"`
58-
Message string `json:"message" bson:"message"`
59-
Width int `json:"width" bson:"width,omitempty"`
60-
Height int `json:"height" bson:"height,omitempty"`
55+
UID string `json:"uid"`
56+
Output string `json:"output" bson:"output"`
6157
}
6258

6359
type SessionUpdate struct {
6460
Authenticated *bool `json:"authenticated"`
6561
Type *string `json:"type"`
6662
}
6763

64+
type SessionEventType string
65+
66+
const (
67+
// ShellHub custom requests.
68+
SessionEventTypePtyOutput SessionEventType = "pty-output"
69+
70+
// Terminal (PTY) request types
71+
SessionEventTypePtyRequest SessionEventType = "pty-req"
72+
SessionEventTypeWindowChange SessionEventType = "window-change"
73+
SessionEventTypeExitCode SessionEventType = "exit-code"
74+
75+
// Process-related requests
76+
SessionEventTypeExitStatus SessionEventType = "exit-status"
77+
SessionEventTypeExitSignal SessionEventType = "exit-signal"
78+
79+
// Environment and Shell requests
80+
SessionEventTypeEnv SessionEventType = "env"
81+
SessionEventTypeShell SessionEventType = "shell"
82+
SessionEventTypeExec SessionEventType = "exec"
83+
SessionEventTypeSubsystem SessionEventType = "subsystem"
84+
85+
// Signal and forwarding requests
86+
SessionEventTypeSignal SessionEventType = "signal"
87+
SessionEventTypeTcpipForward SessionEventType = "tcpip-forward"
88+
SessionEventTypeAuthAgentReq SessionEventType = "auth-agent-req"
89+
)
90+
6891
// SessionEvent represents a session event.
6992
type SessionEvent struct {
7093
// Session is the session UID where the event occurred.
7194
Session string `json:"session" bson:"session,omitempty"`
7295
// Type of the session. Normally, it is the SSH request name.
73-
Type string `json:"type" bson:"type"`
96+
Type SessionEventType `json:"type" bson:"type"`
7497
// Timestamp contains the time when the event was logged.
7598
Timestamp time.Time `json:"timestamp" bson:"timestamp"`
7699
// Data is a generic structure containing data of the event, normally the unmarshaling data of the request.

ssh/server/channels/session.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func DefaultSessionHandler() gliderssh.ChannelHandler {
137137

138138
defer agent.Close()
139139

140-
go pipe(ctx, sess, client.Channel, agent.Channel, seat)
140+
go pipe(sess, client.Channel, agent.Channel, seat)
141141

142142
// TODO: Add middleware to block certain types of requests.
143143
for {

ssh/server/channels/utils.go

+21-75
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"sync"
66

77
"github.com/Masterminds/semver"
8-
gliderssh "github.com/gliderlabs/ssh"
98
"github.com/shellhub-io/shellhub/pkg/envs"
109
"github.com/shellhub-io/shellhub/pkg/models"
1110
"github.com/shellhub-io/shellhub/ssh/session"
@@ -14,88 +13,48 @@ import (
1413
)
1514

1615
type Recorder struct {
17-
queue chan string
16+
// channels is the source of data read, recorded and redirected to client.
1817
channel gossh.Channel
18+
// session is the session between Agent and Client.
19+
session *session.Session
20+
// seat is the current identifier of session's.
21+
seat int
1922
}
2023

21-
func NewRecorder(channel gossh.Channel, sess *session.Session, camera *session.Camera, seat int) (io.WriteCloser, error) {
22-
// NOTE: The queue's size is a random number.
23-
queue := make(chan string, 100)
24-
25-
go func() {
26-
for {
27-
msg, ok := <-queue
28-
if !ok {
29-
log.WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
30-
Warning("recorder queue is closed")
31-
32-
return
33-
}
34-
35-
if err := camera.WriteFrame(&models.SessionRecorded{ //nolint:errcheck
36-
UID: sess.UID,
37-
Seat: seat,
38-
Namespace: sess.Lookup["domain"],
39-
Message: msg,
40-
Width: int(sess.Pty.Columns),
41-
Height: int(sess.Pty.Rows),
42-
}); err != nil {
43-
log.WithError(err).
44-
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
45-
Warning("failed to send the session frame to record")
46-
47-
// NOTE: When a frame isn't sent correctly, we stop the writing loop, only reading from the queue,
48-
// and discarding the messages to avoid stuck the go routine.
49-
break
50-
}
51-
}
52-
53-
for {
54-
// NOTE: Reads the queue and discards the data to avoid stuck the go routine.
55-
if _, ok := <-queue; !ok {
56-
log.WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
57-
Warning("recorder queue is closed")
58-
59-
return
60-
}
61-
}
62-
}()
63-
24+
func NewRecorder(channel gossh.Channel, session *session.Session, seat int) (io.WriteCloser, error) {
6425
return &Recorder{
65-
queue: queue,
6626
channel: channel,
27+
session: session,
28+
seat: seat,
6729
}, nil
6830
}
6931

70-
// record enqueues a session frame to be recorded. If the queue is closed, nothing is done.
71-
func (c *Recorder) record(msg string) {
72-
select {
73-
case c.queue <- msg:
74-
default:
75-
log.Trace("the message couldn't sent to the record queue")
76-
}
77-
}
32+
// PtyOutputEventType is the event's type for an output.
33+
const PtyOutputEventType = "pty-output"
7834

79-
func (c *Recorder) Write(data []byte) (int, error) {
80-
read, err := c.channel.Write(data)
35+
func (c *Recorder) Write(output []byte) (int, error) {
36+
read, err := c.channel.Write(output)
8137
if err != nil {
8238
return read, err
8339
}
8440

85-
c.record(string(data))
41+
// NOTE: Writes the event into the event stream to be processed and send to target endpoint.
42+
c.session.Event(PtyOutputEventType, &models.SessionRecorded{
43+
UID: c.session.UID,
44+
Output: string(output),
45+
}, c.seat)
8646

8747
return read, nil
8848
}
8949

50+
// Close closes the internal channel.
9051
func (c *Recorder) Close() error {
91-
close(c.queue)
92-
9352
return c.channel.CloseWrite()
9453
}
9554

9655
// pipe function pipes data between client and agent, and vice versa, recording each frame when ShellHub instance are
9756
// Cloud or Enterprise.
98-
func pipe(ctx gliderssh.Context, sess *session.Session, client gossh.Channel, agent gossh.Channel, seat int) {
57+
func pipe(sess *session.Session, client gossh.Channel, agent gossh.Channel, seat int) {
9958
defer log.
10059
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
10160
Trace("data pipe between client and agent has done")
@@ -110,23 +69,10 @@ func pipe(ctx gliderssh.Context, sess *session.Session, client gossh.Channel, ag
11069
defer wg.Done()
11170

11271
if envs.IsEnterprise() || envs.IsCloud() {
113-
recordURL := ctx.Value("RECORD_URL").(string)
114-
if recordURL == "" {
115-
log.WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID, "record_url": recordURL}).
116-
Warning("failed to start session's record because the record URL is empty")
117-
118-
goto normal
119-
}
120-
121-
camera, err := sess.Record(ctx, recordURL, seat)
122-
if err != nil {
123-
goto normal
124-
}
125-
126-
recorder, err := NewRecorder(client, sess, camera, seat)
72+
recorder, err := NewRecorder(client, sess, seat)
12773
if err != nil {
12874
log.WithError(err).
129-
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID, "record_url": recordURL}).
75+
WithFields(log.Fields{"session": sess.UID, "sshid": sess.SSHID}).
13076
Warning("failed to connect to session record endpoint")
13177

13278
goto normal

ssh/session/session.go

+32-7
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ type Session struct {
156156

157157
api internalclient.Client
158158
tunnel *httptunnel.Tunnel
159+
// Events is a channel used to send and process Events of a session.
160+
Events chan *models.SessionEvent
159161

160162
once *sync.Once
161163

@@ -247,6 +249,7 @@ func NewSession(ctx gliderssh.Context, tunnel *httptunnel.Tunnel, cache cache.Ca
247249
UID: ctx.SessionID(),
248250
api: api,
249251
tunnel: tunnel,
252+
Events: make(chan *models.SessionEvent, 100),
250253
Data: Data{
251254
IPAddress: hos.Host,
252255
Target: target,
@@ -269,6 +272,8 @@ func NewSession(ctx gliderssh.Context, tunnel *httptunnel.Tunnel, cache cache.Ca
269272

270273
snap.save(session, StateCreated)
271274

275+
go session.NewEventStream(ctx)
276+
272277
return session, nil
273278
}
274279

@@ -565,13 +570,13 @@ func (s *Session) NewSeat() (int, error) {
565570

566571
// Events register an event to the session.
567572
func (s *Session) Event(t string, data any, seat int) {
568-
go s.api.EventSession(s.UID, &models.SessionEvent{ //nolint:errcheck
573+
s.Events <- &models.SessionEvent{
569574
Session: s.UID,
570-
Type: t,
575+
Type: models.SessionEventType(t),
571576
Timestamp: clock.Now(),
572577
Data: data,
573578
Seat: seat,
574-
})
579+
}
575580
}
576581

577582
func Event[D any](sess *Session, t string, data []byte, seat int) {
@@ -580,13 +585,33 @@ func Event[D any](sess *Session, t string, data []byte, seat int) {
580585
return
581586
}
582587

583-
go sess.api.EventSession(sess.UID, &models.SessionEvent{ //nolint:errcheck
588+
sess.Events <- &models.SessionEvent{
584589
Session: sess.UID,
585-
Type: t,
590+
Type: models.SessionEventType(t),
586591
Timestamp: clock.Now(),
587-
Data: d,
592+
Data: data,
588593
Seat: seat,
589-
})
594+
}
595+
}
596+
597+
func (s *Session) NewEventStream(ctx context.Context) {
598+
for {
599+
select {
600+
case <-ctx.Done():
601+
log.WithFields(log.Fields{
602+
"session": s.UID,
603+
}).Debug("event stream loop done")
604+
605+
return
606+
case event := <-s.Events:
607+
log.WithFields(log.Fields{
608+
"session": s.UID,
609+
"event": event,
610+
}).Trace("event received on event stream")
611+
612+
go s.api.EventSession(s.UID, event) //nolint:errcheck
613+
}
614+
}
590615
}
591616

592617
func (s *Session) KeepAlive() error {

0 commit comments

Comments
 (0)