Skip to content
This repository was archived by the owner on Nov 8, 2022. It is now read-only.

Commit 0b2c031

Browse files
Merge pull request #1544 from IzabellaRaulin/task_starts_immediately_rebased
Schedule workflow execution immediately
2 parents 58fb94c + 9677fd5 commit 0b2c031

File tree

9 files changed

+238
-49
lines changed

9 files changed

+238
-49
lines changed

control/strategy/lru.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"github.com/intelsdi-x/snap/core"
2727
)
2828

29-
// lru provides a stragey that selects the least recently used available plugin.
29+
// lru provides a strategy that selects the least recently used available plugin.
3030
type lru struct {
3131
*cache
3232
logger *log.Entry

control/strategy/sticky.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ var (
3333
ErrCacheDoesNotExist = errors.New("cache does not exist")
3434
)
3535

36-
// sticky provides a stragey that ... concurrency count is 1
36+
// sticky provides a strategy that ... concurrency count is 1
3737
type sticky struct {
3838
plugins map[string]AvailablePlugin
3939
metricCache map[string]*cache

mgmt/rest/client/client_func_test.go

+34-11
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,25 @@ func TestSnapClient(t *testing.T) {
343343
})
344344
})
345345

346+
Convey("unload one of collector plugin", func() {
347+
p := c.GetPlugins(false)
348+
So(p.Err, ShouldBeNil)
349+
So(len(p.LoadedPlugins), ShouldEqual, 3)
350+
351+
p2 := c.UnloadPlugin("collector", "mock", 2)
352+
So(p2.Err, ShouldBeNil)
353+
So(p2.Name, ShouldEqual, "mock")
354+
So(p2.Version, ShouldEqual, 2)
355+
So(p2.Type, ShouldEqual, "collector")
356+
357+
Convey("there should be two loaded plugins", func() {
358+
p = c.GetPlugins(false)
359+
So(p.Err, ShouldBeNil)
360+
So(len(p.LoadedPlugins), ShouldEqual, 2)
361+
So(p.AvailablePlugins, ShouldBeEmpty)
362+
})
363+
})
364+
346365
Convey("Tasks", func() {
347366
Convey("Passing a bad task manifest", func() {
348367
wfb := getWMFromSample("bad.json")
@@ -374,6 +393,7 @@ func TestSnapClient(t *testing.T) {
374393
correctSchedule := &Schedule{Type: "simple", Interval: "1s"}
375394
tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0)
376395
So(tt.Err, ShouldBeNil)
396+
So(tt.State, ShouldEqual, "Running")
377397
})
378398

379399
Convey("Creating a task with correct configuration for windowed schedule", func() {
@@ -384,17 +404,19 @@ func TestSnapClient(t *testing.T) {
384404
StopTimestamp: &stopTime}
385405
tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0)
386406
So(tt.Err, ShouldBeNil)
407+
So(tt.State, ShouldEqual, "Running")
387408
})
388409

389410
Convey("Creating a task with correct configuration for cron schedule", func() {
390411
correctSchedule := &Schedule{Type: "cron", Interval: "1 1 1 1 1 1"}
391412
tt := c.CreateTask(correctSchedule, wf, "baron", "", true, 0)
392413
So(tt.Err, ShouldBeNil)
414+
So(tt.State, ShouldEqual, "Running")
393415
})
394416
})
395417

396-
tf := c.CreateTask(sch, wf, "baron", "", false, 0)
397418
Convey("valid task not started on creation", func() {
419+
tf := c.CreateTask(sch, wf, "baron", "", false, 0)
398420
So(tf.Err, ShouldBeNil)
399421
So(tf.Name, ShouldEqual, "baron")
400422
So(tf.State, ShouldEqual, "Stopped")
@@ -436,8 +458,8 @@ func TestSnapClient(t *testing.T) {
436458
})
437459
})
438460

439-
tt := c.CreateTask(sch, wf, "baron", "", true, 0)
440461
Convey("valid task started on creation", func() {
462+
tt := c.CreateTask(sch, wf, "baron", "", true, 0)
441463
So(tt.Err, ShouldBeNil)
442464
So(tt.Name, ShouldEqual, "baron")
443465
So(tt.State, ShouldEqual, "Running")
@@ -533,6 +555,7 @@ func TestSnapClient(t *testing.T) {
533555

534556
a := new(ea)
535557
r := c.WatchTask(tf.ID)
558+
So(r.Err, ShouldBeNil)
536559
wait := make(chan struct{})
537560
go func() {
538561
for {
@@ -554,10 +577,11 @@ func TestSnapClient(t *testing.T) {
554577
So(startResp.Err, ShouldBeNil)
555578
<-wait
556579
a.Lock()
580+
defer a.Unlock()
581+
557582
So(len(a.events), ShouldEqual, 5)
558-
a.Unlock()
559583
So(a.events[0], ShouldEqual, "task-started")
560-
for x := 2; x <= 4; x++ {
584+
for x := 1; x < 5; x++ {
561585
So(a.events[x], ShouldEqual, "metric-event")
562586
}
563587
})
@@ -570,16 +594,15 @@ func TestSnapClient(t *testing.T) {
570594
So(p.Err, ShouldNotBeNil)
571595
So(p.Err.Error(), ShouldEqual, "plugin not found")
572596
})
597+
Convey("unload already unloaded plugin", func() {
598+
p := c.UnloadPlugin("collector", "mock", 2)
599+
So(p.Err, ShouldNotBeNil)
600+
So(p.Err.Error(), ShouldEqual, "plugin not found")
601+
})
573602
Convey("unload one of multiple", func() {
574603
p1 := c.GetPlugins(false)
575604
So(p1.Err, ShouldBeNil)
576-
So(len(p1.LoadedPlugins), ShouldEqual, 3)
577-
578-
p2 := c.UnloadPlugin("collector", "mock", 2)
579-
So(p2.Err, ShouldBeNil)
580-
So(p2.Name, ShouldEqual, "mock")
581-
So(p2.Version, ShouldEqual, 2)
582-
So(p2.Type, ShouldEqual, "collector")
605+
So(len(p1.LoadedPlugins), ShouldEqual, 2)
583606

584607
p3 := c.UnloadPlugin("publisher", "mock-file", 3)
585608
So(p3.Err, ShouldBeNil)

pkg/schedule/schedule.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ type Response interface {
4949
}
5050

5151
func waitOnInterval(last time.Time, i time.Duration) (uint, time.Time) {
52+
// first run
5253
if (last == time.Time{}) {
53-
time.Sleep(i)
54+
// for the first run, do not wait on interval
55+
// and schedule workflow execution immediately
5456
return uint(0), time.Now()
5557
}
5658
// Get the difference in time.Duration since last in nanoseconds (int64)

pkg/schedule/simple_schedule_test.go

+71-3
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,88 @@ func TestSimpleSchedule(t *testing.T) {
2626
r := s.Wait(last)
2727
after := time.Since(before)
2828

29-
So(r.State(), ShouldEqual, Active)
30-
So(r.Missed(), ShouldResemble, uint(4))
3129
So(r.Error(), ShouldEqual, nil)
30+
So(r.State(), ShouldEqual, Active)
31+
So(r.Missed(), ShouldEqual, 4)
3232
// We are ok at this precision with being within 10% over or under (10ms)
3333
afterMS := after.Nanoseconds() / 1000 / 1000
3434
So(afterMS, ShouldBeGreaterThan, shouldWait-10)
3535
So(afterMS, ShouldBeLessThan, shouldWait+10)
3636
})
37-
3837
Convey("invalid schedule", func() {
3938
s := NewSimpleSchedule(0)
4039
err := s.Validate()
4140
So(err, ShouldResemble, ErrInvalidInterval)
4241
})
42+
})
43+
Convey("Simple schedule with no misses", t, func() {
44+
interval := time.Millisecond * 10
45+
s := NewSimpleSchedule(interval)
46+
47+
err := s.Validate()
48+
So(err, ShouldBeNil)
49+
50+
var r []Response
51+
last := *new(time.Time)
52+
53+
before := time.Now()
54+
for len(r) <= 10 {
55+
r1 := s.Wait(last)
56+
last = time.Now()
57+
r = append(r, r1)
58+
}
59+
60+
var missed uint
61+
for _, x := range r {
62+
missed += x.Missed()
63+
}
64+
So(missed, ShouldEqual, 0)
65+
66+
// the task should start immediately
67+
So(
68+
r[0].LastTime().Sub(before).Seconds(),
69+
ShouldBeBetweenOrEqual,
70+
0,
71+
(interval).Seconds(),
72+
)
73+
})
74+
Convey("Simple schedule with a few misses", t, func() {
75+
interval := time.Millisecond * 10
76+
s := NewSimpleSchedule(interval)
77+
78+
err := s.Validate()
79+
So(err, ShouldBeNil)
80+
81+
var r []Response
82+
last := *new(time.Time)
83+
84+
before := time.Now()
85+
for len(r) <= 10 {
86+
r1 := s.Wait(last)
87+
last = time.Now()
88+
r = append(r, r1)
89+
// make it miss some
90+
if len(r) == 3 || len(r) == 7 {
91+
time.Sleep(s.Interval)
92+
}
93+
if len(r) == 9 {
94+
// Miss two
95+
time.Sleep(s.Interval * 2)
96+
}
97+
}
98+
99+
var missed uint
100+
for _, x := range r {
101+
missed += x.Missed()
102+
}
103+
So(missed, ShouldEqual, 4)
43104

105+
// the task should fire immediately
106+
So(
107+
r[0].LastTime().Sub(before).Seconds(),
108+
ShouldBeBetweenOrEqual,
109+
0,
110+
(interval).Seconds(),
111+
)
44112
})
45113
}

pkg/schedule/windowed_schedule.go

+5-12
Original file line numberDiff line numberDiff line change
@@ -66,19 +66,12 @@ func (w *WindowedSchedule) Wait(last time.Time) Response {
6666
}).Debug("Waiting for window to start")
6767
time.Sleep(wait)
6868
}
69-
if (last == time.Time{}) {
70-
logger.WithFields(log.Fields{
71-
"_block": "windowed-wait",
72-
}).Debug("Last was unset using start time")
73-
last = *w.StartTime
74-
}
7569
} else {
76-
if (last == time.Time{}) {
77-
logger.WithFields(log.Fields{
78-
"_block": "windowed-wait",
79-
}).Debug("Last was unset using start time")
80-
last = time.Now()
81-
}
70+
// This has no start like a simple schedule, so execution starts immediately
71+
logger.WithFields(log.Fields{
72+
"_block": "windowed-wait",
73+
"sleep-duration": 0,
74+
}).Debug("Window start time not defined, start execution immediately")
8275
}
8376

8477
// If within the window we wait our interval and return

0 commit comments

Comments
 (0)