How to use _resolve_queue method in localstack

Best Python code snippet using localstack_python

sonarlint.py

Source:sonarlint.py Github

copy

Full Screen

1import os2import subprocess3import threading4from sonarlintcli.languageserver import urify, unurify, LANGUAGES, get_language_id5JAR_DOWNLOAD_LANGUAGE_SERVER = "https://repox.jfrog.io/repox/sonarsource/org/sonarsource/sonarlint/core/sonarlint-language-server/4.3.1.2486/sonarlint-language-server-4.3.1.2486.jar"6JAR_DOWNLOAD_LANGUAGES = {7 LANGUAGES.html: "https://repox.jfrog.io/repox/sonarsource/org/sonarsource/html/sonar-html-plugin/3.1.0.1615/sonar-html-plugin-3.1.0.1615.jar",8 LANGUAGES.javascript: "https://repox.jfrog.io/repox/sonarsource/org/sonarsource/javascript/sonar-javascript-plugin/5.1.1.7506/sonar-javascript-plugin-5.1.1.7506.jar",9 LANGUAGES.php: "https://repox.jfrog.io/repox/sonarsource/org/sonarsource/php/sonar-php-plugin/3.0.0.4537/sonar-php-plugin-3.0.0.4537.jar",10 LANGUAGES.python: "https://repox.jfrog.io/repox/sonarsource/org/sonarsource/python/sonar-python-plugin/1.12.0.2726/sonar-python-plugin-1.12.0.2726.jar",11 LANGUAGES.typescript: "https://repox.jfrog.io/repox/sonarsource/org/sonarsource/typescript/sonar-typescript-plugin/1.9.0.3766/sonar-typescript-plugin-1.9.0.3766.jar",12 LANGUAGES.kotlin: "https://repox.jfrog.io/repox/sonarsource/org/sonarsource/slang/sonar-kotlin-plugin/1.6.0.626/sonar-kotlin-plugin-1.6.0.626.jar",13 LANGUAGES.java: "https://repox.jfrog.io/repox/sonarsource/org/sonarsource/java/sonar-java-plugin/5.9.2.16552/sonar-java-plugin-5.9.2.16552.jar"14}15def ensure_callable(val):16 if callable(val):17 return val18 return lambda *args, **kwargs: None19class SonarLintRuleResolver:20 def __init__(self, language_server):21 self._language_server = language_server22 self._diagnostics_cache = {}23 self._resolve_queue = {}24 def get_by_diagnostics(self, file, diagnostics: dict, cb: callable):25 code = diagnostics['code']26 if code in self._diagnostics_cache:27 return self._diagnostics_cache[code]28 if code not in self._resolve_queue:29 self._resolve_queue[code] = [cb]30 else:31 self._resolve_queue[code].append(cb)32 return33 self._language_server.send_request("textDocument/codeAction", {34 'textDocument': {35 "uri": file36 },37 "range": diagnostics['range'],38 "context": {39 "diagnostics": diagnostics40 }41 }, self._on_rule_desc)42 def _on_rule_desc(self, responses):43 for response in responses:44 code, description, html, type, severity = response["arguments"]45 if code in self._resolve_queue:46 for cb in self._resolve_queue[code]:47 cb(code, description, html, type, severity)48 del self._resolve_queue[code]49class SonarLintProcess(threading.Thread):50 def __init__(self, port, ls_jar, analyzers, java_bin):51 super().__init__(target=self._run_sonarlint_ls)52 self.analyzers = analyzers53 self.java_bin = java_bin54 self.ls_jar = ls_jar55 self.port = port56 self._stop_e = threading.Event()57 self._is_stop = threading.Event()58 def get_sonar_analyzers(self):59 return ["file://" + analyzer for analyzer in self.analyzers]60 def _run_sonarlint_ls(self):61 cmd = [self.java_bin, "-jar", self.ls_jar, str(self.port)]62 cmd.extend(self.get_sonar_analyzers())63 with open(os.devnull, 'w') as devnull:64 with subprocess.Popen(cmd, stdout=devnull, stderr=devnull) as proc:65 self._stop_e.wait()66 proc.terminate()67 proc.wait()68 self._is_stop.set()69 def stop(self):70 self._stop_e.set()71 self._is_stop.wait()72class Analysis:73 def __init__(self, ls_client, rule_resolver: SonarLintRuleResolver, files: list, cb: callable, done: callable):74 self._files = files75 self._pending_files = []76 self._results = []77 self._ls_client = ls_client78 self._ls_client.on('textDocument/publishDiagnostics', self._on_diagnostics)79 self._rule_resolver = rule_resolver80 self._callback = ensure_callable(cb)81 self._done_callback = ensure_callable(done)82 def run(self):83 self._ls_client.send_request("initialize", {84 "processId": os.getpid(),85 "rootUri": os.path.commonpath(self._files),86 "capabilities": {},87 "initializationOptions": {88 "disableTelemetry": True,89 "includeRuleDetailsInCodeAction": True,90 "typeScriptLocation": "/usr/lib/node_modules/typescript/lib"91 }92 }, self._send_files)93 def _send_files(self, _init_result):94 for file in self._files:95 with open(str(file), "r") as fd:96 uri = urify(file)97 self._ls_client.send_notification("textDocument/didOpen", {98 "textDocument": {99 "uri": uri,100 "languageId": get_language_id(file),101 "version": 1,102 "text": fd.read()103 }104 })105 self._pending_files.append(uri)106 def _on_diagnostics(self, params: dict):107 file = params['uri']108 diagnostics = params['diagnostics']109 if file not in self._pending_files:110 return111 resolved = 0112 rules = {}113 # scoping function that will resolve all callbacks114 def resolve_callbacks():115 if len(diagnostics) == resolved:116 combined = {"uri": file, "diagnostics": diagnostics, "rules": rules}117 self._results.append(combined)118 self._callback(file, combined)119 self._pending_files.remove(file)120 if len(self._pending_files) == 0:121 # resolve completely if all files have been analyzed122 self._done_callback(self._results)123 # scoping function that calls the callback with the file, diagnostics and rule details124 def on_rule(code, description, html, type, severity):125 nonlocal resolved126 resolved += 1127 rules[code] = {code: code, description: description, html: html, type: type, severity: severity}128 resolve_callbacks()129 # in case there is no diagnostics for this file we can already resolve130 if len(diagnostics) == 0:131 resolve_callbacks()132 for diagnostic in diagnostics:133 self._rule_resolver.get_by_diagnostics(file, diagnostic, on_rule)134def analyze(ls_client, rule_resolver, files, done_callback = None, each_callback = None) -> Analysis:135 analysis = Analysis(ls_client, rule_resolver, files, each_callback, done_callback)136 analysis.run()...

