job.go 7.01 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retention

import (
18
	"bytes"
Ziming Zhang's avatar
Ziming Zhang committed
19
	"encoding/json"
20 21 22
	"fmt"
	"strings"
	"time"
23

24 25
	"github.com/goharbor/harbor/src/lib/selector"

26
	"github.com/goharbor/harbor/src/jobservice/job"
Steven Zou's avatar
Steven Zou committed
27
	"github.com/goharbor/harbor/src/jobservice/logger"
28
	"github.com/goharbor/harbor/src/lib/errors"
29
	"github.com/goharbor/harbor/src/pkg/retention/dep"
Steven Zou's avatar
Steven Zou committed
30
	"github.com/goharbor/harbor/src/pkg/retention/policy"
31
	"github.com/goharbor/harbor/src/pkg/retention/policy/lwp"
32
	"github.com/olekukonko/tablewriter"
Steven Zou's avatar
Steven Zou committed
33 34
)

35
const (
36 37 38 39
	actionMarkRetain    = "RETAIN"
	actionMarkDeletion  = "DEL"
	actionMarkError     = "ERR"
	actionMarkImmutable = "IMMUTABLE"
40 41
)

42
// Job of running retention process
Steven Zou's avatar
Steven Zou committed
43
type Job struct{}
44 45 46 47 48 49

// MaxFails of the job
func (pj *Job) MaxFails() uint {
	return 3
}

50 51 52 53 54
// MaxCurrency is implementation of same method in Interface.
func (pj *Job) MaxCurrency() uint {
	return 0
}

55 56 57 58 59 60
// ShouldRetry indicates job can be retried if failed
func (pj *Job) ShouldRetry() bool {
	return true
}

// Validate the parameters
61 62 63 64 65
func (pj *Job) Validate(params job.Parameters) (err error) {
	if _, err = getParamRepo(params); err == nil {
		if _, err = getParamMeta(params); err == nil {
			_, err = getParamDryRun(params)
		}
Steven Zou's avatar
Steven Zou committed
66 67
	}

68
	return
69 70 71
}

// Run the job
72
func (pj *Job) Run(ctx job.Context, params job.Parameters) error {
Steven Zou's avatar
Steven Zou committed
73 74 75 76 77 78
	// logger for logging
	myLogger := ctx.GetLogger()

	// Parameters have been validated, ignore error checking
	repo, _ := getParamRepo(params)
	liteMeta, _ := getParamMeta(params)
Steven Zou's avatar
Steven Zou committed
79
	isDryRun, _ := getParamDryRun(params)
Steven Zou's avatar
Steven Zou committed
80

81 82
	// Log stage: start
	repoPath := fmt.Sprintf("%s/%s", repo.Namespace, repo.Name)
83
	myLogger.Infof("Run retention process.\n Repository: %s \n Rule Algorithm: %s \n Dry Run: %v", repoPath, liteMeta.Algorithm, isDryRun)
84

85 86 87 88 89 90
	// Stop check point 1:
	if isStopped(ctx) {
		logStop(myLogger)
		return nil
	}

Steven Zou's avatar
Steven Zou committed
91
	// Retrieve all the candidates under the specified repository
Steven Zou's avatar
Steven Zou committed
92
	allCandidates, err := dep.DefaultClient.GetCandidates(repo)
Steven Zou's avatar
Steven Zou committed
93 94 95 96
	if err != nil {
		return logError(myLogger, err)
	}

97 98 99
	// Log stage: load candidates
	myLogger.Infof("Load %d candidates from repository %s", len(allCandidates), repoPath)

Steven Zou's avatar
Steven Zou committed
100 101
	// Build the processor
	builder := policy.NewBuilder(allCandidates)
Steven Zou's avatar
Steven Zou committed
102
	processor, err := builder.Build(liteMeta, isDryRun)
Steven Zou's avatar
Steven Zou committed
103 104 105 106
	if err != nil {
		return logError(myLogger, err)
	}

107 108 109 110 111 112
	// Stop check point 2:
	if isStopped(ctx) {
		logStop(myLogger)
		return nil
	}

Steven Zou's avatar
Steven Zou committed
113 114 115 116 117 118
	// Run the flow
	results, err := processor.Process(allCandidates)
	if err != nil {
		return logError(myLogger, err)
	}

119 120 121
	// Log stage: results with table view
	logResults(myLogger, allCandidates, results)

Ziming Zhang's avatar
Ziming Zhang committed
122
	// Save retain and total num in DB
123
	return saveRetainNum(ctx, results, allCandidates, isDryRun)
Ziming Zhang's avatar
Ziming Zhang committed
124 125
}

126 127
func saveRetainNum(ctx job.Context, results []*selector.Result, allCandidates []*selector.Candidate, isDryRun bool) error {
	var realDelete []*selector.Result
128
	for _, r := range results {
Ziming Zhang's avatar
Ziming Zhang committed
129
		if r.Error == nil {
130
			realDelete = append(realDelete, r)
Ziming Zhang's avatar
Ziming Zhang committed
131 132 133
		}
	}
	retainObj := struct {
134 135 136 137
		Total    int                `json:"total"`
		Retained int                `json:"retained"`
		DryRun   bool               `json:"dry_run"`
		Deleted  []*selector.Result `json:"deleted"`
Ziming Zhang's avatar
Ziming Zhang committed
138 139
	}{
		Total:    len(allCandidates),
140 141 142
		Retained: len(allCandidates) - len(realDelete),
		DryRun:   isDryRun,
		Deleted:  realDelete,
Ziming Zhang's avatar
Ziming Zhang committed
143 144 145 146 147 148
	}
	c, err := json.Marshal(retainObj)
	if err != nil {
		return err
	}
	_ = ctx.Checkin(string(c))
149 150
	return nil
}
Steven Zou's avatar
Steven Zou committed
151

152
func logResults(logger logger.Interface, all []*selector.Candidate, results []*selector.Result) {
153 154 155 156 157 158 159
	hash := make(map[string]error, len(results))
	for _, r := range results {
		if r.Target != nil {
			hash[r.Target.Hash()] = r.Error
		}
	}

160
	op := func(c *selector.Candidate) string {
161
		if e, exists := hash[c.Hash()]; exists {
162
			if e != nil {
163
				if _, ok := e.(*selector.ImmutableError); ok {
164 165
					return actionMarkImmutable
				}
166
				return actionMarkError
167 168
			}

169
			return actionMarkDeletion
170 171
		}

172
		return actionMarkRetain
173 174 175 176
	}

	var buf bytes.Buffer

Steven Zou's avatar
Steven Zou committed
177
	data := make([][]string, len(all))
178 179 180

	for _, c := range all {
		row := []string{
Ziming Zhang's avatar
Ziming Zhang committed
181
			c.Digest,
182
			strings.Join(c.Tags, ","),
183 184 185 186 187 188 189 190 191 192 193
			c.Kind,
			strings.Join(c.Labels, ","),
			t(c.PushedTime),
			t(c.PulledTime),
			t(c.CreationTime),
			op(c),
		}
		data = append(data, row)
	}

	table := tablewriter.NewWriter(&buf)
Ziming Zhang's avatar
Ziming Zhang committed
194 195
	table.SetAutoFormatHeaders(false)
	table.SetHeader([]string{"Digest", "Tag", "Kind", "Labels", "PushedTime", "PulledTime", "CreatedTime", "Retention"})
196 197 198 199 200
	table.SetBorders(tablewriter.Border{Left: true, Top: false, Right: true, Bottom: false})
	table.SetCenterSeparator("|")
	table.AppendBulk(data)
	table.Render()

201
	logger.Infof("\n%s", buf.String())
202 203 204 205 206 207 208 209 210

	// log all the concrete errors if have
	for _, r := range results {
		if r.Error != nil {
			logger.Infof("Retention error for artifact %s:%s : %s", r.Target.Kind, arn(r.Target), r.Error)
		}
	}
}

211
func arn(art *selector.Candidate) string {
212
	return fmt.Sprintf("%s/%s:%s", art.Namespace, art.Repository, art.Digest)
213 214 215
}

func t(tm int64) string {
216
	if tm <= 0 {
Ziming Zhang's avatar
Ziming Zhang committed
217 218
		return ""
	}
219 220 221
	return time.Unix(tm, 0).Format("2006/01/02 15:04:05")
}

222 223 224 225 226 227 228 229 230 231 232
func isStopped(ctx job.Context) (stopped bool) {
	cmd, ok := ctx.OPCommand()
	stopped = ok && cmd == job.StopCommand

	return
}

func logStop(logger logger.Interface) {
	logger.Info("Retention job is stopped")
}

Steven Zou's avatar
Steven Zou committed
233 234 235 236 237 238 239
func logError(logger logger.Interface, err error) error {
	wrappedErr := errors.Wrap(err, "retention job")
	logger.Error(wrappedErr)

	return wrappedErr
}

Steven Zou's avatar
Steven Zou committed
240 241 242 243 244 245 246 247 248 249 250 251 252 253
func getParamDryRun(params job.Parameters) (bool, error) {
	v, ok := params[ParamDryRun]
	if !ok {
		return false, errors.Errorf("missing parameter: %s", ParamDryRun)
	}

	dryRun, ok := v.(bool)
	if !ok {
		return false, errors.Errorf("invalid parameter: %s", ParamDryRun)
	}

	return dryRun, nil
}

254
func getParamRepo(params job.Parameters) (*selector.Repository, error) {
255
	v, ok := params[ParamRepo]
Steven Zou's avatar
Steven Zou committed
256
	if !ok {
257
		return nil, errors.Errorf("missing parameter: %s", ParamRepo)
Steven Zou's avatar
Steven Zou committed
258 259
	}

260
	repoJSON, ok := v.(string)
Steven Zou's avatar
Steven Zou committed
261
	if !ok {
262
		return nil, errors.Errorf("invalid parameter: %s", ParamRepo)
Steven Zou's avatar
Steven Zou committed
263 264
	}

265
	repo := &selector.Repository{}
266 267
	if err := repo.FromJSON(repoJSON); err != nil {
		return nil, errors.Wrap(err, "parse repository from JSON")
268 269
	}

Steven Zou's avatar
Steven Zou committed
270 271 272
	return repo, nil
}

273
func getParamMeta(params job.Parameters) (*lwp.Metadata, error) {
274
	v, ok := params[ParamMeta]
Steven Zou's avatar
Steven Zou committed
275
	if !ok {
276
		return nil, errors.Errorf("missing parameter: %s", ParamMeta)
Steven Zou's avatar
Steven Zou committed
277 278
	}

279
	metaJSON, ok := v.(string)
Steven Zou's avatar
Steven Zou committed
280
	if !ok {
281
		return nil, errors.Errorf("invalid parameter: %s", ParamMeta)
Steven Zou's avatar
Steven Zou committed
282 283
	}

284
	meta := &lwp.Metadata{}
285 286
	if err := meta.FromJSON(metaJSON); err != nil {
		return nil, errors.Wrap(err, "parse retention policy from JSON")
287 288
	}

Steven Zou's avatar
Steven Zou committed
289 290
	return meta, nil
}