How to use do_GET method in Molotov

Best Python code snippet using molotov_python

cliente.py

Source:cliente.py Github

copy

Full Screen

...168 soc.close()169 self.connection.close()170 def validarPassword(self, password):171 return adminUsers.usuario_valido('admin', password)172 def do_GET(self):173 modoDeConexion = self.headers.getheader('Proxy-Connection',174 'Transparente')175 hostDestino = self.headers.getheader('Host')176 hostDestino = "http://%s" % hostDestino177 if modoDeConexion == 'Transparente' and hostDestino not in self.path:178 url = "http://" + hostDestino + self.path179 modo = "TRANSPARENTE"180 else:181 url = self.path182 modo = "PROXY"183 self.server.logger.log(184 logging.DEBUG,185 "Modo de conexion: %(modo)s , URL: %(url)s"186 % {'modo': modo ,'url': url})...

Full Screen

Full Screen

test_wf_api.py

Source:test_wf_api.py Github

copy

Full Screen

1########2# Copyright (c) 2013 GigaSpaces Technologies Ltd. All rights reserved3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# * See the License for the specific language governing permissions and14# * limitations under the License.15import uuid16from cloudify_rest_client.executions import Execution17from testenv import TestCase18from testenv.utils import get_resource as resource19from testenv.utils import deploy_and_execute_workflow as deploy20from testenv.utils import delete_provider_context21from testenv.utils import restore_provider_context22class WorkflowsAPITest(TestCase):23 def setUp(self):24 super(WorkflowsAPITest, self).setUp()25 self.do_get = True26 self.configure(retries=2, interval=1)27 self.addCleanup(restore_provider_context)28 def configure(self, retries, interval):29 delete_provider_context()30 context = {'cloudify': {'workflows': {31 'task_retries': retries,32 'task_retry_interval': interval33 }}}34 self.client.manager.create_context(self._testMethodName, context)35 def test_simple(self):36 parameters = {37 'do_get': self.do_get,38 'key': 'key1',39 'value': 'value1'40 }41 result_dict = {42 'key1': 'value1'43 }44 deployment, _ = deploy(resource('dsl/workflow_api.yaml'),45 self._testMethodName,46 parameters=parameters)47 # testing workflow remote task48 invocation = self.get_plugin_data(49 plugin_name='testmockoperations',50 deployment_id=deployment.id51 )['mock_operation_invocation'][0]52 self.assertDictEqual(result_dict, invocation)53 # testing workflow local task54 instance = self.client.node_instances.list(55 deployment_id=deployment.id)[0]56 self.assertEqual('test_state', instance.state)57 def test_fail_remote_task_eventual_success(self):58 deployment, _ = deploy(resource('dsl/workflow_api.yaml'),59 self._testMethodName,60 parameters={'do_get': self.do_get})61 # testing workflow remote task62 invocations = self.get_plugin_data(63 plugin_name='testmockoperations',64 deployment_id=deployment.id65 )['failure_invocation']66 self.assertEqual(3, len(invocations))67 for i in range(len(invocations) - 1):68 self.assertLessEqual(1, invocations[i+1] - invocations[i])69 def test_fail_remote_task_eventual_failure(self):70 deployment_id = str(uuid.uuid4())71 self.assertRaises(RuntimeError, deploy,72 resource('dsl/workflow_api.yaml'),73 self._testMethodName,74 deployment_id=deployment_id,75 parameters={'do_get': self.do_get})76 # testing workflow remote task77 invocations = self.get_plugin_data(78 plugin_name='testmockoperations',79 deployment_id=deployment_id80 )['failure_invocation']81 self.assertEqual(3, len(invocations))82 for i in range(len(invocations) - 1):83 self.assertLessEqual(1, invocations[i+1] - invocations[i])84 def test_fail_local_task_eventual_success(self):85 deploy(resource('dsl/workflow_api.yaml'), self._testMethodName,86 parameters={'do_get': self.do_get})87 def test_fail_local_task_eventual_failure(self):88 self._local_task_fail_impl(self._testMethodName)89 def test_fail_local_task_on_nonrecoverable_error(self):90 if not self.do_get:91 # setting infinite retries to make sure that the runtime error92 # raised is not because we ran out of retries93 # (no need to do this when self.do_get because the workflow will94 # ensure that only one try was attempted)95 self.configure(retries=-1, interval=1)96 self._local_task_fail_impl(self._testMethodName)97 def _local_task_fail_impl(self, wf_name):98 if self.do_get:99 deploy(resource('dsl/workflow_api.yaml'), wf_name,100 parameters={'do_get': self.do_get})101 else:102 self.assertRaises(RuntimeError,103 deploy,104 resource('dsl/workflow_api.yaml'),105 wf_name,106 parameters={'do_get': self.do_get})107 def test_cancel_on_wait_for_task_termination(self):108 _, eid = deploy(109 resource('dsl/workflow_api.yaml'), self._testMethodName,110 parameters={'do_get': self.do_get}, wait_for_execution=False)111 self.wait_for_execution_status(eid, status=Execution.STARTED)112 self.client.executions.cancel(eid)113 self.wait_for_execution_status(eid, status=Execution.CANCELLED)114 def test_cancel_on_task_retry_interval(self):115 self.configure(retries=2, interval=1000000)116 _, eid = deploy(117 resource('dsl/workflow_api.yaml'), self._testMethodName,118 parameters={'do_get': self.do_get}, wait_for_execution=False)119 self.wait_for_execution_status(eid, status=Execution.STARTED)120 self.client.executions.cancel(eid)121 self.wait_for_execution_status(eid, status=Execution.CANCELLED)122 def test_illegal_non_graph_to_graph_mode(self):123 if not self.do_get:124 # no need to run twice125 return126 self.assertRaises(RuntimeError, deploy,127 resource('dsl/workflow_api.yaml'),128 self._testMethodName)129 def wait_for_execution_status(self, execution_id, status, timeout=30):130 def assertion():131 self.assertEqual(status,132 self.client.executions.get(execution_id).status)133 self.do_assertions(assertion, timeout=timeout)134class WorkflowsAPITestNoGet(WorkflowsAPITest):135 def setUp(self):136 super(WorkflowsAPITestNoGet, self).setUp()...