Full Screen

Full Screen

mdnsMonitor.py

Source:mdnsMonitor.py Github

copy

Full Screen

1#!/usr/bin/python2# Copyright (C) 2019 Advanced Media Workflow Association3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15import time16import os17import queue18from zeroconf_monkey import ServiceBrowser, Zeroconf19from socket import inet_ntoa20from threading import Thread21MONITOR_TYPES = ["_nmos-registration._tcp", "_nmos-register._tcp", "_nmos-query._tcp", "_nmos-system._tcp"]22IP_WHITELIST = ["172.29.80.195", "172.29.80.196"]23class Listener:24 def __init__(self):25 self.services = {}26 self._resolve_queue = queue.Queue()27 def add_service(self, zeroconf, srv_type, name):28 self._resolve_queue.put((srv_type, name, zeroconf))29 resolveThread = Thread(target=self._resolve_service_details)30 resolveThread.daemon = True31 resolveThread.start()32 def _resolve_service_details(self):33 parameters = self._resolve_queue.get()34 srv_type = parameters[0]35 name = parameters[1]36 zeroconf = parameters[2]37 info = zeroconf.get_service_info(srv_type, name)38 self._update_record(srv_type, name, info)39 self._resolve_queue.task_done()40 def _update_record(self, srv_type, name, info):41 try:42 self.services[srv_type][name] = info43 except KeyError:44 self.services[srv_type] = {}45 self.services[srv_type][name] = info46 def remove_service(self, zeroconf, srv_type, name):47 try:48 self.services[srv_type].pop(name, None)49 except KeyError:50 pass51 def print_services(self):52 os.system("clear")53 for srv_type in self.services:54 print("Unexpected services of type '{}'".format(srv_type))55 for name in sorted(self.services[srv_type]):56 info = self.services[srv_type][name]57 if info:58 address = inet_ntoa(info.address)59 if address not in IP_WHITELIST:60 self._print_entry(name, info)61 else:62 self._print_entry(name)63 print("")64 def _print_entry(self, name, info=None):65 print(" - {}".format(name))66 if info is not None:67 address = inet_ntoa(info.address)68 print(" Address: {}, Port: {}, TXT: {}".format(address, info.port, info.properties))69 else:70 print(" Unresolvable")71zeroconf = Zeroconf()72listener = Listener()73for srv_type in MONITOR_TYPES:74 browser = ServiceBrowser(zeroconf, srv_type + ".local.", listener)75try:76 while True:77 listener.print_services()78 time.sleep(1)79except KeyboardInterrupt:80 pass81finally:82 print("* Shutting down, please wait...")...

Full Screen

Full Screen

mdnsListener.py

Source:mdnsListener.py Github

copy

Full Screen

1#!/usr/bin/env python2# Copyright 2017 British Broadcasting Corporation3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15from socket import inet_ntoa16from ..logger import Logger17from six.moves import queue18from threading import Thread19class MDNSListener(object):20 def __init__(self, callbackMethod, registerOnly):21 self.logger = Logger("zeroconf mDNS")22 self.callback = callbackMethod23 self.registerOnly = registerOnly24 self.records = {}25 self.nameMap = {}26 self._resolve_queue = queue.Queue()27 def add_service(self, zeroconf, srv_type, name):28 self._resolve_queue.put((srv_type, name, zeroconf))29 resolveThread = Thread(target=self._resolve_service_details)30 resolveThread.daemon = True31 resolveThread.start()32 def _resolve_service_details(self):33 threadParameters = self._resolve_queue.get()34 srv_type = threadParameters[0]35 name = threadParameters[1]36 zeroconf = threadParameters[2]37 info = zeroconf.get_service_info(srv_type, name)38 if info is not None:39 self._update_record(srv_type, name, info)40 self._respondToClient(name, "add", info)41 elif name in self.records[srv_type]:42 self.remove_service(zeroconf, srv_type, name)43 self._resolve_queue.task_done()44 def _update_record(self, srv_type, name, info):45 try:46 self.records[srv_type][name] = info47 except KeyError:48 self.records[srv_type] = {}49 self.records[srv_type][name] = info50 def remove_service(self, zeroconf, srv_type, name):51 info = self.records[srv_type].pop(name)52 if not self.registerOnly:53 action = "remove"54 self._respondToClient(name, action, info)55 def _respondToClient(self, name, action, info):56 callbackData = self._buildClientCallback(action, info)57 self.logger.writeDebug("mDNS Service {}: {}".format(action, callbackData))58 self.callback(callbackData)59 def _buildClientCallback(self, action, info):60 return {61 "action": action,62 "type": info.type,63 "name": info.name,64 "port": info.port,65 "hostname": info.server.rstrip("."),66 "address": inet_ntoa(info.address),67 "txt": info.properties68 }69 def garbageCollect(self, zeroconf, srv_type):70 for name in list(self.records[srv_type].keys()):71 self._resolve_queue.put((srv_type, name, zeroconf))72 resolveThread = Thread(target=self._resolve_service_details)73 resolveThread.daemon = True...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful