主页 > 互联网  > 

python利用jenkins模块操作jenkins

python利用jenkins模块操作jenkins
安装python-jenkins

可以使用pip命令来安装python-jenkins模块:

pip install python-jenkins 操作jenkins

接下来就是连接和操作jenkins,写了个class,直接上代码

class Jenkins(): def __init__(self, url, username, password): # jenkins服务连接URL self.url = url # jenkins登陆的用户名密码 self.username = username self.password = password # 连接jenkins self.server = jenkins.Jenkins(self.url, self.username, self.password) def create_job(self, job_name, credentials_id, config_xml, command, node_name): """ 创建job任务 :param job_name: 任务名称 :param config_xml: job任务参数设置 :param command: job任务中shell脚本 :param node_name: 执行任务的几点名称 :return: """ # 这步就是将job任务中的shell脚本那部分添加到config_xml中,在config_xml中需要提前设置好{command}才可以这样做哈 config_xml = config_xml.replace("{command}", command).rstrip() config_xml = config_xml.decode('utf-8') try: self.server.create_job(job_name, config_xml) except Exception as e: err_one = "job[{}] already exists".format(job_name) if str(e) != err_one: return False, e return True, "" def get_job_xml(self, job_name): """ 获取job xml配置信息 :param job_name: :return: """ return self.server.get_job_config(job_name) def get_node_config(self, node_name): """ 获取节点配置信息 :param node_name: :return: """ return self.server.get_node_config(node_name) def create_node(self, node_name, credentials_id, sshhost, remotefs, labels=None, numExecutors=1, nodeDescription=None): """ 创建节点 :param node_name: 节点名称 :param credentials_id: 在Jenkins中配置的凭据ID :param sshhost: 主机 :param remoteFS: 远程工作目录 :param labels: 标签 :param numExecutors: Number of executors(Jenkins 可以在此节点上执行并发构建的最大数目) :param nodeDescription: 描述 :return: """ try: self.server.create_node( node_name, numExecutors=numExecutors, nodeDescription=nodeDescription, remoteFS=remotefs, labels=labels, launcher= "hudson.plugins.sshslaves.SSHLauncher", launcher_params={ 'stapler-class': 'hudson.slaves.SSHLauncher', 'host': sshhost, 'port': 22, 'credentialsId': credentials_id, 'jnlp': { 'workDirSettings': { 'innerDir': 'remoting', 'failIfWorkDirIsCaseInsensitive': False, 'failIfWorkDirExists': False, 'createWorkDir': True, 'disabled': False, 'deleteWorkDir': False, 'stickyReferenceFiles': '' } } } ) except Exception as e: err_one = "node[{}] already exists".format(node_name) if str(e) != err_one: return False, e return True, "" def get_build_info(self, job_name, last_build_number=0): """ 获取job 信息 :param job_name: jenkins任务名称 :param last_build_number: jenkins build number :return: dict """ return self.server.get_job_info(job_name, last_build_number) def get_job_build_number(self, job_name): """ 获取job number :param job_name: jenkins任务名称 :return: int """ try: build_number = self.get_build_info(job_name)["lastBuild"]["number"] except: # 取不出来值是因为还没有执行过任务,所以返回默认值0 build_number = 0 return build_number def build_job(self, job_name, params): """ 执行job :param params: :return: """ self.server.build_job(job_name, params) hope_build_number = -1 # 获取创建的任务number while True: last_build_number = self.get_job_build_number(job_name) if last_build_number != hope_build_number: hope_build_number = last_build_number + 1 else: break time.sleep(1) # 获取自动化执行结果 while True: result = self.get_build_info_status(job_name, hope_build_number) if result is not None: break time.sleep(1) return hope_build_number, result def get_build_console_ouput(self, job_name, last_build_number): """ 获取项目控制台日志 :param job_name: jenkins任务名称 :param last_build_number: jenkins build number :return: str """ return self.server.get_build_console_output(name=job_name, number=last_build_number) def get_build_info_status(self, job_name, last_build_number): """ 获取任务状态 :param job_name: jenkins任务名称 :param last_build_number: jenkins build number :return:str 状态有4种:SUCCESS| FAILURE| ABORTED| pending """ builds = self.get_build_info(job_name, last_build_number)["builds"] for b in builds: if b["number"] == last_build_number: result = b["result"] return result def get_build_ouput_url(self, job_name, job_number): """ 获取日志输出链接地址 :param job_name: jenkins任务名称 :param job_number: jenkins build number :return: str """ return "{jenkins_url}/job/{job_name}/{job_number}/console".format(jenkins_url=self.url, job_name=job_name, job_number=job_number) def get_build_report_url(self, job_name, job_number, report_name): """ 获取自动化报告结果链接地址 :param job_name: jenkins任务名称 :param job_number: jenkins build number :param report_name: jenkins测试报告html文件名称 :return: str """ return "{jenkins_url}/job/{job_name}/{job_number}/artifact/{report_name}".format(jenkins_url=self.url, job_name=job_name, job_number=job_number, report_name=report_name)

暂时写了些常用的。其实还有删除、修改等操作,可以看下底层代码中有哪些方法可以用。可玩性还是很高的。就不详细写了。

就这样,下课!!!

标签:

python利用jenkins模块操作jenkins由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“python利用jenkins模块操作jenkins