Full Screen

Full Screen

import_lol_jsons.py

Source:import_lol_jsons.py Github

copy

Full Screen

1import json2import os3from dataclasses import dataclass, field4from time import sleep5from typing import Optional, Dict, Any6import requests7from decouple import config8lol_ddragon = "https://ddragon.leagueoflegends.com"9lol_static = "https://static.developer.riotgames.com/docs/lol"10lol_api_euw = "https://euw1.api.riotgames.com"11lol_api_europe = "https://europe.api.riotgames.com"12@dataclass13class ImportLolJsons:14 json_path: str15 len_files: int = field(default=0)16 session: Optional[requests.session] = field(default=None)17 RIOT_API_KEY: Optional[str] = field(default=None)18 datas: Dict[str, Any] = field(default_factory=dict)19 urls = {20 "versions": lol_ddragon + "/api/versions.json",21 "maps": lol_static + "/maps.json",22 "seasons": lol_static + "/seasons.json",23 "queues": lol_static + "/queues.json",24 "gameModes": lol_static + "/gameModes.json",25 "gameTypes": lol_static + "/gameTypes.json",26 "regions": lol_ddragon + "/realms/euw.json",27 "languages": lol_ddragon + "/cdn/languages.json",28 "champions": lol_ddragon + "/cdn/{version}/data/en_US/champion.json",29 "summonerSpells": lol_ddragon + "/cdn/{version}/data/en_US/summoner.json",30 "profileIcons": lol_ddragon + "/cdn/{version}/data/en_US/profileicon.json",31 "items": lol_ddragon + "/cdn/{version}/data/en_US/item.json",32 "summoner": lol_api_euw + "/lol/summoner/v4/summoners/by-name/{name}",33 "matches": lol_api_europe + "/lol/match/v5/matches/by-puuid/{puuid}/ids",34 "match": lol_api_europe + "/lol/match/v5/matches/{matchId}",35 "matchTimeline": lol_api_europe + "/lol/match/v5/matches/{matchId}/timeline",36 "featuredMatches": lol_api_euw + "/lol/spectator/v4/featured-games",37 "spectator": lol_api_euw38 + "/lol/spectator/v4/active-games/by-summoner/{encryptedSummonerId}",39 "challenges": lol_api_euw + "/lol/challenges/v1/player-data/{puuid}",40 "championsMasteries": lol_api_euw41 + "/lol/champion-mastery/v4/champion-masteries/by-summoner/{encryptedSummonerId}",42 }43 def __post_init__(self):44 self.len_files = len(self.urls)45 self.session = requests.session()46 def check_jsons_ok(self):47 len_jsons = len([file for file in os.listdir("test_jsons") if file != ".gitignore"])48 if len_jsons != self.len_files:49 print("Jsons files not found, importing...")50 return self.import_jsons()51 return True52 def do_get(self, url: str) -> Optional[Dict[str, Any]]:53 try:54 response = self.session.get(url, params={"api_key": self.RIOT_API_KEY})55 response.raise_for_status()56 return response.json()57 except requests.exceptions.RequestException as e:58 print(e)59 return None60 def import_jsons(self):61 self.RIOT_API_KEY = config("RIOT_API_KEY")62 if self.RIOT_API_KEY is None:63 return False64 else:65 try:66 self.datas["versions"] = self.do_get(self.urls["versions"])67 self.datas["maps"] = self.do_get(self.urls["maps"])68 self.datas["seasons"] = self.do_get(self.urls["seasons"])69 self.datas["queues"] = self.do_get(self.urls["queues"])70 self.datas["gameModes"] = self.do_get(self.urls["gameModes"])71 self.datas["gameTypes"] = self.do_get(self.urls["gameTypes"])72 self.datas["regions"] = self.do_get(self.urls["regions"])73 self.datas["languages"] = self.do_get(self.urls["languages"])74 self.datas["champions"] = self.do_get(75 self.urls["champions"].format(version=self.datas["versions"][0])76 )77 self.datas["summonerSpells"] = self.do_get(78 self.urls["summonerSpells"].format(79 version=self.datas["versions"][0]80 )81 )82 self.datas["profileIcons"] = self.do_get(83 self.urls["profileIcons"].format(version=self.datas["versions"][0])84 )85 self.datas["items"] = self.do_get(86 self.urls["items"].format(version=self.datas["versions"][0])87 )88 self.datas["featuredMatches"] = self.do_get(self.urls["featuredMatches"])89 summoner_name = self.datas["featuredMatches"]["gameList"][0]["participants"][0]["summonerName"]90 self.datas["summoner"] = self.do_get(91 self.urls["summoner"].format(name=summoner_name)92 )93 self.datas["spectator"] = self.do_get(94 self.urls["spectator"].format(95 encryptedSummonerId=self.datas["summoner"]["id"]96 )97 )98 self.datas["matches"] = self.do_get(99 self.urls["matches"].format(puuid=self.datas["summoner"]["puuid"])100 )101 match_id = self.datas["matches"][0]102 sleep(.5)103 self.datas["match"] = self.do_get(104 self.urls["match"].format(matchId=match_id)105 )106 self.datas["matchTimeline"] = self.do_get(107 self.urls["matchTimeline"].format(matchId=match_id)108 )109 self.datas["challenges"] = self.do_get(110 self.urls["challenges"].format(111 puuid=self.datas["summoner"]["puuid"]112 )113 )114 self.datas["championsMasteries"] = self.do_get(115 self.urls["championsMasteries"].format(116 encryptedSummonerId=self.datas["summoner"]["id"]117 )118 )119 for k, v in self.datas.items():120 with open(f"{self.json_path}/{k}.json", "w") as f:121 f.write(json.dumps(v))122 except Exception as e:123 print(e)124 return False125 print("Jsons imported")...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Molotov automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful