diff --git a/gnmi_server/db_journal.go b/gnmi_server/db_journal.go new file mode 100644 index 000000000..96c3273bf --- /dev/null +++ b/gnmi_server/db_journal.go @@ -0,0 +1,293 @@ +package gnmi + +import ( + "compress/gzip" + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "time" + + log "github.com/golang/glog" + "github.com/redis/go-redis/v9" + + "github.com/Azure/sonic-mgmt-common/translib/db" + sdcfg "github.com/sonic-net/sonic-gnmi/sonic_db_config" +) + +const ( + maxFileSize = 2000000 // Bytes + maxBackups = 1 +) + +type DbJournal struct { + database string + rc *redis.Client + ps *redis.PubSub + notifications <-chan *redis.Message + cache map[string]map[string]string + file *os.File + fileName string + done chan bool +} + +var dbNums = map[string]db.DBNum{ + "CONFIG_DB": db.ConfigDB, + "STATE_DB": db.StateDB, +} + +// NewDbJournal returns a new DbJournal for the specified database. +func NewDbJournal(database string) (*DbJournal, error) { + var err error + journal := &DbJournal{} + journal.database = database + dbNum, ok := dbNums[journal.database] + if !ok { + return nil, errors.New("Invalid database passed into NewDbJournal") + } + + ns, _ := sdcfg.GetDbDefaultNamespace() + addr, _ := sdcfg.GetDbTcpAddr(journal.database, ns) + dbId, _ := sdcfg.GetDbId(journal.database, ns) + journal.rc = db.TransactionalRedisClientWithOpts(&redis.Options{ + Network: "tcp", + Addr: addr, + Password: "", + DB: dbId, + DialTimeout: 0, + }) + + if err = journal.init(); err != nil { + return nil, err + } + + keyspace := fmt.Sprintf("__keyspace@%d__:*", dbNum) + keyevent := fmt.Sprintf("__keyevent@%d__:*", dbNum) + journal.ps = journal.rc.PSubscribe(context.Background(), keyspace, keyevent) + if _, err = journal.ps.Receive(context.Background()); err != nil { + return nil, err + } + + journal.notifications = journal.ps.Channel() + + journal.fileName = filepath.Join(HostVarLogPath, strings.ToLower(journal.database)+".txt") + if journal.file, err = os.OpenFile(journal.fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644); err != nil { + return nil, err + } + + journal.done = make(chan bool, 1) + + go journal.journal() + log.V(2).Infof("Successfully started the DbJournal for %v", journal.database) + return journal, nil +} + +// Close closes the redis objects and the journal file. +func (dbj *DbJournal) Close() { + if dbj == nil { + return + } + dbj.done <- true +} + +func (dbj *DbJournal) cleanup() { + if dbj == nil { + return + } + if dbj.ps != nil { + dbj.ps.Close() + } + if dbj.rc != nil { + db.CloseRedisClient(dbj.rc) + dbj.rc = nil + } + if dbj.file != nil { + dbj.file.Close() + } + if dbj.cache != nil { + dbj.cache = map[string]map[string]string{} + } + log.V(2).Infof("DbJournal closed successfully!") +} + +// init initializes the journal's cache. +func (dbj *DbJournal) init() error { + if dbj == nil || dbj.rc == nil { + return errors.New("DbJournal: redis client is nil") + } + dbj.cache = map[string]map[string]string{} + keys, kErr := dbj.rc.Keys(context.Background(), "*").Result() + if kErr != nil { + return kErr + } + for _, key := range keys { + entry, eErr := dbj.rc.HGetAll(context.Background(), key).Result() + if eErr != nil { + entry = map[string]string{} + } + dbj.cache[key] = entry + } + return nil +} + +// journal monitors the database notifications and logs events to the file. +func (dbj *DbJournal) journal() { + if dbj == nil { + return + } + defer dbj.cleanup() + var event []string + for { + select { + case msg := <-dbj.notifications: + event = append(event, msg.Payload) + if len(event) != 2 { + continue + } + op := event[0] + table := event[1] + entry := fmt.Sprintf("%v: %v %v", time.Now().Format("2006-01-02.15:04:05.000000"), op, table) + diff, dErr := dbj.updateCache(event) + if dErr != nil { + log.V(0).Infof("Shutting down %v Journal: %v", dbj.database, dErr) + return + } + event = []string{} + + if diff != "" { + entry += " " + diff + } + // If no fields were changed or the operation is a set on a table that contains the DB name, don't log the event. + if (diff == "" && (op == "hset" || op == "hdel")) || (op == "set" && strings.Contains(table, dbj.database)) { + continue + } + + if err := dbj.rotateFile(); err != nil { + log.V(0).Infof("Shutting down DbJournal, failed to manage file rotation: %v", err) + return + } + _, writeErr := dbj.file.Write([]byte(entry + "\n")) + if writeErr != nil { + log.V(0).Infof("Failed to write to DbJournal file: %v", writeErr) + } + case <-dbj.done: + return + } + } +} + +// updateCache updates the cache with the latest database entry and returns the diff. +func (dbj *DbJournal) updateCache(event []string) (string, error) { + op := event[0] + table := event[1] + if dbj == nil || dbj.cache == nil || dbj.rc == nil { + return "", errors.New("nil members present in DbJournal") + } + oldEntry, ok := dbj.cache[table] + if !ok { + oldEntry = map[string]string{} + } + newEntry, err := dbj.rc.HGetAll(context.Background(), table).Result() + if err != nil { + newEntry = map[string]string{} + } + // Update the cache + dbj.cache[table] = newEntry + + if op == "del" { + return "", nil + } + + diff := "" + // Find deleted and changed fields + for k, v := range oldEntry { + newVal, ok := newEntry[k] + if !ok { + diff += "-" + k + " " + continue + } + if newVal != v { + diff += k + "=" + newVal + " " + } + } + + // Find added fields + for k, v := range newEntry { + if _, ok := oldEntry[k]; !ok { + diff += "+" + k + ":" + v + " " + } + } + + return diff, nil +} + +// rotateFile makes sure the journal file is opened correctly and rotates it +// if it exceeds the maximum size. +func (dbj *DbJournal) rotateFile() error { + if dbj == nil { + return errors.New("Couldn't rotate file, DbJournal is nil") + } + fileStat, err := os.Stat(dbj.fileName) + if err != nil || dbj.file == nil { + // File does not exist or it is closed, create/open it + if dbj.file, err = os.OpenFile(dbj.fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644); err != nil { + return err + } + return nil + } + + if fileStat.Size() >= maxFileSize { + // Close the journal file and open it as read-only to copy it + dbj.file.Close() + if dbj.file, err = os.OpenFile(dbj.fileName, os.O_RDONLY, 0644); err != nil { + return err + } + + // Remove a rotated, zipped file if the maxBackups limit is reached + files, err := os.ReadDir(HostVarLogPath) + if err != nil { + return err + } + var count uint + var oldest string + for _, file := range files { + if strings.HasPrefix(file.Name(), strings.ToLower(dbj.database)) && strings.HasSuffix(file.Name(), ".gz") { + count++ + if strings.Compare(file.Name(), oldest) == -1 || oldest == "" { + oldest = file.Name() + } + } + } + if count >= maxBackups { + if err := os.Remove(filepath.Join(HostVarLogPath, oldest)); err != nil { + return err + } + } + + // Compress the file + zipName := filepath.Join(HostVarLogPath, strings.ToLower(dbj.database)+"_"+time.Now().Format("20060102150405")+".gz") + zipFile, err := os.Create(zipName) + if err != nil { + return err + } + defer zipFile.Close() + zipWriter := gzip.NewWriter(zipFile) + defer zipWriter.Close() + + if _, err = io.Copy(zipWriter, dbj.file); err != nil { + return err + } + if err = zipWriter.Flush(); err != nil { + return err + } + + // Recreate the journal file + if dbj.file, err = os.Create(dbj.fileName); err != nil { + return err + } + } + return nil +} diff --git a/gnmi_server/db_journal_test.go b/gnmi_server/db_journal_test.go new file mode 100644 index 000000000..5c64b1308 --- /dev/null +++ b/gnmi_server/db_journal_test.go @@ -0,0 +1,201 @@ +package gnmi + +import ( + "os" + "strings" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +func TestNewDbJournal(t *testing.T) { + tests := []struct { + desc string + db string + wantErr bool + }{ + { + desc: "Success", + db: "CONFIG_DB", + wantErr: false, + }, + { + desc: "InvalidDb", + db: "INVALID_DB", + wantErr: true, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + journal, err := NewDbJournal(test.db) + if err == nil { + journal.Close() + } + + if test.wantErr != (err != nil) { + t.Fatalf("NewDbJournal did not return the expected error - wantErr=%v, err=%v", test.wantErr, err) + } + }) + } +} + +func TestDbJournalInit(t *testing.T) { + tests := []struct { + desc string + dbj *DbJournal + wantErr bool + }{ + { + desc: "Success", + dbj: nil, + wantErr: false, + }, + { + desc: "NilRedisClient", + dbj: &DbJournal{ + database: "CONFIG_DB", + rc: nil, + }, + wantErr: true, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + var err error + if test.dbj == nil { + if test.dbj, err = NewDbJournal("CONFIG_DB"); err != nil { + t.Fatalf("Failed to create new DbJournal: %v", err) + } + defer test.dbj.Close() + } + err = test.dbj.init() + + if test.wantErr != (err != nil) { + t.Fatalf("init did not return the expected error - wantErr=%v, err=%v", test.wantErr, err) + } + }) + } +} + +func TestDbJournalUpdateCache(t *testing.T) { + tests := []struct { + desc string + dbj *DbJournal + event []string + wantErr bool + }{ + { + desc: "SuccessHSet", + dbj: nil, + event: []string{"hset", "PORT|Ethernet1"}, + wantErr: false, + }, + { + desc: "SuccessDel", + dbj: nil, + event: []string{"del", "PORT|Ethernet1"}, + wantErr: false, + }, + { + desc: "NilCache", + dbj: &DbJournal{ + rc: &redis.Client{}, + cache: nil, + }, + event: []string{"hset", "PORT|Ethernet1"}, + wantErr: true, + }, + { + desc: "NilRedisClient", + dbj: &DbJournal{ + rc: nil, + cache: map[string]map[string]string{}, + }, + event: []string{"hset", "PORT|Ethernet1"}, + wantErr: true, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + var err error + if test.dbj == nil { + if test.dbj, err = NewDbJournal("CONFIG_DB"); err != nil { + t.Fatalf("Failed to create new DbJournal: %v", err) + } + defer test.dbj.Close() + } + _, err = test.dbj.updateCache(test.event) + + if test.wantErr != (err != nil) { + t.Fatalf("init did not return the expected error - wantErr=%v, err=%v", test.wantErr, err) + } + }) + } +} + +func TestDbJournalRotateFile(t *testing.T) { + // Set up a DbJournal + dbj, err := NewDbJournal("CONFIG_DB") + if err != nil { + t.Fatalf("Failed to create NewDbJournal: %v", err) + } + defer dbj.Close() + + // If DbJournal has a nil file pointer, it should be handled by rotateFile() + dbj.file = nil + if err := dbj.rotateFile(); err != nil { + t.Fatalf("Rotate failed because of nil file pointer: %v", err) + } + + // Fill the file a few times to make sure rotate is working correctly + for i := 0; i < maxBackups+2; i++ { + // Make sure the file was created and open it + file, err := os.OpenFile(HostVarLogPath+"/config_db.txt", os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644) + if err != nil { + t.Fatalf("Failed to open DbJournal file: %v", err) + } + + // Fill the file to reach 5MB + if err := file.Truncate(maxFileSize); err != nil { + t.Fatalf("Failed to write to DbJournal file: %v", err) + } + + if err := dbj.rotateFile(); err != nil { + t.Fatalf("rotateFile failed: %v", err) + } + + time.Sleep(1 * time.Second) + + // Make sure the file was rotated + fileStat, err := os.Stat(HostVarLogPath + "/config_db.txt") + if err != nil { + t.Fatalf("Couldn't find DbJournal file: %v", err) + } + if fileStat.Size() >= 10000 { + t.Fatalf("DbJournal file was not rotated: size=%v", fileStat.Size()) + } + } + + zippedFiles := 0 + journalFiles := 0 + files, err := os.ReadDir(HostVarLogPath) + if err != nil { + t.Fatalf("Failed to read HostVarLog dir: %v", err) + } + for _, file := range files { + if file.Name() == "config_db.txt" { + journalFiles++ + } + if strings.HasPrefix(file.Name(), "config_db") && strings.HasSuffix(file.Name(), ".gz") { + zippedFiles++ + } + } + if journalFiles != 1 || zippedFiles != maxBackups { + t.Fatalf("Files not rotated correctly: journalFiles=%v, zippedFiles=%v", journalFiles, zippedFiles) + } + +} diff --git a/gnmi_server/server.go b/gnmi_server/server.go index f903a88b6..81096aff9 100644 --- a/gnmi_server/server.go +++ b/gnmi_server/server.go @@ -8,6 +8,7 @@ import ( "crypto/x509" "encoding/pem" "errors" + "flag" "fmt" "net" "os" @@ -55,6 +56,8 @@ import ( "google.golang.org/grpc/status" ) +var enableConfigDbJournal = flag.Bool("enable_config_db_journal", false, "enable config db journal") + var ( muPath = &sync.RWMutex{} supportedEncodings = []gnmipb.Encoding{gnmipb.Encoding_JSON, gnmipb.Encoding_JSON_IETF, gnmipb.Encoding_PROTO} @@ -65,6 +68,9 @@ const ( authLogPath = "/host_var/log/messages" authzRefreshingInterval = 5 * time.Second ) +const ( + HostVarLogPath = "/var/log" +) // Server manages a single gNMI Server implementation. Each client that connects // via Subscribe or Get will receive a stream of updates based on the requested @@ -97,6 +103,8 @@ type Server struct { gnsiAuthz *GNSIAuthzServer gnsiPathz *GNSIPathzServer ConnectionManager *ConnectionManager + // DB Journals + configDbJournal *DbJournal } // handleOperationalGet handles OPERATIONAL target requests directly with standard gNMI types @@ -602,6 +610,12 @@ func NewServer(config *Config, tlsOpts []grpc.ServerOption, commonOpts []grpc.Se return nil, errors.New("no listener configured: port must be > 0 or unix_socket must be set") } + if *enableConfigDbJournal { + srv.configDbJournal, err = NewDbJournal("CONFIG_DB") + if err != nil { + return nil, fmt.Errorf("failed to create CONFIG_DB Journal: %v", err) + } + } log.V(1).Infof("Created Server on %s, read-only: %t", srv.Address(), !srv.config.EnableTranslibWrite) return srv, nil } diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go index 6a5cbd7ab..3ba13e6aa 100644 --- a/gnmi_server/server_test.go +++ b/gnmi_server/server_test.go @@ -12,6 +12,7 @@ import ( "encoding/pem" "flag" "fmt" + "io" "io/ioutil" "net" "os" @@ -26,6 +27,7 @@ import ( "time" "unsafe" + "github.com/Azure/sonic-mgmt-common/translib/db" "github.com/sonic-net/sonic-gnmi/common_utils" spb "github.com/sonic-net/sonic-gnmi/proto" sgpb "github.com/sonic-net/sonic-gnmi/proto/gnoi" @@ -6481,6 +6483,94 @@ func TestGnoiAuthorization(t *testing.T) { s.Stop() } +func TestConfigDbJournal(t *testing.T) { + //Since the enableConfigDbJournal is set to false by default, enable it in UT to execute the tests + val := true + enableConfigDbJournal = &val + + ns, _ := sdcfg.GetDbDefaultNamespace() + rclient := getConfigDbClient(t, ns) + defer db.CloseRedisClient(rclient) + tests := []struct { + desc string + cmd func() + expectedEntry string + }{ + { + desc: "HSetNew", + cmd: func() { + rclient.HSet(context.Background(), "DB_JOURNAL|Test", "new", "test") + }, + expectedEntry: "hset DB_JOURNAL|Test +new:test", + }, + { + desc: "HSetExisting", + cmd: func() { + rclient.HSet(context.Background(), "DB_JOURNAL|Test", "new", "already exists") + }, + expectedEntry: "hset DB_JOURNAL|Test new=already exists", + }, + { + desc: "HDel", + cmd: func() { + rclient.HDel(context.Background(), "DB_JOURNAL|Test", "new") + }, + expectedEntry: "hdel DB_JOURNAL|Test -new", + }, + { + desc: "Set", + cmd: func() { + rclient.Set(context.Background(), "NEW_DBJOURNAL_TABLE", "TEST", 0) + }, + expectedEntry: "set NEW_DBJOURNAL_TABLE", + }, + { + desc: "Del", + cmd: func() { + rclient.Del(context.Background(), "NEW_DBJOURNAL_TABLE") + }, + expectedEntry: "del NEW_DBJOURNAL_TABLE", + }, + } + + s := createServer(t, 8081) + go runServer(t, s) + defer s.Stop() + + // Ensure the keys used in this test are not already in the DB. + rclient.Del(context.Background(), "DB_JOURNAL|Test") + rclient.Del(context.Background(), "NEW_DBJOURNAL_TABLE") + time.Sleep(500 * time.Millisecond) + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + // Clear the DbJournal file + err := os.Remove(HostVarLogPath + "/config_db.txt") + if err != nil { + t.Fatalf("Failed to remove journal file: %v", err) + } + + // Trigger a redis event + test.cmd() + + time.Sleep(500 * time.Millisecond) + + // Verify the contents of the file + file, err := os.Open(HostVarLogPath + "/config_db.txt") + if err != nil { + t.Fatalf("Failed to open file: %v", err) + } + defer file.Close() + data, err := io.ReadAll(file) + if err != nil { + t.Fatalf("Failed to read file: %v", err) + } + if !strings.Contains(string(data), test.expectedEntry) { + t.Fatalf("Incorrect file contents: %s", data) + } + }) + } +} func init() { // Enable logs at UT setup