Best Python code snippet using molotov_python
cliente.py
Source:cliente.py  
...168            soc.close()169            self.connection.close()170    def validarPassword(self, password):171        return adminUsers.usuario_valido('admin', password)172    def do_GET(self):173        modoDeConexion = self.headers.getheader('Proxy-Connection',174                                                'Transparente')175        hostDestino = self.headers.getheader('Host')176        hostDestino = "http://%s" % hostDestino177        if modoDeConexion == 'Transparente' and hostDestino not in self.path:178            url = "http://" + hostDestino + self.path179            modo = "TRANSPARENTE"180        else:181            url = self.path182            modo = "PROXY"183        self.server.logger.log(184                logging.DEBUG,185                "Modo de conexion: %(modo)s , URL: %(url)s"186                % {'modo': modo ,'url': url})...test_wf_api.py
Source:test_wf_api.py  
1########2# Copyright (c) 2013 GigaSpaces Technologies Ltd. All rights reserved3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8#        http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13#    * See the License for the specific language governing permissions and14#    * limitations under the License.15import uuid16from cloudify_rest_client.executions import Execution17from testenv import TestCase18from testenv.utils import get_resource as resource19from testenv.utils import deploy_and_execute_workflow as deploy20from testenv.utils import delete_provider_context21from testenv.utils import restore_provider_context22class WorkflowsAPITest(TestCase):23    def setUp(self):24        super(WorkflowsAPITest, self).setUp()25        self.do_get = True26        self.configure(retries=2, interval=1)27        self.addCleanup(restore_provider_context)28    def configure(self, retries, interval):29        delete_provider_context()30        context = {'cloudify': {'workflows': {31            'task_retries': retries,32            'task_retry_interval': interval33        }}}34        self.client.manager.create_context(self._testMethodName, context)35    def test_simple(self):36        parameters = {37            'do_get': self.do_get,38            'key': 'key1',39            'value': 'value1'40        }41        result_dict = {42            'key1': 'value1'43        }44        deployment, _ = deploy(resource('dsl/workflow_api.yaml'),45                               self._testMethodName,46                               parameters=parameters)47        # testing workflow remote task48        invocation = self.get_plugin_data(49            plugin_name='testmockoperations',50            deployment_id=deployment.id51        )['mock_operation_invocation'][0]52        self.assertDictEqual(result_dict, invocation)53        # testing workflow local task54        instance = self.client.node_instances.list(55            deployment_id=deployment.id)[0]56        self.assertEqual('test_state', instance.state)57    def test_fail_remote_task_eventual_success(self):58        deployment, _ = deploy(resource('dsl/workflow_api.yaml'),59                               self._testMethodName,60                               parameters={'do_get': self.do_get})61        # testing workflow remote task62        invocations = self.get_plugin_data(63            plugin_name='testmockoperations',64            deployment_id=deployment.id65        )['failure_invocation']66        self.assertEqual(3, len(invocations))67        for i in range(len(invocations) - 1):68            self.assertLessEqual(1, invocations[i+1] - invocations[i])69    def test_fail_remote_task_eventual_failure(self):70        deployment_id = str(uuid.uuid4())71        self.assertRaises(RuntimeError, deploy,72                          resource('dsl/workflow_api.yaml'),73                          self._testMethodName,74                          deployment_id=deployment_id,75                          parameters={'do_get': self.do_get})76        # testing workflow remote task77        invocations = self.get_plugin_data(78            plugin_name='testmockoperations',79            deployment_id=deployment_id80        )['failure_invocation']81        self.assertEqual(3, len(invocations))82        for i in range(len(invocations) - 1):83            self.assertLessEqual(1, invocations[i+1] - invocations[i])84    def test_fail_local_task_eventual_success(self):85        deploy(resource('dsl/workflow_api.yaml'), self._testMethodName,86               parameters={'do_get': self.do_get})87    def test_fail_local_task_eventual_failure(self):88        self._local_task_fail_impl(self._testMethodName)89    def test_fail_local_task_on_nonrecoverable_error(self):90        if not self.do_get:91            # setting infinite retries to make sure that the runtime error92            # raised is not because we ran out of retries93            # (no need to do this when self.do_get because the workflow will94            #  ensure that only one try was attempted)95            self.configure(retries=-1, interval=1)96        self._local_task_fail_impl(self._testMethodName)97    def _local_task_fail_impl(self, wf_name):98        if self.do_get:99            deploy(resource('dsl/workflow_api.yaml'), wf_name,100                   parameters={'do_get': self.do_get})101        else:102            self.assertRaises(RuntimeError,103                              deploy,104                              resource('dsl/workflow_api.yaml'),105                              wf_name,106                              parameters={'do_get': self.do_get})107    def test_cancel_on_wait_for_task_termination(self):108        _, eid = deploy(109            resource('dsl/workflow_api.yaml'), self._testMethodName,110            parameters={'do_get': self.do_get}, wait_for_execution=False)111        self.wait_for_execution_status(eid, status=Execution.STARTED)112        self.client.executions.cancel(eid)113        self.wait_for_execution_status(eid, status=Execution.CANCELLED)114    def test_cancel_on_task_retry_interval(self):115        self.configure(retries=2, interval=1000000)116        _, eid = deploy(117            resource('dsl/workflow_api.yaml'), self._testMethodName,118            parameters={'do_get': self.do_get}, wait_for_execution=False)119        self.wait_for_execution_status(eid, status=Execution.STARTED)120        self.client.executions.cancel(eid)121        self.wait_for_execution_status(eid, status=Execution.CANCELLED)122    def test_illegal_non_graph_to_graph_mode(self):123        if not self.do_get:124            # no need to run twice125            return126        self.assertRaises(RuntimeError, deploy,127                          resource('dsl/workflow_api.yaml'),128                          self._testMethodName)129    def wait_for_execution_status(self, execution_id, status, timeout=30):130        def assertion():131            self.assertEqual(status,132                             self.client.executions.get(execution_id).status)133        self.do_assertions(assertion, timeout=timeout)134class WorkflowsAPITestNoGet(WorkflowsAPITest):135    def setUp(self):136        super(WorkflowsAPITestNoGet, self).setUp()...import_lol_jsons.py
Source:import_lol_jsons.py  
1import json2import os3from dataclasses import dataclass, field4from time import sleep5from typing import Optional, Dict, Any6import requests7from decouple import config8lol_ddragon = "https://ddragon.leagueoflegends.com"9lol_static = "https://static.developer.riotgames.com/docs/lol"10lol_api_euw = "https://euw1.api.riotgames.com"11lol_api_europe = "https://europe.api.riotgames.com"12@dataclass13class ImportLolJsons:14    json_path: str15    len_files: int = field(default=0)16    session: Optional[requests.session] = field(default=None)17    RIOT_API_KEY: Optional[str] = field(default=None)18    datas: Dict[str, Any] = field(default_factory=dict)19    urls = {20        "versions": lol_ddragon + "/api/versions.json",21        "maps": lol_static + "/maps.json",22        "seasons": lol_static + "/seasons.json",23        "queues": lol_static + "/queues.json",24        "gameModes": lol_static + "/gameModes.json",25        "gameTypes": lol_static + "/gameTypes.json",26        "regions": lol_ddragon + "/realms/euw.json",27        "languages": lol_ddragon + "/cdn/languages.json",28        "champions": lol_ddragon + "/cdn/{version}/data/en_US/champion.json",29        "summonerSpells": lol_ddragon + "/cdn/{version}/data/en_US/summoner.json",30        "profileIcons": lol_ddragon + "/cdn/{version}/data/en_US/profileicon.json",31        "items": lol_ddragon + "/cdn/{version}/data/en_US/item.json",32        "summoner": lol_api_euw + "/lol/summoner/v4/summoners/by-name/{name}",33        "matches": lol_api_europe + "/lol/match/v5/matches/by-puuid/{puuid}/ids",34        "match": lol_api_europe + "/lol/match/v5/matches/{matchId}",35        "matchTimeline": lol_api_europe + "/lol/match/v5/matches/{matchId}/timeline",36        "featuredMatches": lol_api_euw + "/lol/spectator/v4/featured-games",37        "spectator": lol_api_euw38                     + "/lol/spectator/v4/active-games/by-summoner/{encryptedSummonerId}",39        "challenges": lol_api_euw + "/lol/challenges/v1/player-data/{puuid}",40        "championsMasteries": lol_api_euw41                              + "/lol/champion-mastery/v4/champion-masteries/by-summoner/{encryptedSummonerId}",42    }43    def __post_init__(self):44        self.len_files = len(self.urls)45        self.session = requests.session()46    def check_jsons_ok(self):47        len_jsons = len([file for file in os.listdir("test_jsons") if file != ".gitignore"])48        if len_jsons != self.len_files:49            print("Jsons files not found, importing...")50            return self.import_jsons()51        return True52    def do_get(self, url: str) -> Optional[Dict[str, Any]]:53        try:54            response = self.session.get(url, params={"api_key": self.RIOT_API_KEY})55            response.raise_for_status()56            return response.json()57        except requests.exceptions.RequestException as e:58            print(e)59            return None60    def import_jsons(self):61        self.RIOT_API_KEY = config("RIOT_API_KEY")62        if self.RIOT_API_KEY is None:63            return False64        else:65            try:66                self.datas["versions"] = self.do_get(self.urls["versions"])67                self.datas["maps"] = self.do_get(self.urls["maps"])68                self.datas["seasons"] = self.do_get(self.urls["seasons"])69                self.datas["queues"] = self.do_get(self.urls["queues"])70                self.datas["gameModes"] = self.do_get(self.urls["gameModes"])71                self.datas["gameTypes"] = self.do_get(self.urls["gameTypes"])72                self.datas["regions"] = self.do_get(self.urls["regions"])73                self.datas["languages"] = self.do_get(self.urls["languages"])74                self.datas["champions"] = self.do_get(75                    self.urls["champions"].format(version=self.datas["versions"][0])76                )77                self.datas["summonerSpells"] = self.do_get(78                    self.urls["summonerSpells"].format(79                        version=self.datas["versions"][0]80                    )81                )82                self.datas["profileIcons"] = self.do_get(83                    self.urls["profileIcons"].format(version=self.datas["versions"][0])84                )85                self.datas["items"] = self.do_get(86                    self.urls["items"].format(version=self.datas["versions"][0])87                )88                self.datas["featuredMatches"] = self.do_get(self.urls["featuredMatches"])89                summoner_name = self.datas["featuredMatches"]["gameList"][0]["participants"][0]["summonerName"]90                self.datas["summoner"] = self.do_get(91                    self.urls["summoner"].format(name=summoner_name)92                )93                self.datas["spectator"] = self.do_get(94                    self.urls["spectator"].format(95                        encryptedSummonerId=self.datas["summoner"]["id"]96                    )97                )98                self.datas["matches"] = self.do_get(99                    self.urls["matches"].format(puuid=self.datas["summoner"]["puuid"])100                )101                match_id = self.datas["matches"][0]102                sleep(.5)103                self.datas["match"] = self.do_get(104                    self.urls["match"].format(matchId=match_id)105                )106                self.datas["matchTimeline"] = self.do_get(107                    self.urls["matchTimeline"].format(matchId=match_id)108                )109                self.datas["challenges"] = self.do_get(110                    self.urls["challenges"].format(111                        puuid=self.datas["summoner"]["puuid"]112                    )113                )114                self.datas["championsMasteries"] = self.do_get(115                    self.urls["championsMasteries"].format(116                        encryptedSummonerId=self.datas["summoner"]["id"]117                    )118                )119                for k, v in self.datas.items():120                    with open(f"{self.json_path}/{k}.json", "w") as f:121                        f.write(json.dumps(v))122            except Exception as e:123                print(e)124                return False125            print("Jsons imported")...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
