Docker-in-Docker (DinD) capabilities of public runners deactivated. More info

controller.go 14 KB
Newer Older
1 2 3 4 5 6
package preheat

import (
	"context"
	"time"

Wenkai Yin's avatar
Wenkai Yin committed
7
	"github.com/goharbor/harbor/src/jobservice/job"
8
	"github.com/goharbor/harbor/src/lib/errors"
9 10
	"github.com/goharbor/harbor/src/lib/q"
	"github.com/goharbor/harbor/src/pkg/p2p/preheat/instance"
11
	policyModels "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/policy"
12
	providerModels "github.com/goharbor/harbor/src/pkg/p2p/preheat/models/provider"
13
	"github.com/goharbor/harbor/src/pkg/p2p/preheat/policy"
14
	"github.com/goharbor/harbor/src/pkg/p2p/preheat/provider"
fanjiankong's avatar
fanjiankong committed
15
	"github.com/goharbor/harbor/src/pkg/scheduler"
fanjiankong's avatar
Fix.  
fanjiankong committed
16
	"github.com/goharbor/harbor/src/pkg/task"
fanjiankong's avatar
fanjiankong committed
17 18 19 20 21
)

const (
	// SchedulerCallback ...
	SchedulerCallback = "P2PPreheatCallback"
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
)

var (
	// Ctl is a global preheat controller instance
	Ctl = NewController()
)

// ErrorConflict for handling conflicts
var ErrorConflict = errors.New("resource conflict")

// ErrorUnhealthy for unhealthy
var ErrorUnhealthy = errors.New("instance unhealthy")

// Controller defines related top interfaces to handle the workflow of
// the image distribution.
type Controller interface {
	// Get all the supported distribution providers
	//
	// If succeed, an metadata of provider list will be returned.
	// Otherwise, a non nil error will be returned
	//
	GetAvailableProviders() ([]*provider.Metadata, error)

	// CountInstance all the setup instances of distribution providers
	//
	// params *q.Query : parameters for querying
	//
	// If succeed, matched provider instance count will be returned.
	// Otherwise, a non nil error will be returned
	//
	CountInstance(ctx context.Context, query *q.Query) (int64, error)

	// ListInstance all the setup instances of distribution providers
	//
	// params *q.Query : parameters for querying
	//
	// If succeed, matched provider instance list will be returned.
	// Otherwise, a non nil error will be returned
	//
	ListInstance(ctx context.Context, query *q.Query) ([]*providerModels.Instance, error)

	// GetInstance the metadata of the specified instance
	//
	// id string : ID of the instance being deleted
	//
	// If succeed, the metadata with nil error are returned
	// Otherwise, a non nil error is returned
	//
	GetInstance(ctx context.Context, id int64) (*providerModels.Instance, error)

fanjiankong's avatar
fanjiankong committed
72 73 74
	// GetInstance the metadata of the specified instance
	GetInstanceByName(ctx context.Context, name string) (*providerModels.Instance, error)

75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
	// Create a new instance for the specified provider.
	//
	// If succeed, the ID of the instance will be returned.
	// Any problems met, a non nil error will be returned.
	//
	CreateInstance(ctx context.Context, instance *providerModels.Instance) (int64, error)

	// Delete the specified provider instance.
	//
	// id string : ID of the instance being deleted
	//
	// Any problems met, a non nil error will be returned.
	//
	DeleteInstance(ctx context.Context, id int64) error

	// Update the instance with incremental way;
	// Including update the enabled flag of the instance.
	//
	// id string                     : ID of the instance being updated
	// properties ...string 				 : The properties being updated
	//
	// Any problems met, a non nil error will be returned
	//
	UpdateInstance(ctx context.Context, instance *providerModels.Instance, properties ...string) error
99 100 101 102 103 104 105 106 107

	// do not provide another policy controller, mixed in preheat controller

	// CountPolicy returns the total count of the policy.
	CountPolicy(ctx context.Context, query *q.Query) (int64, error)
	// CreatePolicy creates the policy.
	CreatePolicy(ctx context.Context, schema *policyModels.Schema) (int64, error)
	// GetPolicy gets the policy by id.
	GetPolicy(ctx context.Context, id int64) (*policyModels.Schema, error)
108 109
	// GetPolicyByName gets the policy by name.
	GetPolicyByName(ctx context.Context, projectID int64, name string) (*policyModels.Schema, error)
110 111 112 113 114 115 116 117
	// UpdatePolicy updates the policy.
	UpdatePolicy(ctx context.Context, schema *policyModels.Schema, props ...string) error
	// DeletePolicy deletes the policy by id.
	DeletePolicy(ctx context.Context, id int64) error
	// ListPolicies lists policies by query.
	ListPolicies(ctx context.Context, query *q.Query) ([]*policyModels.Schema, error)
	// ListPoliciesByProject lists policies by project.
	ListPoliciesByProject(ctx context.Context, project int64, query *q.Query) ([]*policyModels.Schema, error)
118 119
	// CheckHealth checks the instance health, for test connection
	CheckHealth(ctx context.Context, instance *providerModels.Instance) error
120 121
	// DeletePoliciesOfProject delete all policies under one project
	DeletePoliciesOfProject(ctx context.Context, project int64) error
122 123 124 125 126 127 128 129 130
}

