
Commit 4af3606

changes for adding properties to callog to track rac timestamp value (#391)
* changes for adding properties to callog to track rac timestamp value
* added pid for worker process id in cal log
* incorporate review comments for moving event logic to same loop
* fixing rac maint concurrency issues
* changes for racmaint in occ
* fixing test bind less error
* fixing test bind throttle error

Co-authored-by: Rajesh S <[email protected]>
1 parent cc38b95 commit 4af3606

File tree

5 files changed: +157 -33 lines changed


lib/racmaint.go (+8 -6)

```diff
@@ -61,12 +61,14 @@ func InitRacMaint(cmdLineModuleName string) {
 	interval := GetConfig().RacMaintReloadInterval
 	if interval > 0 {
 		for i := 0; i < GetConfig().NumOfShards; i++ {
-			go racMaintMain(i, interval, cmdLineModuleName)
+			shardIndex := i // Address the behavior called variable capture.
+			go racMaintMain(shardIndex, interval, cmdLineModuleName)
 		}
 	}
 }

 // racMaintMain wakes up every n seconds (configured in "rac_sql_interval") and reads the table
+//
 // [ManagementTablePrefix]_maint table to see if maintenance is requested
 func racMaintMain(shard int, interval int, cmdLineModuleName string) {
 	if logger.GetLogger().V(logger.Debug) {
@@ -109,8 +111,8 @@ func racMaintMain(shard int, interval int, cmdLineModuleName string) {
 }

 /*
-	racMaint is the main function for RAC maintenance processing, being called regularly.
-	When maintenance is planned, it calls workerpool.RacMaint to start the actuall processing
+racMaint is the main function for RAC maintenance processing, being called regularly.
+When maintenance is planned, it calls workerpool.RacMaint to start the actuall processing
 */
 func racMaint(ctx context.Context, shard int, db *sql.DB, racSQL string, cmdLineModuleName string, prev map[racCfgKey]racCfg) {
 	//
@@ -220,12 +222,12 @@ func racMaint(ctx context.Context, shard int, db *sql.DB, racSQL string, cmdLine
 		workerpool, err = GetWorkerBrokerInstance().GetWorkerPool(wtypeRW, 0, shard)
 	}
 	if err == nil {
-		go workerpool.RacMaint(racReq)
+		workerpool.RacMaint(racReq)
 	}
 	if GetConfig().ReadonlyPct > 0 {
-		workerpool, err := GetWorkerBrokerInstance().GetWorkerPool(wtypeRO, 0, shard)
+		workerpool, err = GetWorkerBrokerInstance().GetWorkerPool(wtypeRO, 0, shard)
 		if err == nil {
-			go workerpool.RacMaint(racReq)
+			workerpool.RacMaint(racReq)
 		}
 	}
 	prev[cfgKey] = row
```
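
The `shardIndex := i` copy in `InitRacMaint` guards against Go's loop-variable capture: before Go 1.22, a single `i` is shared by every iteration, so a closure launched with `go` can observe a later value of `i`. Strictly speaking, `go racMaintMain(i, ...)` evaluates its arguments at spawn time and was already safe; the copy is defensive and matters if the call is ever rewritten as a closure. A minimal, self-contained sketch of the pitfall and the fix (illustrative only, not Hera code):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	// Buggy (pre-Go 1.22): every goroutine closes over the same i,
	// so most of them observe whatever value i holds when they run.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("captured:", i) // often prints 3, 3, 3
		}()
	}
	wg.Wait()

	// Fixed: copy the loop variable into a fresh variable per iteration,
	// mirroring the shardIndex := i line in InitRacMaint.
	for i := 0; i < 3; i++ {
		idx := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("copied:", idx) // prints 0, 1, 2 in some order
		}()
	}
	wg.Wait()
}
```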

lib/workerpool.go (+25 -25)

```diff
@@ -119,7 +119,7 @@ func (pool *WorkerPool) spawnWorker(wid int) error {
 	worker.setState(wsSchd)
 	millis := rand.Intn(GetConfig().RandomStartMs)
 	if logger.GetLogger().V(logger.Alert) {
-		logger.GetLogger().Log(logger.Alert, wid, "randomized start ms",millis)
+		logger.GetLogger().Log(logger.Alert, wid, "randomized start ms", millis)
 	}
 	time.Sleep(time.Millisecond * time.Duration(millis))

@@ -131,7 +131,7 @@ func (pool *WorkerPool) spawnWorker(wid int) error {
 		}
 		millis := rand.Intn(3000)
 		if logger.GetLogger().V(logger.Alert) {
-			logger.GetLogger().Log(logger.Alert, initCnt, "is too many in init state. waiting to start",wid)
+			logger.GetLogger().Log(logger.Alert, initCnt, "is too many in init state. waiting to start", wid)
 		}
 		time.Sleep(time.Millisecond * time.Duration(millis))
 	}
@@ -233,8 +233,10 @@ func (pool *WorkerPool) WorkerReady(worker *WorkerClient) (err error) {
 // GetWorker gets the active worker if available. backlog with timeout if not.
 //
 // @param sqlhash to check for soft eviction against a blacklist of slow queries.
-// if getworker needs to exam the incoming sql, there does not seem to be another elegant
-// way to do this except to pass in the sqlhash as a parameter.
+//
+//	if getworker needs to exam the incoming sql, there does not seem to be another elegant
+//	way to do this except to pass in the sqlhash as a parameter.
+//
 // @param timeoutMs[0] timeout in milliseconds. default to adaptive queue timeout.
 func (pool *WorkerPool) GetWorker(sqlhash int32, timeoutMs ...int) (worker *WorkerClient, t string, err error) {
 	if logger.GetLogger().V(logger.Debug) {
@@ -559,10 +561,10 @@ func (pool *WorkerPool) ReturnWorker(worker *WorkerClient, ticket string) (err e
 	}
 	if skipRecycle {
 		if logger.GetLogger().V(logger.Alert) {
-			logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:=", pool.desiredSize)
+			logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=", pool.moduleName, "shard_id=", pool.ShardID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:=", pool.desiredSize)
 		}
 		calMsg := fmt.Sprintf("Recycle(worker_pid)=%d, module_name=%s,shard_id=%d", worker.pid, worker.moduleName, worker.shardID)
-		evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","ReturnWorker", cal.TransOK, calMsg)
+		evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER", "ReturnWorker", cal.TransOK, calMsg)
 		evt.Completed()
 	}

@@ -697,8 +699,6 @@ func (pool *WorkerPool) RacMaint(racReq racAct) {
 	}
 	now := time.Now().Unix()
 	window := GetConfig().RacRestartWindow
-	dbUname := ""
-	cnt := 0
 	pool.poolCond.L.Lock()
 	for i := 0; i < pool.currentSize; i++ {
 		if (pool.workers[i] != nil) && (racReq.instID == 0 || pool.workers[i].racID == racReq.instID) && (pool.workers[i].startTime < int64(racReq.tm)) {
@@ -716,23 +716,23 @@ func (pool *WorkerPool) RacMaint(racReq racAct) {
 			}

 			if logger.GetLogger().V(logger.Verbose) {
-				logger.GetLogger().Log(logger.Verbose, "Rac maint activating, worker", i, pool.workers[i].pid, "exittime=", pool.workers[i].exitTime, now, window, pool.currentSize)
-			}
-			cnt++
-			if len(dbUname) == 0 {
-				dbUname = pool.workers[i].dbUname
+				logger.GetLogger().Log(logger.Verbose, "Rac maint activating, worker", i, pool.workers[i].pid, "exittime=", pool.workers[i].exitTime, now, window, pool.currentSize, "rac.req timestamp=", racReq.tm)
 			}
+			//Trigger individual event for worker
+			evt := cal.NewCalEvent("RAC_ID", fmt.Sprintf("%d", racReq.instID), cal.TransOK, "")
+			evt.AddDataStr("poolModName", pool.moduleName)
+			evt.AddDataInt("workerId", int64(i))
+			evt.AddDataInt("pid", int64(pool.workers[i].pid))
+			evt.AddDataInt("shardId", int64(pool.ShardID))
+			evt.AddDataInt("tm", int64(racReq.tm))
+			evt.AddDataInt("exitTime", pool.workers[i].exitTime)
+			evt.AddDataStr("exitInSec", fmt.Sprintf("%dsec", pool.workers[i].exitTime-now))
+			evt.Completed()
+			evt = cal.NewCalEvent("DB_UNAME", pool.workers[i].dbUname, cal.TransOK, "")
+			evt.Completed()
 		}
 	}
 	pool.poolCond.L.Unlock()
-	// TODO: C++ worker logs one event for each worker, in the worker, so
-	// we keep the same. Think about changing it
-	for i := 0; i < cnt; i++ {
-		evt := cal.NewCalEvent("RAC_ID", fmt.Sprintf("%d", racReq.instID), cal.TransOK, "")
-		evt.Completed()
-		evt = cal.NewCalEvent("DB_UNAME", dbUname, cal.TransOK, "")
-		evt.Completed()
-	}
 }

@@ -768,12 +768,12 @@ func (pool *WorkerPool) checkWorkerLifespan() {
 	pool.poolCond.L.Lock()
 	for i := 0; i < pool.currentSize; i++ {
 		if (pool.workers[i] != nil) && (pool.workers[i].exitTime != 0) && (pool.workers[i].exitTime <= now) {
-			if pool.GetHealthyWorkersCount() < (int32(pool.desiredSize*GetConfig().MaxDesiredHealthyWorkerPct/100)) { // Should it be a config value
+			if pool.GetHealthyWorkersCount() < (int32(pool.desiredSize * GetConfig().MaxDesiredHealthyWorkerPct / 100)) { // Should it be a config value
 				if logger.GetLogger().V(logger.Alert) {
-					logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize)
+					logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=", pool.moduleName, "shard_id=", pool.ShardID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:", pool.desiredSize)
 				}
 				calMsg := fmt.Sprintf("module_name=%s,shard_id=%d", pool.moduleName, pool.ShardID)
-				evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","checkWorkerLifespan", cal.TransOK, calMsg)
+				evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER", "checkWorkerLifespan", cal.TransOK, calMsg)
 				evt.Completed()
 				break
 			}
@@ -814,7 +814,7 @@ func (pool *WorkerPool) checkWorkerLifespan() {
 	pool.poolCond.L.Unlock()
 	for _, w := range workers {
 		if logger.GetLogger().V(logger.Info) {
-			logger.GetLogger().Log(logger.Info, "checkworkerlifespan - Lifespan exceeded, terminate worker: pid =", w.pid, ", pool_type =", w.Type, ", inst =", w.instID ,"HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize)
+			logger.GetLogger().Log(logger.Info, "checkworkerlifespan - Lifespan exceeded, terminate worker: pid =", w.pid, ", pool_type =", w.Type, ", inst =", w.instID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:", pool.desiredSize)
 		}
 		w.Terminate()
 	}
```

tests/unittest/bindLess/main_test.go (+1 -1)

```diff
@@ -211,7 +211,7 @@ func TestBindLess(t *testing.T) {
 	logger.GetLogger().Log(logger.Debug, "TestBindLess +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
 	testutil.BackupAndClear("cal", "BindLess start")
 	testutil.BackupAndClear("hera", "BindLess start")
-	err := partialBadLoad(0.10)
+	err := partialBadLoad(0.07)
 	if err != nil && err != NormCliErr() {
 		t.Fatalf("main step function returned err %s", err.Error())
 	}
```

tests/unittest/bindThrottle/main_test.go (+1 -1)

```diff
@@ -205,7 +205,7 @@ func mkClients(num int, stop *int, bindV int, grpName string, outErr *string, db
 func TestBindThrottle(t *testing.T) {
 	// we would like to clear hera.log, but even if we try, lots of messages still go there
 	logger.GetLogger().Log(logger.Debug, "BindThrottle +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
-	err := partialBadLoad(0.10)
+	err := partialBadLoad(0.07)
 	if err != nil && err != NormCliErr() {
 		t.Fatalf("main step function returned err %s", err.Error())
 	}
```
New test file (+122)

```go
// @@ -0,0 +1,122 @@
package main

import (
	"context"
	"database/sql"
	"fmt"
	"math/rand"
	"os"
	"testing"
	"time"

	"github.com/paypal/hera/tests/unittest/testutil"
	"github.com/paypal/hera/utility/logger"
)

var mx testutil.Mux
var tableName string

func cfg() (map[string]string, map[string]string, testutil.WorkerType) {

	appcfg := make(map[string]string)
	// best to chose an "unique" port in case golang runs tests in paralel
	appcfg["bind_port"] = "31002"
	appcfg["log_level"] = "5"
	appcfg["log_file"] = "hera.log"
	appcfg["sharding_cfg_reload_interval"] = "0"
	appcfg["rac_sql_interval"] = "1"

	opscfg := make(map[string]string)
	opscfg["opscfg.default.server.max_connections"] = "10"
	opscfg["opscfg.default.server.log_level"] = "5"

	//return appcfg, opscfg, testutil.OracleWorker
	return appcfg, opscfg, testutil.MySQLWorker
}

func before() error {
	os.Setenv("PARALLEL", "1")
	pfx := os.Getenv("MGMT_TABLE_PREFIX")
	if pfx == "" {
		pfx = "hera"
	}
	tableName = pfx + "_maint"
	return nil
}

func TestMain(m *testing.M) {
	os.Exit(testutil.UtilMain(m, cfg, before))
}

func TestRacMaintWithRandomStatusChangeInAsync(t *testing.T) {
	logger.GetLogger().Log(logger.Debug, "TestRacMaintWithRandomStatusChangeInAsync begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
	shard := 0
	db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard))
	if err != nil {
		t.Fatal("Error starting Mux:", err)
		return
	}
	statusArray := []string{"U", "R", "F"}
	time.Sleep(5 * time.Second)

	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
		defer cancel()
		for {
			status1 := rand.Intn(len(statusArray))
			status2 := rand.Intn(len(statusArray))
			var err error
			var conn *sql.Conn
			// cleanup and insert one row in the table
			conn, err = db.Conn(ctx)
			if err != nil {
				t.Fatalf("Error getting connection %s\n", err.Error())
			}
			tx, _ := conn.BeginTx(ctx, nil)
			stmt, _ := tx.PrepareContext(ctx, "/*cmd*/delete from "+tableName)
			_, err = stmt.Exec()
			if err != nil {
				t.Fatalf("Error preparing test (delete table) %s\n", err.Error())
			}
			stmt, _ = tx.PrepareContext(ctx, "/*cmd*/insert into "+tableName+" (inst_id, status, status_time, module, machine) values (?,?,?,?,?)")
			hostname, _ := os.Hostname()
			// how to do inst_id
			_, err = stmt.Exec(0 /*max instid*/, statusArray[status1], time.Now().Unix()+2, "hera-test", hostname)
			_, err = stmt.Exec(0, statusArray[status2], time.Now().Unix()+2, "hera-test_taf", hostname)
			if err != nil {
				t.Fatalf("Error preparing test (create row in table) %s\n", err.Error())
			}
			err = tx.Commit()
			if err != nil {
				t.Fatalf("Error commit %s\n", err.Error())
			}
			conn.Close()
			time.Sleep(1000 * time.Millisecond)
		}
	}()
	if err != nil {
		t.Fatal("Error starting Mux:", err)
		return
	}
	db.SetMaxIdleConns(0)
	defer db.Close()

	time.Sleep(45000 * time.Millisecond)

	if 0 == testutil.RegexCountFile("Rac maint activating, worker", "hera.log") {
		t.Fatalf("requires rac maint activation for main module status")
	}

	if 0 == testutil.RegexCountFile("module:HERA-TEST_TAF", "cal.log") {
		t.Fatalf("Status 'U' should log the RACMAINT_INFO_CHANGE event")
	}
	if 0 != testutil.RegexCountFile("invalid_status", "cal.log") {
		t.Fatalf("ram maint status 'U' should not skip with invalid-status event")
	}

	if testutil.RegexCountFile("RAC_ID", "cal.log") < 20 {
		t.Fatalf("ram maint should trigger for all workers once.")
	}

	logger.GetLogger().Log(logger.Debug, "TestRacMaintWithRandomStatusChangeInAsync done -------------------------------------------------------------")
}
```
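
The new suite appears to follow the layout of the other unittest directories (its path is not shown in this view). Assuming that layout, it would run under the standard Go tooling, e.g. `go test -run TestRacMaintWithRandomStatusChangeInAsync ./tests/unittest/...`. Its assertions tie directly back to the changes above: it greps hera.log for the "Rac maint activating, worker" verbose line from lib/workerpool.go and requires at least 20 RAC_ID events in cal.log, i.e. the per-worker events added in this commit rather than the old one-per-count replay.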
