Unverified Commit a62f0589 authored by Wenkai Yin(尹文开)'s avatar Wenkai Yin(尹文开) Committed by GitHub
Browse files

Merge pull request #13129 from ywk253100/200917_task_mgr

Tiny improvement for the task manager
parents 20d4e86e 1a4106a9
......@@ -34,7 +34,7 @@ type preheatSuite struct {
fakePolicyMgr *pmocks.FakeManager
fakeScheduler *smocks.Scheduler
mockInstanceServer *httptest.Server
fakeExecutionMgr *tmocks.FakeExecutionManager
fakeExecutionMgr *tmocks.ExecutionManager
}
func TestPreheatSuite(t *testing.T) {
......@@ -42,7 +42,7 @@ func TestPreheatSuite(t *testing.T) {
fakeInstanceMgr := &instance.FakeManager{}
fakePolicyMgr := &pmocks.FakeManager{}
fakeScheduler := &smocks.Scheduler{}
fakeExecutionMgr := &tmocks.FakeExecutionManager{}
fakeExecutionMgr := &tmocks.ExecutionManager{}
var c = &controller{
iManager: fakeInstanceMgr,
......
......@@ -78,7 +78,7 @@ func (suite *EnforcerTestSuite) SetupSuite() {
mock.AnythingOfType("*q.Query"),
).Return(fakePolicies, nil)
fakeExecManager := &task.FakeExecutionManager{}
fakeExecManager := &task.ExecutionManager{}
fakeExecManager.On("Create",
context.TODO(),
mock.AnythingOfType("string"),
......@@ -87,7 +87,7 @@ func (suite *EnforcerTestSuite) SetupSuite() {
mock.AnythingOfType("map[string]interface {}"),
).Return(time.Now().Unix(), nil)
fakeTaskManager := &task.FakeManager{}
fakeTaskManager := &task.Manager{}
fakeTaskManager.On("Create",
context.TODO(),
mock.AnythingOfType("int64"),
......
......@@ -28,7 +28,7 @@ import (
type controllerTestSuite struct {
suite.Suite
ctl *controller
mgr *task.FakeManager
mgr *task.Manager
}
// TestControllerTestSuite tests controller.
......@@ -38,7 +38,7 @@ func TestControllerTestSuite(t *testing.T) {
// SetupTest setups the testing env.
func (c *controllerTestSuite) SetupTest() {
c.mgr = &task.FakeManager{}
c.mgr = &task.Manager{}
c.ctl = &controller{mgr: c.mgr}
}
......
......@@ -28,7 +28,7 @@ import (
type executionControllerTestSuite struct {
suite.Suite
ctl *executionController
mgr *task.FakeExecutionManager
mgr *task.ExecutionManager
}
// TestExecutionControllerTestSuite tests controller.
......@@ -38,7 +38,7 @@ func TestExecutionControllerTestSuite(t *testing.T) {
// SetupTest setups the testing env.
func (ec *executionControllerTestSuite) SetupTest() {
ec.mgr = &task.FakeExecutionManager{}
ec.mgr = &task.ExecutionManager{}
ec.ctl = &executionController{
mgr: ec.mgr,
}
......
......@@ -201,27 +201,10 @@ func (s *scheduler) UnScheduleByID(ctx context.Context, id int64) error {
}
if len(executions) > 0 {
executionID := executions[0].ID
if err = s.execMgr.Stop(ctx, executionID); err != nil {
// stop the execution
if err = s.execMgr.StopAndWait(ctx, executionID, 10*time.Second); err != nil {
return err
}
final := false
// after the stop called, the execution cannot be stopped immediately, and the execution
// cannot be deleted if it's status isn't in final status, so use the for loop to make
// sure the execution be in final status before deleting it
for t := 100 * time.Microsecond; t < 5*time.Second; t = t * 2 {
exec, err := s.execMgr.Get(ctx, executionID)
if err != nil {
return err
}
if job.Status(exec.Status).Final() {
final = true
break
}
time.Sleep(t)
}
if !final {
return fmt.Errorf("failed to unschedule the schedule %d: the execution %d isn't in final status", id, executionID)
}
// delete execution
if err = s.execMgr.Delete(ctx, executionID); err != nil {
return err
......
......@@ -15,6 +15,7 @@
package scheduler
import (
"fmt"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/pkg/task"
"github.com/goharbor/harbor/src/testing/mock"
......@@ -27,8 +28,8 @@ type schedulerTestSuite struct {
suite.Suite
scheduler *scheduler
dao *mockDAO
execMgr *tasktesting.FakeExecutionManager
taskMgr *tasktesting.FakeManager
execMgr *tasktesting.ExecutionManager
taskMgr *tasktesting.Manager
}
func (s *schedulerTestSuite) SetupTest() {
......@@ -37,8 +38,8 @@ func (s *schedulerTestSuite) SetupTest() {
s.Require().Nil(err)
s.dao = &mockDAO{}
s.execMgr = &tasktesting.FakeExecutionManager{}
s.taskMgr = &tasktesting.FakeManager{}
s.execMgr = &tasktesting.ExecutionManager{}
s.taskMgr = &tasktesting.Manager{}
s.scheduler = &scheduler{
dao: s.dao,
......@@ -97,17 +98,13 @@ func (s *schedulerTestSuite) TestSchedule() {
}
func (s *schedulerTestSuite) TestUnScheduleByID() {
// the underlying task isn't stopped
// the execution isn't stopped
s.execMgr.On("List", mock.Anything, mock.Anything).Return([]*task.Execution{
{
ID: 1,
},
}, nil)
s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
ID: 1,
Status: job.RunningStatus.String(),
}, nil)
s.execMgr.On("StopAndWait", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error"))
err := s.scheduler.UnScheduleByID(nil, 1)
s.NotNil(err)
s.dao.AssertExpectations(s.T())
......@@ -122,11 +119,7 @@ func (s *schedulerTestSuite) TestUnScheduleByID() {
ID: 1,
},
}, nil)
s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
ID: 1,
Status: job.StoppedStatus.String(),
}, nil)
s.execMgr.On("StopAndWait", mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.dao.On("Delete", mock.Anything, mock.Anything).Return(nil)
s.execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil)
err = s.scheduler.UnScheduleByID(nil, 1)
......@@ -146,11 +139,7 @@ func (s *schedulerTestSuite) TestUnScheduleByVendor() {
ID: 1,
},
}, nil)
s.execMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
s.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{
ID: 1,
Status: job.StoppedStatus.String(),
}, nil)
s.execMgr.On("StopAndWait", mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.dao.On("Delete", mock.Anything, mock.Anything).Return(nil)
s.execMgr.On("Delete", mock.Anything, mock.Anything).Return(nil)
err := s.scheduler.UnScheduleByVendor(nil, "vendor", 1)
......
......@@ -17,6 +17,8 @@ package task
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/goharbor/harbor/src/jobservice/job"
......@@ -52,6 +54,9 @@ type ExecutionManager interface {
MarkError(ctx context.Context, id int64, message string) (err error)
// Stop all linked tasks of the specified execution
Stop(ctx context.Context, id int64) (err error)
// StopAndWait stops all linked tasks of the specified execution and waits until all tasks are stopped
// or get an error
StopAndWait(ctx context.Context, id int64, timeout time.Duration) (err error)
// Delete the specified execution and its tasks
Delete(ctx context.Context, id int64) (err error)
// Get the specified execution
......@@ -121,6 +126,13 @@ func (e *executionManager) MarkError(ctx context.Context, id int64, message stri
}
func (e *executionManager) Stop(ctx context.Context, id int64) error {
execution, err := e.executionDAO.Get(ctx, id)
if err != nil {
return err
}
// when an execution is in final status, if it contains task that is a periodic or retrying job it will
// run again in the near future, so we must operate the stop action
tasks, err := e.taskDAO.List(ctx, &q.Query{
Keywords: map[string]interface{}{
"ExecutionID": id,
......@@ -129,6 +141,15 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error {
if err != nil {
return err
}
// contains no task and the status isn't final, update the status to stop directly
if len(tasks) == 0 && !job.Status(execution.Status).Final() {
return e.executionDAO.Update(ctx, &dao.Execution{
ID: id,
Status: job.StoppedStatus.String(),
EndTime: time.Now(),
}, "Status", "EndTime")
}
for _, task := range tasks {
if err = e.taskMgr.Stop(ctx, task.ID); err != nil {
log.Errorf("failed to stop task %d: %v", task.ID, err)
......@@ -138,6 +159,53 @@ func (e *executionManager) Stop(ctx context.Context, id int64) error {
return nil
}
func (e *executionManager) StopAndWait(ctx context.Context, id int64, timeout time.Duration) error {
var (
overtime bool
errChan = make(chan error)
lock = sync.RWMutex{}
)
go func() {
// stop the execution
if err := e.Stop(ctx, id); err != nil {
errChan <- err
return
}
// check the status of the execution
interval := 100 * time.Millisecond
stop := false
for !stop {
execution, err := e.executionDAO.Get(ctx, id)
if err != nil {
errChan <- err
return
}
// if the status is final, return
if job.Status(execution.Status).Final() {
errChan <- nil
return
}
time.Sleep(interval)
if interval < 1*time.Second {
interval = interval * 2
}
lock.RLock()
stop = overtime
lock.RUnlock()
}
}()
select {
case <-time.After(timeout):
lock.Lock()
overtime = true
lock.Unlock()
return fmt.Errorf("stopping the execution %d timeout", id)
case err := <-errChan:
return err
}
}
func (e *executionManager) Delete(ctx context.Context, id int64) error {
tasks, err := e.taskDAO.List(ctx, &q.Query{
Keywords: map[string]interface{}{
......
......@@ -16,6 +16,7 @@ package task
import (
"testing"
"time"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/lib/errors"
......@@ -76,6 +77,26 @@ func (e *executionManagerTestSuite) TestMarkError() {
}
func (e *executionManagerTestSuite) TestStop() {
// the execution contains no tasks and the status isn't final
e.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{
ID: 1,
Status: job.RunningStatus.String(),
}, nil)
e.taskDAO.On("List", mock.Anything, mock.Anything).Return(nil, nil)
e.execDAO.On("Update", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err := e.execMgr.Stop(nil, 1)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())
e.execDAO.AssertExpectations(e.T())
// reset the mocks
e.SetupTest()
// the execution contains tasks
e.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{
ID: 1,
Status: job.RunningStatus.String(),
}, nil)
e.taskDAO.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{
{
ID: 1,
......@@ -83,9 +104,51 @@ func (e *executionManagerTestSuite) TestStop() {
},
}, nil)
e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
err := e.execMgr.Stop(nil, 1)
err = e.execMgr.Stop(nil, 1)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())
e.execDAO.AssertExpectations(e.T())
e.taskMgr.AssertExpectations(e.T())
}
func (e *executionManagerTestSuite) TestStopAndWait() {
// timeout
e.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{
ID: 1,
Status: job.RunningStatus.String(),
}, nil)
e.taskDAO.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{
{
ID: 1,
ExecutionID: 1,
},
}, nil)
e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
err := e.execMgr.StopAndWait(nil, 1, 1*time.Second)
e.Require().NotNil(err)
e.taskDAO.AssertExpectations(e.T())
e.execDAO.AssertExpectations(e.T())
e.taskMgr.AssertExpectations(e.T())
// reset mocks
e.SetupTest()
// pass
e.execDAO.On("Get", mock.Anything, mock.Anything).Return(&dao.Execution{
ID: 1,
Status: job.StoppedStatus.String(),
}, nil)
e.taskDAO.On("List", mock.Anything, mock.Anything).Return([]*dao.Task{
{
ID: 1,
ExecutionID: 1,
},
}, nil)
e.taskMgr.On("Stop", mock.Anything, mock.Anything).Return(nil)
err = e.execMgr.StopAndWait(nil, 1, 1*time.Second)
e.Require().Nil(err)
e.taskDAO.AssertExpectations(e.T())
e.execDAO.AssertExpectations(e.T())
e.taskMgr.AssertExpectations(e.T())
}
......
......@@ -25,5 +25,7 @@ package pkg
//go:generate mockery --case snake --dir ../../pkg/scan/rest/v1 --all --output ./scan/rest/v1 --outpkg v1
//go:generate mockery --case snake --dir ../../pkg/scan/scanner --all --output ./scan/scanner --outpkg scanner
//go:generate mockery --case snake --dir ../../pkg/scheduler --name Scheduler --output ./scheduler --outpkg scheduler
//go:generate mockery --case snake --dir ../../pkg/task --name Manager --output ./task --outpkg task
//go:generate mockery --case snake --dir ../../pkg/task --name ExecutionManager --output ./task --outpkg task
//go:generate mockery --case snake --dir ../../pkg/user --name Manager --output ./user --outpkg user
//go:generate mockery --case snake --dir ../../pkg/robot/dao --name RobotAccountDao --output ./robot/dao --outpkg dao
// Code generated by mockery v2.0.3. DO NOT EDIT.
// Code generated by mockery v2.1.0. DO NOT EDIT.
package task
......@@ -9,15 +9,17 @@ import (
mock "github.com/stretchr/testify/mock"
task "github.com/goharbor/harbor/src/pkg/task"
time "time"
)
// FakeExecutionManager is an autogenerated mock type for the ExecutionManager type
type FakeExecutionManager struct {
// ExecutionManager is an autogenerated mock type for the ExecutionManager type
type ExecutionManager struct {
mock.Mock
}
// Count provides a mock function with given fields: ctx, query
func (_m *FakeExecutionManager) Count(ctx context.Context, query *q.Query) (int64, error) {
func (_m *ExecutionManager) Count(ctx context.Context, query *q.Query) (int64, error) {
ret := _m.Called(ctx, query)
var r0 int64
......@@ -38,7 +40,7 @@ func (_m *FakeExecutionManager) Count(ctx context.Context, query *q.Query) (int6
}
// Create provides a mock function with given fields: ctx, vendorType, vendorID, trigger, extraAttrs
func (_m *FakeExecutionManager) Create(ctx context.Context, vendorType string, vendorID int64, trigger string, extraAttrs ...map[string]interface{}) (int64, error) {
func (_m *ExecutionManager) Create(ctx context.Context, vendorType string, vendorID int64, trigger string, extraAttrs ...map[string]interface{}) (int64, error) {
_va := make([]interface{}, len(extraAttrs))
for _i := range extraAttrs {
_va[_i] = extraAttrs[_i]
......@@ -66,7 +68,7 @@ func (_m *FakeExecutionManager) Create(ctx context.Context, vendorType string, v
}
// Delete provides a mock function with given fields: ctx, id
func (_m *FakeExecutionManager) Delete(ctx context.Context, id int64) error {
func (_m *ExecutionManager) Delete(ctx context.Context, id int64) error {
ret := _m.Called(ctx, id)
var r0 error
......@@ -80,7 +82,7 @@ func (_m *FakeExecutionManager) Delete(ctx context.Context, id int64) error {
}
// Get provides a mock function with given fields: ctx, id
func (_m *FakeExecutionManager) Get(ctx context.Context, id int64) (*task.Execution, error) {
func (_m *ExecutionManager) Get(ctx context.Context, id int64) (*task.Execution, error) {
ret := _m.Called(ctx, id)
var r0 *task.Execution
......@@ -103,7 +105,7 @@ func (_m *FakeExecutionManager) Get(ctx context.Context, id int64) (*task.Execut
}
// List provides a mock function with given fields: ctx, query
func (_m *FakeExecutionManager) List(ctx context.Context, query *q.Query) ([]*task.Execution, error) {
func (_m *ExecutionManager) List(ctx context.Context, query *q.Query) ([]*task.Execution, error) {
ret := _m.Called(ctx, query)
var r0 []*task.Execution
......@@ -126,7 +128,7 @@ func (_m *FakeExecutionManager) List(ctx context.Context, query *q.Query) ([]*ta
}
// MarkDone provides a mock function with given fields: ctx, id, message
func (_m *FakeExecutionManager) MarkDone(ctx context.Context, id int64, message string) error {
func (_m *ExecutionManager) MarkDone(ctx context.Context, id int64, message string) error {
ret := _m.Called(ctx, id, message)
var r0 error
......@@ -140,7 +142,7 @@ func (_m *FakeExecutionManager) MarkDone(ctx context.Context, id int64, message
}
// MarkError provides a mock function with given fields: ctx, id, message
func (_m *FakeExecutionManager) MarkError(ctx context.Context, id int64, message string) error {
func (_m *ExecutionManager) MarkError(ctx context.Context, id int64, message string) error {
ret := _m.Called(ctx, id, message)
var r0 error
......@@ -154,7 +156,7 @@ func (_m *FakeExecutionManager) MarkError(ctx context.Context, id int64, message
}
// Stop provides a mock function with given fields: ctx, id
func (_m *FakeExecutionManager) Stop(ctx context.Context, id int64) error {
func (_m *ExecutionManager) Stop(ctx context.Context, id int64) error {
ret := _m.Called(ctx, id)
var r0 error
......@@ -166,3 +168,17 @@ func (_m *FakeExecutionManager) Stop(ctx context.Context, id int64) error {
return r0
}
// StopAndWait provides a mock function with given fields: ctx, id, timeout
func (_m *ExecutionManager) StopAndWait(ctx context.Context, id int64, timeout time.Duration) error {
ret := _m.Called(ctx, id, timeout)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, int64, time.Duration) error); ok {
r0 = rf(ctx, id, timeout)
} else {
r0 = ret.Error(0)
}
return r0
}
// Code generated by mockery v2.0.3. DO NOT EDIT.
// Code generated by mockery v2.1.0. DO NOT EDIT.
package task
......@@ -11,13 +11,13 @@ import (
task "github.com/goharbor/harbor/src/pkg/task"
)
// FakeManager is an autogenerated mock type for the Manager type
type FakeManager struct {
// Manager is an autogenerated mock type for the Manager type
type Manager struct {
mock.Mock
}
// Count provides a mock function with given fields: ctx, query
func (_m *FakeManager) Count(ctx context.Context, query *q.Query) (int64, error) {
func (_m *Manager) Count(ctx context.Context, query *q.Query) (int64, error) {
ret := _m.Called(ctx, query)
var r0 int64
......@@ -38,7 +38,7 @@ func (_m *FakeManager) Count(ctx context.Context, query *q.Query) (int64, error)
}
// Create provides a mock function with given fields: ctx, executionID, job, extraAttrs
func (_m *FakeManager) Create(ctx context.Context, executionID int64, job *task.Job, extraAttrs ...map[string]interface{}) (int64, error) {
func (_m *Manager) Create(ctx context.Context, executionID int64, job *task.Job, extraAttrs ...map[string]interface{}) (int64, error) {
_va := make([]interface{}, len(extraAttrs))
for _i := range extraAttrs {
_va[_i] = extraAttrs[_i]
......@@ -66,7 +66,7 @@ func (_m *FakeManager) Create(ctx context.Context, executionID int64, job *task.
}
// Get provides a mock function with given fields: ctx, id
func (_m *FakeManager) Get(ctx context.Context, id int64) (*task.Task, error) {
func (_m *Manager) Get(ctx context.Context, id int64) (*task.Task, error) {
ret := _m.Called(ctx, id)
var r0 *task.Task
......@@ -89,7 +89,7 @@ func (_m *FakeManager) Get(ctx context.Context, id int64) (*task.Task, error) {
}
// GetLog provides a mock function with given fields: ctx, id
func (_m *FakeManager) GetLog(ctx context.Context, id int64) ([]byte, error) {
func (_m *Manager) GetLog(ctx context.Context, id int64) ([]byte, error) {
ret := _m.Called(ctx, id)
var r0 []byte
......@@ -112,7 +112,7 @@ func (_m *FakeManager) GetLog(ctx context.Context, id int64) ([]byte, error) {
}
// List provides a mock function with given fields: ctx, query
func (_m *FakeManager) List(ctx context.Context, query *q.Query) ([]*task.Task, error) {
func (_m *Manager) List(ctx context.Context, query *q.Query) ([]*task.Task, error) {
ret := _m.Called(ctx, query)
var r0 []*task.Task
......@@ -135,7 +135,7 @@ func (_m *FakeManager) List(ctx context.Context, query *q.Query) ([]*task.Task,
}
// Stop provides a mock function with given fields: ctx, id
func (_m *FakeManager) Stop(ctx context.Context, id int64) error {
func (_m *Manager) Stop(ctx context.Context, id int64) error {
ret := _m.Called(ctx, id)
var r0 error
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment