Commit 3b8a2890 authored by danfengliu's avatar danfengliu
Browse files

Upgrade robot-framework and Selenium library



1. Upgrade robot-framework to 3.1 and Selenium library to 4.4.0.
2. Fix a registry issue for clear filter text input.
Signed-off-by: default avatardanfengliu <danfengl@vmware.com>
parent da662f52
......@@ -91,10 +91,6 @@ class Artifact(base.Base, object):
if (timeout_count == 0):
break
artifact = self.get_reference_info(project_name, repo_name, reference, **kwargs)
print "artifact", artifact
print "artifact[0]", artifact[0]
print "artifact[0].scan_overview", artifact[0].scan_overview
print "artifact[0].scan_overview['application/vnd.scanner.adapter.vuln.report.harbor+json; version=1.0']", artifact[0].scan_overview['application/vnd.scanner.adapter.vuln.report.harbor+json; version=1.0']
scan_status = artifact[0].scan_overview['application/vnd.scanner.adapter.vuln.report.harbor+json; version=1.0']["scan_status"]
if scan_status == expected_scan_status:
return
......
# -*- coding: utf-8 -*-
import sys
import time
import subprocess
import client
import swagger_client
import v2_swagger_client
try:
from urllib import getproxies
except ImportError:
from urllib.request import getproxies
class Server:
def __init__(self, endpoint, verify_ssl):
self.endpoint = endpoint
self.verify_ssl = verify_ssl
class Credential:
def __init__(self, type, username, password):
self.type = type
self.username = username
self.password = password
def _create_client(server, credential, debug, api_type="products"):
cfg = None
if api_type in ('projectv2', 'artifact', 'repository', 'scan'):
cfg = v2_swagger_client.Configuration()
else:
cfg = swagger_client.Configuration()
cfg.host = server.endpoint
cfg.verify_ssl = server.verify_ssl
# support basic auth only for now
cfg.username = credential.username
cfg.password = credential.password
cfg.debug = debug
proxies = getproxies()
proxy = proxies.get('http', proxies.get('all', None))
if proxy:
cfg.proxy = proxy
return {
"chart": client.ChartRepositoryApi(client.ApiClient(cfg)),
"products": swagger_client.ProductsApi(swagger_client.ApiClient(cfg)),
"projectv2": v2_swagger_client.ProjectApi(v2_swagger_client.ApiClient(cfg)),
"artifact": v2_swagger_client.ArtifactApi(v2_swagger_client.ApiClient(cfg)),
"repository": v2_swagger_client.RepositoryApi(v2_swagger_client.ApiClient(cfg)),
"scan": v2_swagger_client.ScanApi(v2_swagger_client.ApiClient(cfg)),
"scanner": swagger_client.ScannersApi(swagger_client.ApiClient(cfg)),
}.get(api_type,'Error: Wrong API type')
def _assert_status_code(expect_code, return_code):
if str(return_code) != str(expect_code):
raise Exception(r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}.".format(expect_code, return_code))
def _assert_status_body(expect_status_body, returned_status_body):
if expect_status_body.strip() != returned_status_body.strip():
raise Exception(r"HTTPS status body s not as we expected. Expected {}, while actual HTTPS status body is {}.".format(expect_status_body, returned_status_body))
def _random_name(prefix):
return "%s-%d" % (prefix, int(round(time.time() * 1000)))
def _get_id_from_header(header):
try:
location = header["Location"]
return int(location.split("/")[-1])
except Exception:
return None
def _get_string_from_unicode(udata):
result=''
for u in udata:
tmp = u.encode('utf8')
result = result + tmp.strip('\n\r\t')
return result
def run_command(command):
print "Command: ", subprocess.list2cmdline(command)
try:
output = subprocess.check_output(command,
stderr=subprocess.STDOUT,
universal_newlines=True)
except subprocess.CalledProcessError as e:
raise Exception('Error: Exited with error code: %s. Output:%s'% (e.returncode, e.output))
return output
class Base:
def __init__(self,
server = Server(endpoint="http://localhost:8080/api", verify_ssl=False),
credential = Credential(type="basic_auth", username="admin", password="Harbor12345"),
debug = True, api_type = "products"):
if not isinstance(server.verify_ssl, bool):
server.verify_ssl = server.verify_ssl == "True"
self.server = server
self.credential = credential
self.debug = debug
self.api_type = api_type
self.client = _create_client(server, credential, debug, api_type=api_type)
def _get_client(self, **kwargs):
if len(kwargs) == 0:
return self.client
server = self.server
if "api_type" in kwargs:
server.api_type = kwargs.get("api_type")
if "endpoint" in kwargs:
server.endpoint = kwargs.get("endpoint")
if "verify_ssl" in kwargs:
if not isinstance(kwargs.get("verify_ssl"), bool):
server.verify_ssl = kwargs.get("verify_ssl") == "True"
else:
server.verify_ssl = kwargs.get("verify_ssl")
credential = self.credential
if "type" in kwargs:
credential.type = kwargs.get("type")
if "username" in kwargs:
credential.username = kwargs.get("username")
if "password" in kwargs:
credential.password = kwargs.get("password")
# -*- coding: utf-8 -*-
import sys
import time
import subprocess
import client
import swagger_client
import v2_swagger_client
try:
from urllib import getproxies
except ImportError:
from urllib.request import getproxies
class Server:
def __init__(self, endpoint, verify_ssl):
self.endpoint = endpoint
self.verify_ssl = verify_ssl
class Credential:
def __init__(self, type, username, password):
self.type = type
self.username = username
self.password = password
def _create_client(server, credential, debug, api_type="products"):
cfg = None
if api_type in ('projectv2', 'artifact', 'repository', 'scan'):
cfg = v2_swagger_client.Configuration()
else:
cfg = swagger_client.Configuration()
cfg.host = server.endpoint
cfg.verify_ssl = server.verify_ssl
# support basic auth only for now
cfg.username = credential.username
cfg.password = credential.password
cfg.debug = debug
proxies = getproxies()
proxy = proxies.get('http', proxies.get('all', None))
if proxy:
cfg.proxy = proxy
return {
"chart": client.ChartRepositoryApi(client.ApiClient(cfg)),
"products": swagger_client.ProductsApi(swagger_client.ApiClient(cfg)),
"projectv2": v2_swagger_client.ProjectApi(v2_swagger_client.ApiClient(cfg)),
"artifact": v2_swagger_client.ArtifactApi(v2_swagger_client.ApiClient(cfg)),
"repository": v2_swagger_client.RepositoryApi(v2_swagger_client.ApiClient(cfg)),
"scan": v2_swagger_client.ScanApi(v2_swagger_client.ApiClient(cfg)),
"scanner": swagger_client.ScannersApi(swagger_client.ApiClient(cfg)),
}.get(api_type,'Error: Wrong API type')
def _assert_status_code(expect_code, return_code):
if str(return_code) != str(expect_code):
raise Exception(r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}.".format(expect_code, return_code))
def _assert_status_body(expect_status_body, returned_status_body):
if expect_status_body.strip() != returned_status_body.strip():
raise Exception(r"HTTPS status body s not as we expected. Expected {}, while actual HTTPS status body is {}.".format(expect_status_body, returned_status_body))
def _random_name(prefix):
return "%s-%d" % (prefix, int(round(time.time() * 1000)))
def _get_id_from_header(header):
try:
location = header["Location"]
return int(location.split("/")[-1])
except Exception:
return None
def _get_string_from_unicode(udata):
result=''
for u in udata:
tmp = u.encode('utf8')
result = result + tmp.strip('\n\r\t')
return result
def run_command(command):
print("Command: ", subprocess.list2cmdline(command))
try:
output = subprocess.check_output(command,
stderr=subprocess.STDOUT,
universal_newlines=True)
except subprocess.CalledProcessError as e:
raise Exception('Error: Exited with error code: %s. Output:%s'% (e.returncode, e.output))
return output
class Base:
def __init__(self,
server = Server(endpoint="http://localhost:8080/api", verify_ssl=False),
credential = Credential(type="basic_auth", username="admin", password="Harbor12345"),
debug = True, api_type = "products"):
if not isinstance(server.verify_ssl, bool):
server.verify_ssl = server.verify_ssl == "True"
self.server = server
self.credential = credential
self.debug = debug
self.api_type = api_type
self.client = _create_client(server, credential, debug, api_type=api_type)
def _get_client(self, **kwargs):
if len(kwargs) == 0:
return self.client
server = self.server
if "api_type" in kwargs:
server.api_type = kwargs.get("api_type")
if "endpoint" in kwargs:
server.endpoint = kwargs.get("endpoint")
if "verify_ssl" in kwargs:
if not isinstance(kwargs.get("verify_ssl"), bool):
server.verify_ssl = kwargs.get("verify_ssl") == "True"
else:
server.verify_ssl = kwargs.get("verify_ssl")
credential = self.credential
if "type" in kwargs:
credential.type = kwargs.get("type")
if "username" in kwargs:
credential.username = kwargs.get("username")
if "password" in kwargs:
credential.password = kwargs.get("password")
return _create_client(server, credential, self.debug, self.api_type)
\ No newline at end of file
......@@ -17,7 +17,6 @@ class Chart(base.Base, object):
def chart_should_exist(self, repository, chart_name, **kwargs):
charts_data = self.get_charts(repository, **kwargs)
print "charts_data:", charts_data
for chart in charts_data:
if chart.name == chart_name:
return True
......
......@@ -9,12 +9,9 @@ def load_bundle(service_image, invocation_image):
bundle_tmpl_file = "./tests/apitests/python/bundle_data/bundle.json.tmpl"
with open(bundle_tmpl_file,'r') as load_f:
load_dict = json.load(load_f)
print "load_dict:", load_dict
print "load_dict-invocationImages:", load_dict["invocationImages"][0]["contentDigest"]
load_dict["images"]["hello"]["image"] = service_image
load_dict["invocationImages"][0]["image"] = invocation_image
bundle_str = json.dumps(load_dict)
print "bundle_str:", bundle_str
with open(bundle_file,'w') as dump_f:
dump_f.write(bundle_str)
dump_f.close()
......@@ -26,16 +23,16 @@ def cnab_fixup_bundle(bundle_file, target, auto_update_bundle = True):
if auto_update_bundle == True:
command.append("--auto-update-bundle")
#fixed_bundle_file = bundle_file
print "Command: ", command
print("Command: ", command)
ret = base.run_command(command)
print "Command return: ", ret
print("Command return: ", ret)
return fixed_bundle_file
def cnab_push_bundle(bundle_file, target):
command = ["cnab-to-oci", "push", bundle_file, "--target", target, "--auto-update-bundle"]
print "Command: ", command
print("Command: ", command)
ret = base.run_command(command)
print "Command return: ", ret
print("Command return: ", ret)
for line in ret.split("\n"):
line = line.replace('\"', '')
if line.find('sha256') >= 0:
......@@ -48,5 +45,4 @@ def push_cnab_bundle(harbor_server, user, password, service_image, invocation_im
bundle_file = load_bundle(service_image, invocation_image)
fixed_bundle_file = cnab_fixup_bundle(bundle_file, target, auto_update_bundle = auto_update_bundle)
sha256 = cnab_push_bundle(fixed_bundle_file, target)
print "sha256:", sha256
return sha256
......@@ -6,15 +6,15 @@ import docker_api
def ctr_images_pull(username, password, oci):
command = ["sudo", "ctr", "images", "pull", "-u", username+":"+password, oci]
print "Command: ", command
print("Command: ", command)
ret = base.run_command(command)
print "Command return: ", ret
print("Command return: ", ret)
def ctr_images_list(oci_ref = None):
command = ["sudo", "ctr", "images", "list", "--q"]
print "Command: ", command
print("Command: ", command)
ret = base.run_command(command)
print "Command return: ", ret
print("Command return: ", ret)
if oci_ref is not None and oci_ref not in ret.split("\n"):
raise Exception(r" Get OCI ref failed, expected ref is [{}], but return ref list is [{}]".format (ret))
......
......@@ -13,29 +13,29 @@ except ImportError:
def docker_info_display():
command = ["docker", "info", "-f", "'{{.OSType}}/{{.Architecture}}'"]
print "Docker Info: ", command
print("Docker Info: ", command)
ret = base.run_command(command)
print "Command return: ", ret
print("Command return: ", ret)
def docker_login_cmd(harbor_host, user, password, enable_manifest = True):
command = ["sudo", "docker", "login", harbor_host, "-u", user, "-p", password]
print "Docker Login Command: ", command
print( "Docker Login Command: ", command)
base.run_command(command)
if enable_manifest == True:
try:
ret = subprocess.check_output(["./tests/apitests/python/update_docker_cfg.sh"], shell=False)
except subprocess.CalledProcessError, exc:
except subprocess.CalledProcessError as exc:
raise Exception("Failed to update docker config, error is {} {}.".format(exc.returncode, exc.output))
def docker_manifest_create(index, manifests):
command = ["sudo", "docker","manifest","create",index]
command.extend(manifests)
print "Docker Manifest Command: ", command
print( "Docker Manifest Command: ", command)
base.run_command(command)
def docker_manifest_push(index):
command = ["sudo", "docker","manifest","push",index]
print "Docker Manifest Command: ", command
print( "Docker Manifest Command: ", command)
ret = base.run_command(command)
index_sha256=""
manifest_list=[]
......@@ -58,7 +58,7 @@ def list_repositories(harbor_host, user, password, n = None, last = None):
command = ["curl", "-s", "-u", user+":"+password, "https://"+harbor_host+"/v2/_catalog"+"?n=%d"%n, "--insecure"]
else:
command = ["curl", "-s", "-u", user+":"+password, "https://"+harbor_host+"/v2/_catalog", "--insecure"]
print "List Repositories Command: ", command
print( "List Repositories Command: ", command)
ret = base.run_command(command)
repos = json.loads(ret).get("repositories","")
return repos
......@@ -70,7 +70,7 @@ def list_image_tags(harbor_host, repository, user, password, n = None, last = No
command = ["curl", "-s", "-u", user+":"+password, "https://"+harbor_host+"/v2/"+repository+"/tags/list"+"?n=%d"%n, "--insecure"]
else:
command = ["curl", "-s", "-u", user+":"+password, "https://"+harbor_host+"/v2/"+repository+"/tags/list", "--insecure"]
print "List Image Tags Command: ", command
print( "List Image Tags Command: ", command)
ret = base.run_command(command)
tags = json.loads(ret).get("tags","")
return tags
......@@ -85,9 +85,9 @@ class DockerAPI(object):
expected_error_message = None
try:
self.DCLIENT.login(registry = registry, username=username, password=password)
except docker.errors.APIError, err:
except docker.errors.APIError as err:
if expected_error_message is not None:
print "docker login error:", str(err)
print( "docker login error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Docker login: Return message {} is not as expected {}".format(str(err), expected_error_message))
else:
......@@ -105,10 +105,10 @@ class DockerAPI(object):
try:
ret = base._get_string_from_unicode(self.DCLIENT.pull(r'{}:{}'.format(image, _tag)))
return ret
except Exception, err:
except Exception as err:
caught_err = True
if expected_error_message is not None:
print "docker image pull error:", str(err)
print( "docker image pull error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Pull image: Return message {} is not as expected {}".format(str(err), expected_error_message))
else:
......@@ -128,7 +128,7 @@ class DockerAPI(object):
try:
self.DCLIENT.tag(image, harbor_registry, _tag, force=True)
return harbor_registry, _tag
except docker.errors.APIError, e:
except docker.errors.APIError as e:
raise Exception(r" Docker tag image {} failed, error is [{}]".format (image, e.message))
def docker_image_push(self, harbor_registry, tag, expected_error_message = None):
......@@ -139,10 +139,10 @@ class DockerAPI(object):
try:
ret = base._get_string_from_unicode(self.DCLIENT.push(harbor_registry, tag, stream=True))
return ret
except Exception, err:
except Exception as err:
caught_err = True
if expected_error_message is not None:
print "docker image push error:", str(err)
print( "docker image push error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Push image: Return message {} is not as expected {}".format(str(err), expected_error_message))
else:
......@@ -184,10 +184,10 @@ class DockerAPI(object):
self.DCLIENT.pull(repo)
image = self.DCLIENT2.images.get(repo)
return repo, image.id
except Exception, err:
except Exception as err:
caught_err = True
if expected_error_message is not None:
print "docker image build error:", str(err)
print( "docker image build error:", str(err))
if str(err).lower().find(expected_error_message.lower()) < 0:
raise Exception(r"Push image: Return message {} is not as expected {}".format(str(err), expected_error_message))
else:
......
......@@ -6,26 +6,26 @@ import base
def get_chart_file(file_name):
command = ["wget", file_name]
ret = base.run_command(command)
print "Command Return: ", ret
print("Command return: ", ret)
command = ["tar", "xvf", file_name.split('/')[-1]]
ret = base.run_command(command)
print "Command Return: ", ret
print("Command return: ", ret)
def helm_login(harbor_server, user, password):
os.putenv("HELM_EXPERIMENTAL_OCI", "1")
command = ["helm3", "registry", "login", harbor_server, "-u", user, "-p", password]
print "Command: ", command
print("Command: ", command)
ret = base.run_command(command)
print "Command return: ", ret
print("Command return: ", ret)
def helm_save(chart_archive, harbor_server, project, repo_name):
command = ["helm3", "chart","save", chart_archive, harbor_server+"/"+project+"/"+repo_name]
print "Command: ", command
print("Command: ", command)
base.run_command(command)
def helm_push(harbor_server, project, repo_name, version):
command = ["helm3", "chart","push", harbor_server+"/"+project+"/"+repo_name+":"+version]
print "Command: ", command
print("Command: ", command)
ret = base.run_command(command)
return ret
......@@ -37,11 +37,11 @@ def helm_chart_push_to_harbor(chart_file, archive, harbor_server, project, repo_
def helm2_add_repo(helm_repo_name, harbor_url, project, username, password):
command = ["helm2", "repo", "add", "--username=" + username, "--password=" + password, helm_repo_name, harbor_url + "/chartrepo/" + project]
print "Command: ", command
print("Command: ", command)
base.run_command(command)
def helm2_push(helm_repo_name, chart_file, project, username, password):
get_chart_file(chart_file)
command = ["helm2", "push", "--username=" + username, "--password=" + password, chart_file.split('/')[-1], helm_repo_name]
print "Command: ", command
print("Command: ", command)
base.run_command(command)
\ No newline at end of file
......@@ -39,7 +39,6 @@ def oras_pull(harbor_server, user, password, project, repo, tag):
os.rmdir(cwd)
os.makedirs(cwd)
os.chdir(cwd)
print "Tmp dir:", cwd
except Exception as e:
raise Exception('Error: Exited with error {}',format(e))
ret = base.run_command([oras_cmd, "pull", harbor_server + "/" + project + "/" + repo+":"+ tag, "-a"])
......
......@@ -38,7 +38,7 @@ class Replication(base.Base):
if str(rule_data.name) != str(expect_rule_name):
raise Exception(r"Check replication rule failed, expect <{}> actual <{}>.".format(expect_rule_name, str(rule_data.name)))
else:
print r"Check Replication rule passed, rule name <{}>.".format(str(rule_data.name))
print(r"Check Replication rule passed, rule name <{}>.".format(str(rule_data.name)))
#get_trigger = str(rule_data.trigger.kind)
#if expect_trigger is not None and get_trigger == str(expect_trigger):
# print r"Check Replication rule trigger passed, trigger name <{}>.".format(get_trigger)
......
# -*- coding: utf-8 -*-
import site
reload(site)
import time
import base
import swagger_client
......@@ -121,7 +119,6 @@ class Repository(base.Base, object):
def check_repository_exist(self, project_Name, repo_name, **kwargs):
repositories = self.list_repositories(project_Name, **kwargs)
for repo in repositories:
print project_Name+"/"+repo_name
if repo.name == project_Name+"/"+repo_name:
return True
return False
......
......@@ -5,7 +5,7 @@ from testutils import notary_url
def sign_image(registry_ip, project_name, image, tag):
try:
ret = subprocess.check_output(["./tests/apitests/python/sign_image.sh", registry_ip, project_name, image, tag, notary_url], shell=False)
print "sign_image return: ", ret
print("sign_image return: ", ret)
except subprocess.CalledProcessError, exc:
raise Exception("Failed to sign image error is {} {}.".format(exc.returncode, exc.output))
......@@ -45,7 +45,6 @@ class User(base.Base):
client = self._get_client(**kwargs)
data, status_code, _ = client.users_user_id_get_with_http_info(user_id)
base._assert_status_code(200, status_code)
print "data in lib:", data
return data
......@@ -80,7 +79,6 @@ class User(base.Base):
def update_user_role_as_sysadmin(self, user_id, IsAdmin, **kwargs):
client = self._get_client(**kwargs)
sysadmin_flag = swagger_client.SysAdminFlag(IsAdmin)
print "sysadmin_flag:", sysadmin_flag
_, status_code, _ = client.users_user_id_sysadmin_put_with_http_info(user_id, sysadmin_flag)
base._assert_status_code(200, status_code)
return user_id
from __future__ import absolute_import
import unittest
import numpy
import threading
from datetime import *
from time import sleep, ctime
import library.repository
import library.docker_api
from library.base import _assert_status_code
from testutils import ADMIN_CLIENT
from testutils import harbor_server
from testutils import TEARDOWN
from library.project import Project
from library.repository import Repository
from library.artifact import Artifact
from library.repository import push_image_to_project
from library.repository import pull_harbor_image
from library.repository import push_special_image_to_project
import argparse
def do_populate(name_index, repo_count):
project= Project()
artifact = Artifact()
repo = Repository()
url = ADMIN_CLIENT["endpoint"]
ADMIN_CLIENT["password"] = "qA5ZgV"
#2. Create a new project(PA) by user(UA);
project_name = "project"+str(name_index)