Best Python code snippet using autotest_python
job_creator.py
Source:job_creator.py  
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""Create the Jenkins jobs and view for a project from XML templates."""
import sys
from argparse import ArgumentParser
from os import path
from re import findall, sub

from jenkins_ci import JenkinsCI


class JobCreator(object):
    """Creates a project's main, build and publish jobs and puts them on a view.

    Job configurations are read from XML files in the ``template`` directory
    next to this script. ``${JOB_CREATOR_*}`` placeholders and plugin versions
    are rewritten before each job is submitted through ``JenkinsCI``.
    """

    def __init__(self, jenkins_url, username, token, project_name, view_name, git_repository, hipchat_room_id,
                 hipchat_token):
        """Store the project settings and derive the three job names.

        Args:
            jenkins_url: Passed to ``JenkinsCI`` together with the credentials.
            username: Passed to ``JenkinsCI``.
            token: Passed to ``JenkinsCI``.
            project_name: Name of the main job and prefix of the other two.
            view_name: View that is created and that receives the jobs.
            git_repository: Value substituted for ``${JOB_CREATOR_GIT_URL}``.
            hipchat_room_id: Value substituted for ``${JOB_CREATOR_HIP_CHAT_ROOM}``.
            hipchat_token: Value substituted for ``${JOB_CREATOR_HIP_CHAT_TOKEN}``.
        """
        self.jenkins_ci = JenkinsCI(jenkins_url, username, token)
        self.project_name = project_name
        self.git_repository = git_repository
        self.view_name = view_name
        self.hipchat_room = hipchat_room_id
        self.hipchat_token = hipchat_token
        self.main_job = project_name
        self.build_job = project_name + "-build"
        self.publish_job = project_name + "_PublishArtifacts"
        self.jobs_list = [self.main_job, self.build_job, self.publish_job]
        self.necessary_plugins = ["jenkins-multijob-plugin", "git", "gitlab-plugin", "envinject",
                                  "parameterized-trigger", "ws-cleanup", "build-name-setter", "disk-usage",
                                  "copyartifact", "scp", "email-ext", "matrix-project", "xshell", "hipchat"]

    def __correct_plugins_version(self, xml_config):
        """Return ``xml_config`` with each ``plugin="name@version"`` attribute re-pinned.

        The new version for a plugin is looked up in the mapping returned by
        ``JenkinsCI.get_plugins_version_dict`` for ``self.necessary_plugins``.

        Raises:
            KeyError: If the XML references a plugin missing from that mapping.
        """
        plugins_version_dict = self.jenkins_ci.get_plugins_version_dict(self.necessary_plugins)

        def _with_current_version(match):
            plugin_name = match.group(1).split("@")[0]
            return 'plugin="{0}@{1}"'.format(plugin_name, plugins_version_dict[plugin_name])

        # Match exactly one quoted attribute value. A greedy ``.*`` would run to
        # the last quote on the line and swallow any attribute that follows.
        # Substituting on the attribute itself also keeps the rewrite from
        # touching identical text elsewhere in the document.
        return sub(r'plugin="([^"]*)"', _with_current_version, xml_config)

    def __correct_variable(self, xml_config):
        """Return ``xml_config`` with the ``${JOB_CREATOR_*}`` placeholders filled in."""
        # Applied in this order, one after another, like a chain of replace() calls.
        substitutions = (
            ("${JOB_CREATOR_GIT_URL}", self.git_repository),
            ("${JOB_CREATOR_PROJECT_NAME}", self.main_job),
            ("${JOB_CREATOR_BUILD_NAME}", self.build_job),
            ("${JOB_CREATOR_ARTIFACTS_NAME}", self.publish_job),
            ("${JOB_CREATOR_HIP_CHAT_TOKEN}", self.hipchat_token),
            ("${JOB_CREATOR_HIP_CHAT_ROOM}", self.hipchat_room),
        )
        for placeholder, value in substitutions:
            xml_config = xml_config.replace(placeholder, value)
        return xml_config

    def __create_job_with_reconf(self, job_name, xml_config_name):
        """Load a template, fill in variables and plugin versions, and create the job.

        Args:
            job_name: Name given to the new job.
            xml_config_name: File name inside the ``template`` directory that
                sits next to this script.
        """
        execute_path = path.dirname(path.realpath(__file__))
        with open(path.join(execute_path, "template", xml_config_name), "r") as config:
            xml_config = config.read()
        xml_config = self.__correct_variable(xml_config)
        xml_config = self.__correct_plugins_version(xml_config)
        self.jenkins_ci.create_job(job_name, xml_config)

    def create_projects(self):
        """Create the three jobs, create the view and move the jobs onto it.

        Exits the process with status 1 when ``plugins_checker`` returns a
        falsy value or when ``check_jobs_exist`` returns a truthy value.
        """
        if not self.jenkins_ci.plugins_checker(self.necessary_plugins):
            # sys.exit() rather than the bare exit() helper, which is only
            # installed by the ``site`` module.
            sys.exit(1)
        if self.jenkins_ci.check_jobs_exist(self.jobs_list):
            sys.exit(1)
        self.__create_job_with_reconf(self.main_job, "main_job.xml")
        self.__create_job_with_reconf(self.build_job, "build_job.xml")
        self.__create_job_with_reconf(self.publish_job, "publish_job.xml")
        self.jenkins_ci.create_view(self.view_name)
        for job in self.jobs_list:
            self.jenkins_ci.move_job_on_view(self.view_name, job)


if __name__ == "__main__":
    parser = ArgumentParser(description="Script for create multi project job. Script uses config.py")
    args = parser.parse_args()
    # config.py supplies the upper-case settings used below.
    from config import *
    print("[*] : Jenkins - creating jobs")
    # NOTE(review): HIPCHAT_JENKINS_TOKEN is passed for both hipchat_room_id and
    # hipchat_token. Confirm whether config.py defines a separate room-id setting.
    creator = JobCreator(JENKINS_SERVER, USERNAME, JENKINS_TOKEN, PROJECT_NAME, JENKINS_VIEW_NAME, GIT_REPOSITORY_URL,
                         HIPCHAT_JENKINS_TOKEN, HIPCHAT_JENKINS_TOKEN)
    creator.create_projects()
    job_url = "{0}/job/{1}".format(JENKINS_SERVER, PROJECT_NAME)
    print("Project '{0}' was created".format(PROJECT_NAME))
    # ... (the excerpt is cut off here; any remaining lines of job_creator.py are not shown)
Source:client.py  
# ... (lines 1-27 of client.py are not shown in this excerpt)
from kombu.common import maybe_declare
from kombu.pools import producers

from queues import announce_exchange

logger = logging.getLogger("client")


def publish_job(job, routing_key="announce"):
    """Publish ``job`` as JSON with the given routing key and log each send.

    When ``job["command"]`` is ``"flood"``, 100 messages are published
    instead of one: each has ``command`` set to ``"print"`` and carries a
    ``sequence`` value from 0 to 99. The caller's dict is left untouched.

    Args:
        job: Dict payload to publish.
        routing_key: Routing key passed to ``producer.publish``.
    """
    with BrokerConnection("amqp://guest:guest@localhost:5672//") as conn:
        with producers[conn].acquire(block=True) as producer:
            maybe_declare(announce_exchange, producer.channel)

            def _send(payload):
                producer.publish(payload, serializer="json", routing_key=routing_key)
                logger.info("PUBLISH: %s route %s", payload, routing_key)

            if job.get("command") != "flood":
                _send(job)
                return

            # Work on a copy so the caller's payload keeps its "flood"
            # command and does not gain a "sequence" key.
            flood_job = dict(job, command="print")
            for sequence in range(100):
                flood_job["sequence"] = sequence
                _send(flood_job)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    debug.setup_logging(logging.DEBUG)

    # Usage: client.py [command [routing_key [argument]]]
    cli_args = sys.argv[1:]
    if not cli_args:
        # No arguments: announce a new slave and stop.
        payload = {"message": "Hello World", "command": "new-slave", "job-id": str(time.time())}
        publish_job(payload)
        sys.exit(0)

    command = cli_args[0]
    routing_key = cli_args[1] if len(cli_args) > 1 else "announce"
    arg = cli_args[2] if len(cli_args) > 2 else None

    job = dict(message="Hello World", command=command)
    if arg:
        job["argument"] = arg
    publish_job(job, routing_key)
    # ... (the excerpt is cut off here; any remaining lines of client.py are not shown)
Source:urls.py  
1from django.urls import include, path2from .views import SignUpView, ActivateAccount, Home, login_employer, edit_job, delete_job, job_detail, publish_job, \3    view_applied_candidate, disqualify, shortlist, job_post, shortlistview_applied_candidate, \4    disqualifyview_applied_candidate5from . import views6from django.contrib.auth import views as auth_views  # import this7app_name = 'recruiter'8urlpatterns = [9    path('', Home, name='employer_home'),10    path('addjob/', job_post, name='job_post'),11    path('editjob/<int:pk>', edit_job, name='edit_job'),12    path('deletejob/<int:pk>', delete_job, name='delete_job'),13    path('jobdetail/<int:pk>', job_detail, name='job_detail'),14    path('jobdetail/publishjob/<int:pk>', publish_job, name='publish_job'),15    path('jobdetail/applied_candidate/<int:pk>', view_applied_candidate, name='view_applied_candidate'),16    path('jobdetail/applied_candidate/shortlistview/<int:pk>', shortlistview_applied_candidate,17         name='shortlist_view_applied_candidate'),18    path('jobdetail/applied_candidate/disqualifyview/<int:pk>', disqualifyview_applied_candidate,19         name='disqualify_view_applied_candidate'),20    path('jobdetail/applied_candidate/shortlist/<int:pk>', shortlist, name='shortlist'),21    path('jobdetail/applied_candidate/disqualify/<int:pk>', disqualify, name='disqualify'),22    path('login', login_employer, name='employer/login'),23    path('logout/', auth_views.LogoutView.as_view(next_page='/recruiter/login'), name='logout'),24    path('signup', SignUpView.as_view(), name='employer/register'),25    path('activate/<uidb64>/<token>/', ActivateAccount.as_view(), name='activate'),...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