var _ Controller = (*controller)(nil)

// controller is the default implementation of Controller interface.
//
type controller struct {
	// For instance
	iManager instance.Manager
131
	// For policy
fanjiankong's avatar
Fix.  
fanjiankong committed
132 133 134
	pManager     policy.Manager
	scheduler    scheduler.Scheduler
	executionMgr task.ExecutionManager
135 136 137 138 139
}

// NewController is constructor of controller
func NewController() Controller {
	return &controller{
fanjiankong's avatar
Fix.  
fanjiankong committed
140 141 142 143
		iManager:     instance.Mgr,
		pManager:     policy.Mgr,
		scheduler:    scheduler.Sched,
		executionMgr: task.NewExecutionManager(),
144 145 146 147
	}
}

// GetAvailableProviders implements @Controller.GetAvailableProviders
148
func (c *controller) GetAvailableProviders() ([]*provider.Metadata, error) {
149 150 151 152
	return provider.ListProviders()
}

// CountInstance implements @Controller.CountInstance
153 154
func (c *controller) CountInstance(ctx context.Context, query *q.Query) (int64, error) {
	return c.iManager.Count(ctx, query)
155 156
}

157 158 159
// ListInstance implements @Controller.ListInstance
func (c *controller) ListInstance(ctx context.Context, query *q.Query) ([]*providerModels.Instance, error) {
	return c.iManager.List(ctx, query)
160 161 162
}

// CreateInstance implements @Controller.CreateInstance
163
func (c *controller) CreateInstance(ctx context.Context, instance *providerModels.Instance) (int64, error) {
164 165 166 167 168 169 170 171 172 173
	if instance == nil {
		return 0, errors.New("nil instance object provided")
	}

	// Avoid duplicated endpoint
	var query = &q.Query{
		Keywords: map[string]interface{}{
			"endpoint": instance.Endpoint,
		},
	}
174
	num, err := c.iManager.Count(ctx, query)
175 176 177 178 179 180 181
	if err != nil {
		return 0, err
	}
	if num > 0 {
		return 0, ErrorConflict
	}

182 183
	// !WARN: We don't check the health of the instance here.
	// That is ok because the health of instance will be checked before enforcing the policy each time.
184 185 186

	instance.SetupTimestamp = time.Now().Unix()

187
	return c.iManager.Save(ctx, instance)
188 189
}

190 191
// DeleteInstance implements @Controller.Delete
func (c *controller) DeleteInstance(ctx context.Context, id int64) error {
192 193 194 195
	ins, err := c.GetInstance(ctx, id)
	if err != nil {
		return err
	}
196 197 198 199 200 201 202 203 204 205 206 207 208
	// delete instance should check the instance whether be used by policies
	policies, err := c.ListPolicies(ctx, &q.Query{
		Keywords: map[string]interface{}{
			"provider_id": id,
		},
	})
	if err != nil {
		return err
	}

	if len(policies) > 0 {
		return errors.New(nil).
			WithCode(errors.PreconditionCode).
209
			WithMessage("Provider [%s] cannot be deleted as some preheat policies are using it", ins.Name)
210 211
	}

212
	return c.iManager.Delete(ctx, id)
213 214
}

215 216
// UpdateInstance implements @Controller.Update
func (c *controller) UpdateInstance(ctx context.Context, instance *providerModels.Instance, properties ...string) error {
217 218 219 220 221
	oldIns, err := c.GetInstance(ctx, instance.ID)
	if err != nil {
		return err
	}

222 223 224 225 226 227 228 229 230 231 232 233 234 235
	if !instance.Enabled {
		// update instance should check the instance whether be used by policies
		policies, err := c.ListPolicies(ctx, &q.Query{
			Keywords: map[string]interface{}{
				"provider_id": instance.ID,
			},
		})
		if err != nil {
			return err
		}

		if len(policies) > 0 {
			return errors.New(nil).
				WithCode(errors.PreconditionCode).
236
				WithMessage("Provider [%s] cannot be disabled as some preheat policies are using it", oldIns.Name)
237 238 239
		}
	}

240 241 242 243 244
	// vendor type does not support change
	if oldIns.Vendor != instance.Vendor {
		return errors.Errorf("provider [%s] vendor cannot be changed", oldIns.Name)
	}

245 246 247 248 249 250 251 252
	return c.iManager.Update(ctx, instance, properties...)
}

// GetInstance implements @Controller.Get
func (c *controller) GetInstance(ctx context.Context, id int64) (*providerModels.Instance, error) {
	return c.iManager.Get(ctx, id)
}

fanjiankong's avatar
fanjiankong committed
253 254 255 256
func (c *controller) GetInstanceByName(ctx context.Context, name string) (*providerModels.Instance, error) {
	return c.iManager.GetByName(ctx, name)
}

257 258 259 260 261
// CountPolicy returns the total count of the policy.
func (c *controller) CountPolicy(ctx context.Context, query *q.Query) (int64, error) {
	return c.pManager.Count(ctx, query)
}

fanjiankong's avatar
fanjiankong committed
262 263 264 265 266
// TriggerParam ...
type TriggerParam struct {
	PolicyID int64
}

267
// CreatePolicy creates the policy.
fanjiankong's avatar
fanjiankong committed
268
func (c *controller) CreatePolicy(ctx context.Context, schema *policyModels.Schema) (id int64, err error) {
Steven Zou's avatar
Steven Zou committed
269 270 271 272 273 274 275 276 277 278
	if schema == nil {
		return 0, errors.New("nil policy schema provided")
	}

	// Update timestamps
	now := time.Now()
	schema.CreatedAt = now
	schema.UpdatedTime = now

	// Get full model of policy schema
279
	err = schema.Decode()
Steven Zou's avatar
Steven Zou committed
280 281
	if err != nil {
		return 0, err
282
	}
fanjiankong's avatar
fanjiankong committed
283 284 285 286 287 288

	id, err = c.pManager.Create(ctx, schema)
	if err != nil {
		return
	}

Steven Zou's avatar
Steven Zou committed
289 290
	schema.ID = id

fanjiankong's avatar
fanjiankong committed
291 292 293 294
	if schema.Trigger != nil &&
		schema.Trigger.Type == policyModels.TriggerTypeScheduled &&
		len(schema.Trigger.Settings.Cron) > 0 {
		// schedule and update policy
295
		if _, err = c.scheduler.Schedule(ctx, job.P2PPreheat, id, "", schema.Trigger.Settings.Cron,
Wenkai Yin's avatar
Wenkai Yin committed
296
			SchedulerCallback, TriggerParam{PolicyID: id}); err != nil {
fanjiankong's avatar
fanjiankong committed
297 298 299
			return 0, err
		}

300
		if err = schema.Encode(); err == nil {
Steven Zou's avatar
Steven Zou committed
301 302 303
			err = c.pManager.Update(ctx, schema, "trigger")
		}

fanjiankong's avatar
fanjiankong committed
304
		if err != nil {
Wenkai Yin's avatar
Wenkai Yin committed
305
			if e := c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id); e != nil {
Steven Zou's avatar
Steven Zou committed
306
				return 0, errors.Wrap(e, err.Error())
fanjiankong's avatar
fanjiankong committed
307
			}
Steven Zou's avatar
Steven Zou committed
308

fanjiankong's avatar
fanjiankong committed
309 310 311 312 313
			return 0, err
		}
	}

	return
314 315 316 317 318 319 320
}

// GetPolicy gets the policy by id.
func (c *controller) GetPolicy(ctx context.Context, id int64) (*policyModels.Schema, error) {
	return c.pManager.Get(ctx, id)
}

321 322 323 324 325
// GetPolicyByName gets the policy by name.
func (c *controller) GetPolicyByName(ctx context.Context, projectID int64, name string) (*policyModels.Schema, error) {
	return c.pManager.GetByName(ctx, projectID, name)
}

326 327
// UpdatePolicy updates the policy.
func (c *controller) UpdatePolicy(ctx context.Context, schema *policyModels.Schema, props ...string) error {
Steven Zou's avatar
Steven Zou committed
328 329
	if schema == nil {
		return errors.New("nil policy schema provided")
330
	}
Steven Zou's avatar
Steven Zou committed
331 332

	// Get policy cache
fanjiankong's avatar
fanjiankong committed
333 334 335 336
	s0, err := c.pManager.Get(ctx, schema.ID)
	if err != nil {
		return err
	}
Steven Zou's avatar
Steven Zou committed
337 338 339 340 341 342 343

	// Double check trigger
	if s0.Trigger == nil {
		return errors.Errorf("missing trigger settings in preheat policy %s", s0.Name)
	}

	// Get full model of updating policy
344
	err = schema.Decode()
Steven Zou's avatar
Steven Zou committed
345 346 347 348
	if err != nil {
		return err
	}

fanjiankong's avatar
fanjiankong committed
349
	var cron = schema.Trigger.Settings.Cron
Wenkai Yin's avatar
Wenkai Yin committed
350
	var oldCron = s0.Trigger.Settings.Cron
fanjiankong's avatar
fanjiankong committed
351 352 353 354
	var needUn bool
	var needSch bool

	if s0.Trigger.Type != schema.Trigger.Type {
Wenkai Yin's avatar
Wenkai Yin committed
355
		if s0.Trigger.Type == policyModels.TriggerTypeScheduled && len(oldCron) > 0 {
fanjiankong's avatar
fanjiankong committed
356 357 358 359 360 361 362
			needUn = true
		}
		if schema.Trigger.Type == policyModels.TriggerTypeScheduled && len(cron) > 0 {
			needSch = true
		}
	} else {
		// not change trigger type
fanjiankong's avatar
Fix.  
fanjiankong committed
363
		if schema.Trigger.Type == policyModels.TriggerTypeScheduled && oldCron != cron {
fanjiankong's avatar
fanjiankong committed
364
			// unschedule old
Wenkai Yin's avatar
Wenkai Yin committed
365
			if len(oldCron) > 0 {
fanjiankong's avatar
fanjiankong committed
366 367 368 369 370 371 372 373 374 375 376 377 378
				needUn = true
			}
			// schedule new
			if len(cron) > 0 {
				// valid cron
				needSch = true
			}
		}

	}

	// unschedule old
	if needUn {
Wenkai Yin's avatar
Wenkai Yin committed
379
		err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, schema.ID)
fanjiankong's avatar
fanjiankong committed
380 381 382 383 384 385 386
		if err != nil {
			return err
		}
	}

	// schedule new
	if needSch {
387
		if _, err := c.scheduler.Schedule(ctx, job.P2PPreheat, schema.ID, "", cron, SchedulerCallback,
Wenkai Yin's avatar
Wenkai Yin committed
388
			TriggerParam{PolicyID: schema.ID}); err != nil {
fanjiankong's avatar
fanjiankong committed
389 390 391 392
			return err
		}
	}

Steven Zou's avatar
Steven Zou committed
393 394 395
	// Update timestamp
	schema.UpdatedTime = time.Now()

fanjiankong's avatar
fanjiankong committed
396 397 398 399 400 401
	err = c.pManager.Update(ctx, schema, props...)
	if (err != nil) && (needSch || needUn) {
		return errors.Wrapf(err, "Update failed, but not rollback scheduler")
	}

	return err
402 403 404 405
}

// DeletePolicy deletes the policy by id.
func (c *controller) DeletePolicy(ctx context.Context, id int64) error {
fanjiankong's avatar
fanjiankong committed
406 407 408 409
	s, err := c.pManager.Get(ctx, id)
	if err != nil {
		return err
	}
Wenkai Yin's avatar
Wenkai Yin committed
410 411
	if s.Trigger != nil && s.Trigger.Type == policyModels.TriggerTypeScheduled && len(s.Trigger.Settings.Cron) > 0 {
		err = c.scheduler.UnScheduleByVendor(ctx, job.P2PPreheat, id)
fanjiankong's avatar
fanjiankong committed
412 413 414 415 416
		if err != nil {
			return err
		}
	}

fanjiankong's avatar
Fix.  
fanjiankong committed
417 418 419 420
	if err = c.deleteExecs(ctx, id); err != nil {
		return err
	}

421 422 423
	return c.pManager.Delete(ctx, id)
}

424 425 426 427 428 429 430 431 432 433 434 435 436 437 438
// DeletePoliciesOfProject deletes all the policy under project.
func (c *controller) DeletePoliciesOfProject(ctx context.Context, project int64) error {
	policies, err := c.ListPoliciesByProject(ctx, project, nil)
	if err != nil {
		return err
	}

	for _, p := range policies {
		if err = c.DeletePolicy(ctx, p.ID); err != nil {
			return err
		}
	}
	return nil
}

fanjiankong's avatar
Fix.  
fanjiankong committed
439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460
// deleteExecs delete executions
func (c *controller) deleteExecs(ctx context.Context, vendorID int64) error {
	executions, err := c.executionMgr.List(ctx, &q.Query{
		Keywords: map[string]interface{}{
			"VendorType": job.P2PPreheat,
			"VendorID":   vendorID,
		},
	})

	if err != nil {
		return err
	}

	for _, execution := range executions {
		if err = c.executionMgr.Delete(ctx, execution.ID); err != nil {
			return err
		}
	}

	return nil
}

461 462 463
// ListPolicies lists policies by query.
func (c *controller) ListPolicies(ctx context.Context, query *q.Query) ([]*policyModels.Schema, error) {
	return c.pManager.ListPolicies(ctx, query)
464 465
}

466 467 468
// ListPoliciesByProject lists policies by project.
func (c *controller) ListPoliciesByProject(ctx context.Context, project int64, query *q.Query) ([]*policyModels.Schema, error) {
	return c.pManager.ListPoliciesByProject(ctx, project, query)
469
}
470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499

// CheckHealth checks the instance health, for test connection
func (c *controller) CheckHealth(ctx context.Context, instance *providerModels.Instance) error {
	if instance == nil {
		return errors.New("instance can not be nil")
	}

	fac, ok := provider.GetProvider(instance.Vendor)
	if !ok {
		return errors.Errorf("no driver registered for provider %s", instance.Vendor)
	}

	// Construct driver
	driver, err := fac(instance)
	if err != nil {
		return err
	}

	// Check health
	h, err := driver.GetHealth()
	if err != nil {
		return err
	}

	if h.Status != provider.DriverStatusHealthy {
		return errors.Errorf("preheat provider instance %s-%s:%s is not healthy", instance.Vendor, instance.Name, instance.Endpoint)
	}

	return nil
}