Load Emulation #22

Open · wants to merge 2 commits into master

11 changes: 6 additions & 5 deletions src/phoenix/config/config.go
@@ -34,10 +34,11 @@ type TaskSchedulerConfig struct {
 }
 
 type WorkerGodConfig struct {
-    Addr string
-    WorkerGod phoenix.WorkerGod
-    Ready chan bool
+    Addr      string
+    WorkerGod phoenix.WorkerGod
+    Ready     chan bool
 }
+
 //type JobGeneratorConfig struct {
 //	Seed int
 //	TaskDuration int
@@ -90,9 +91,9 @@ func (pc *PhoenixConfig) NewExecutorConfig(i int, ec phoenix.ExecutorInterface)
 
 func (pc *PhoenixConfig) NewWorkerGodConfig(i int, ww phoenix.WorkerGod) *WorkerGodConfig {
     return &WorkerGodConfig{
-        Addr: pc.WorkerGods[i],
+        Addr:      pc.WorkerGods[i],
         WorkerGod: ww,
-        Ready: make(chan bool, 1),
+        Ready:     make(chan bool, 1),
     }
 }

32 changes: 16 additions & 16 deletions src/phoenix/config/multi_node.conf
@@ -1,18 +1,18 @@ (whitespace-only reindentation; resulting file shown)
{
    "NumSlots": 4,
    "Frontends": [
        "172.31.28.99:31363"
    ],
    "Schedulers": [
        "172.31.31.12:31359"
    ],
    "Monitors": [
        "172.31.22.104:31360"
    ],
    "Executors": [
        "172.31.22.104:31361"
    ],
    "WorkerGods": [
        "172.31.22.104:31362"
    ]
}
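
For scale: this config lists a single executor address and NumSlots of 4, so the run summary in main.go (below) computes slotCount = len(rc.Executors) * rc.NumSlots = 4; e.g. 120 s of total task time gives a theoreticalLowerBound of 120 / 4 = 30 s.
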
52 changes: 52 additions & 0 deletions src/phoenix/frontend/manual-1/g_emulation_generator.go
@@ -0,0 +1,52 @@
+package main
+
+import "math/rand"
+
+type GoogleClusterTaskGenerator struct {
+    TaskDuration float64
+    RandSeed     int64
+    TaskCount    int
+}
+
+var googleTimeFactors = []int{
+    51182,
+    61100,
+    76970,
+    96318,
+    102699,
+    106596,
+    110659,
+    111951,
+    112349,
+    114887,
+    123163,
+    129392,
+    129573,
+    129698,
+    129844,
+    129954}
+
+const GOOGLE_TIME_MAX_RANGE = 129954
+
+func NewGoogleClusterTaskGenerator(taskDuration float64, randSeed int64, taskCount int) GoogleClusterTaskGenerator {
+    ret := GoogleClusterTaskGenerator{
+        TaskDuration: taskDuration,
+        RandSeed:     randSeed,
+        TaskCount:    taskCount,
+    }
+
+    rand.Seed(randSeed)
+    return ret
+}
+
+func (g GoogleClusterTaskGenerator) GetTaskDuration() float64 {
+    targetRange := rand.Int() % GOOGLE_TIME_MAX_RANGE
+
+    for i := 0; i < len(googleTimeFactors); i++ {
+        if googleTimeFactors[i] >= targetRange {
+            return g.TaskDuration * (float64(i) + 1.0)
+        }
+    }
+
+    return g.TaskDuration
+}
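
The table reads as a cumulative histogram over duration multipliers 1x to 16x, presumably derived from the Google cluster trace (the PR does not say): GetTaskDuration draws a uniform value in [0, 129954) and returns TaskDuration times (index + 1) of the first cumulative count covering it, i.e. inverse-CDF sampling. A task runs at 1x the base duration with probability roughly 51182/129954 ≈ 0.39, at 2x with probability (61100 - 51182)/129954 ≈ 0.08, and so on. A standalone sketch (not part of the PR, assuming the cumulative-histogram reading above) that prints the implied distribution:

    package main

    import "fmt"

    func main() {
        // Cumulative counts copied from googleTimeFactors above.
        factors := []int{51182, 61100, 76970, 96318, 102699, 106596, 110659, 111951,
            112349, 114887, 123163, 129392, 129573, 129698, 129844, 129954}
        total := float64(factors[len(factors)-1])
        prev := 0
        for i, f := range factors {
            // Probability mass of multiplier i+1 is the normalized bucket width.
            fmt.Printf("multiplier %2dx: p = %.4f\n", i+1, float64(f-prev)/total)
            prev = f
        }
    }
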
28 changes: 23 additions & 5 deletions src/phoenix/frontend/manual-1/main.go
@@ -14,12 +14,16 @@ import (
     "time"
 )
 
+const DefaultRandSeed int64 = -13131313
+
 var (
     frc          = flag.String("conf", config.DefaultConfigPath, "config file")
     useRand      = flag.Bool("useRand", false, "use random seed to generate job, default to hash based on address")
     jobCount     = flag.Int("jobCount", 10, "number of job to generate")
     taskCount    = flag.Int("taskCount", 10, "number of task in a job")
     meanDuration = flag.Float64("jobDuration", 3.0, "job duration in second")
+    randSeed     = flag.Int64("randSeed", DefaultRandSeed, "task generation seed")
+    gEmulation   = flag.Bool("gEmu", false, "use google cluster workload pattern")
 )
 
 func noError(e error) {
@@ -54,11 +58,21 @@ func main() {
     <-feConfig.Ready
 
     if *useRand {
-        rand.Seed(time.Now().UnixNano())
+        // The case where we did not pass in a seed but still want to be rand
+        if *randSeed == DefaultRandSeed {
+            rand.Seed(time.Now().UnixNano())
+        } else {
+            rand.Seed(*randSeed)
+        }
     } else {
-        // just some random number here
-        var randSeed int64 = 1111
-        rand.Seed(randSeed)
+        // just some random number here for the purpose of predictable workload emulation
+        var localRandSeed int64 = 1111
+        rand.Seed(localRandSeed)
     }
 
+    var gGenerator GoogleClusterTaskGenerator
+    if *gEmulation {
+        gGenerator = NewGoogleClusterTaskGenerator(*meanDuration, *randSeed, *taskCount)
+    }
+
     numTasks := *taskCount
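
Note that NewGoogleClusterTaskGenerator re-seeds the global rand with *randSeed, so when -gEmu is set the seeding chosen just above is overridden; in particular, -gEmu together with -useRand and no explicit -randSeed still yields a deterministic run seeded with DefaultRandSeed.
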
@@ -78,6 +92,10 @@ func main() {
             currTaskDuration *= rand.ExpFloat64()
         }
 
+        if *gEmulation {
+            currTaskDuration = gGenerator.GetTaskDuration()
+        }
+
         for j := 0; j < numTasks; j++ {
             taskid := jobid + "-task" + strconv.Itoa(j)
 
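Also note the ordering here: with -gEmu set, whatever exponential scaling was applied to currTaskDuration just above is unconditionally overwritten by the generator's sample (the earlier draw still consumes RNG state).
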
@@ -121,7 +139,7 @@ func main() {
 
     // We can use worker-god to start or kill more jobs here
 
-    <- allJobsDoneSignal
+    <-allJobsDoneSignal
 
     slotCount := len(rc.Executors) * rc.NumSlots
     theoreticalLowerBound := sumOfTaskTimes / float64(slotCount)
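
For reference, a deterministic Google-cluster emulation run would be invoked along these lines (binary name assumed from the package directory src/phoenix/frontend/manual-1):

    manual-1 -conf src/phoenix/config/multi_node.conf -gEmu -useRand -randSeed 42 -jobCount 10 -taskCount 10 -jobDuration 3.0
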
1 change: 0 additions & 1 deletion src/phoenix/init/init-config/main.go
@@ -33,7 +33,6 @@ func main() {
     phoenixConfig.Executors = make([]string, *nMonitors)
     phoenixConfig.WorkerGods = make([]string, *nMonitors)
 
-
     ipAddrs := strings.Split(*ips, ",")
     if nMachine := len(ipAddrs); nMachine > 0 {
         for i := 0; i < *nSchedulers; i++ {

15 changes: 7 additions & 8 deletions src/phoenix/scheduler/task_scheduler.go
@@ -18,7 +18,7 @@ type TaskScheduler struct {
     Addr string
 
     // map of addresses to Monitors that we are able to contact
-    MonitorClientPool  map[string]*monitor.NodeMonitorClient
+    MonitorClientPool map[string]*monitor.NodeMonitorClient
 
     // addresses of our workers
     workerAddresses []string
@@ -27,10 +27,10 @@
     workerAddrToTask map[string]map[string]bool
 
     // map of worker address to count per jobId reservations
-    workerAddrToJobReservations map[string]map[string] int
+    workerAddrToJobReservations map[string]map[string]int
 
     // lock around MonitorClientPool, workerAddresses, workerAddrToTask, workerIdReservations
-    workerLock  sync.Mutex
+    workerLock sync.Mutex
 
     // Frontend Client Pool
     FrontendClientPool map[string]phoenix.FrontendInterface
@@ -135,7 +135,6 @@ func (ts *TaskScheduler) watchWorkerNodes(zkHostPorts []string, ready chan bool)
 
 func (ts *TaskScheduler) rescheduleLostTasks(children []string) {
 
-
     // create new client pool and newWorkerIds
     newClientPool := make(map[string]*monitor.NodeMonitorClient)
     newWorkerIds := make([]string, len(children))
@@ -275,7 +274,7 @@ func (ts *TaskScheduler) GetTask(taskRequest types.TaskRequest, outputTask *type
 
     // pendingTask is now inflight at workerAddrToTask
     _, exists := ts.workerAddrToTask[taskRequest.WorkerAddr]
-    if ! exists {
+    if !exists {
         ts.workerAddrToTask[taskRequest.WorkerAddr] = make(map[string]bool)
     }
 
@@ -398,7 +397,7 @@ func (ts *TaskScheduler) enqueueJob(enqueueCount int, jobId string) error {
         targetMonitor, mExists := ts.MonitorClientPool[targetWorkerId]
         ts.workerLock.Unlock()
 
-        if ! mExists {
+        if !mExists {
             continue
         }
 
@@ -411,12 +410,12 @@
 
         ts.workerLock.Lock()
         _, exists := ts.workerAddrToJobReservations[targetMonitor.Addr]
-        if ! exists {
+        if !exists {
             ts.workerAddrToJobReservations[targetMonitor.Addr] = make(map[string]int)
         }
 
         // targetMonitor.Addr has one more jobId in it's queue
-        ts.workerAddrToJobReservations[targetMonitor.Addr][jobId] ++
+        ts.workerAddrToJobReservations[targetMonitor.Addr][jobId]++
         ts.workerLock.Unlock()
 
         fmt.Printf("[TaskScheduler %s: enqueueJob]: Enqueuing reservation on monitor %s for job reservation %s\n",

4 changes: 2 additions & 2 deletions src/phoenix/types/types.go
@@ -13,8 +13,8 @@ type Task struct {
 }
 
 type TaskRequest struct {
-    JobId string
-    WorkerAddr string
+    JobId      string
+    WorkerAddr string
 }
 
 type WorkerTaskCompleteMsg struct {

22 changes: 9 additions & 13 deletions src/phoenix/worker-god/workergod.go
@@ -13,22 +13,21 @@ import (
     "time"
 )
 
-
 type WorkerWrapper struct {
 
     // map where key is worker index and value is processId for that worker
-    RunningMonitors map[int] *exec.Cmd
-    RunningExecutors map[int] *exec.Cmd
+    RunningMonitors  map[int]*exec.Cmd
+    RunningExecutors map[int]*exec.Cmd
 
     // phoenix configuration
-    Config  *config.PhoenixConfig
+    Config *config.PhoenixConfig
 }
 
 func NewWorkerGod(config *config.PhoenixConfig) phoenix.WorkerGod {
     return &WorkerWrapper{
-        RunningMonitors: make(map[int]*exec.Cmd),
+        RunningMonitors:  make(map[int]*exec.Cmd),
         RunningExecutors: make(map[int]*exec.Cmd),
-        Config: config,
+        Config:           config,
     }
 }

@@ -70,7 +69,7 @@ func (ww *WorkerWrapper) Kill(workerId int, ret *bool) error {
     return nil
 }
 
-func (ww *WorkerWrapper) Start(workerId int, ret* bool) error {
+func (ww *WorkerWrapper) Start(workerId int, ret *bool) error {
 
     fmt.Println("[WorkerWrapper: Start] workerId:", workerId)
     fmt.Println("[WorkerWrapper: Start] Start Timestamp: ", time.Now().UnixNano())
@@ -99,7 +98,6 @@ func (ww *WorkerWrapper) Start(workerId int, ret *bool) error {
     mtor := exec.Command("init-monitor", "-workerId", strconv.Itoa(workerId))
     etor := exec.Command("init-executor", "-workerId", strconv.Itoa(workerId))
 
-
     ww.RunningMonitors[workerId] = mtor
     ww.RunningExecutors[workerId] = etor
 
@@ -122,9 +120,9 @@
     }
 
     // save output to logs
-    go writeToLog(mStdout, "logs/monitor_"+strconv.Itoa(workerId)+"_" +
+    go writeToLog(mStdout, "logs/monitor_"+strconv.Itoa(workerId)+"_"+
         strconv.FormatInt(time.Now().Unix(), 36)+".log")
-    go writeToLog(eStdout, "logs/executor_"+strconv.Itoa(workerId)+"_" +
+    go writeToLog(eStdout, "logs/executor_"+strconv.Itoa(workerId)+"_"+
         strconv.FormatInt(time.Now().Unix(), 36)+".log")
 
     *ret = true
@@ -135,11 +133,9 @@ func writeToLog(out io.ReadCloser, logFileName string) {
     scanner := bufio.NewScanner(out)
     scanner.Split(bufio.ScanLines)
 
-    logFile, _ := os.OpenFile(logFileName, os.O_CREATE | os.O_RDWR, 0777)
+    logFile, _ := os.OpenFile(logFileName, os.O_CREATE|os.O_RDWR, 0777)
     for scanner.Scan() {
         m := scanner.Text()
         logFile.WriteString(m + "\n")
     }
 }
-
-

1 change: 1 addition & 0 deletions src/phoenix/zk.go
@@ -8,4 +8,5 @@ const (
 )
 
 var ZkLocalServers = []string{"localhost:2181"}
+
 // var ZkLocalServers = []string{"172.31.28.99:2181", "172.31.31.12:2181", "172.31.22.104:2181"}