How to use get_tool_path method in lisa

Best Python code snippet using lisa_python

__init__.py

Source:__init__.py Github

copy

Full Screen

...41 "A hacky way of finding the metadata schema path"42 parent = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))43 path = os.path.join(parent, "media_probe_schema.json")44 return path45 def get_tool_path(self, tool):46 "Get the executable path for the tool in question"47 if tool not in self.paths or not os.access(self.paths[tool], os.X_OK):48 # try to find the tool and update the path49 p = shutil.which(tool)50 if p is None:51 raise FileNotFoundError(f"Cannot find executable for {tool}")52 self.paths[tool] = p53 return self.paths[tool]54 def probe(self, file):55 "Run all of the probes on a file"56 data = {'container': {'name': os.path.basename(file),57 'size': os.path.getsize(file),58 'time': os.path.getmtime(file)59 },60 'streams': {},61 'tags': {}62 }63 if os.path.isfile(file):64 data['container']['type'] = 'file'65 elif os.path.isdir(file):66 data['container']['type'] = 'dir'67 else:68 data['container']['type'] = 'other'69 mime = self.get_mime_type(file)70 data['container']['mime_type'] = mime71 72 # run the probes73 self.probe_time_based_media(file, data)74 self.probe_image_media(file, data)75 self.probe_text(file, data)76 self.probe_document(file, data)77 return data78 def get_mime_type(self, file):79 """80 Use the file command to get the mime type for a file81 """ 82 result = subprocess.run([self.get_tool_path('file'), '--brief', '--mime-type', '--dereference', file], stdout=subprocess.PIPE, check=True)83 mime = str(result.stdout, 'utf-8').rstrip()84 # There are some weird corner cases we'd like to pick up...85 # * pick up yaml files, since they really just look like plain text. 86 if file.endswith(".yaml"):87 mime = "application/x-yaml"88 # * It appears that MPEG program streams look like targa image files,89 # we we can detect them by looking a little harder... (plus, nobody uses targa)90 if mime == 'image/x-tga':91 result = subprocess.run([self.get_tool_path('file'), '-k', '--dereference', file], stdout=subprocess.PIPE, check=True)92 extended = str(result.stdout, 'utf-8').rstrip()93 if 'MPEG' in extended:94 mime = 'video/mpeg'95 return mime96 def probe_time_based_media(self, file, data):97 """98 Use ffprobe to get information about a file and update the data99 structure with that information.100 """101 if (not re.match(r"(audio|video)/", data['container']['mime_type'])) and not file.endswith(".mkv"):102 return103 result = subprocess.run([self.get_tool_path('ffprobe'), '-v', '0', '-print_format', 'json', '-show_format', '-show_streams', file],104 check=True, stdout=subprocess.PIPE)105 f = yaml.load(str(result.stdout, 'utf-8'), Loader=yaml.Loader)106 107 data['container']['duration'] = float(f['format']['duration'])108 data['container']['format'] = f['format']['format_name']109 if 'tags' in f['format']:110 data['container']['tags'] = {}111 for t in f['format']['tags']:112 data['container']['tags'][t] = f['format']['tags'][t]113 counter = 0114 data['streams'] = {}115 for stream in f['streams']:116 s = {}117 if stream['codec_type'] == 'audio':118 s['@type'] = 'audio'119 if 'codec_name' in stream:120 s['codec'] = stream['codec_name']121 elif 'codec_tag_string' in stream:122 s['codec'] = stream['codec_tag_string']123 if 'duration' in stream:124 s['duration'] = float(stream['duration'])125 elif 'tags' in stream and 'DURATION' in stream['tags']:126 # matroska stores the duration in the tags in hh:mm:ss.sss format127 s['duration'] = MediaProbe.hhmmss2secs(stream['tags']['DURATION'])128 else:129 s['duration'] = data['container']['duration']130 s['sample_format'] = stream['sample_fmt']131 s['channels'] = stream['channels']132 if 'channel_layout' in stream:133 s['channel_layout'] = stream['channel_layout']134 else:135 # make some educated guesses136 if s['channels'] == 1:137 s['channel_layout'] = 'mono'138 elif s['channels'] == 2:139 s['channel_layout'] = 'stereo'140 s['sample_rate'] = int(stream['sample_rate'])141 if 'bits_per_sample' in stream:142 s['bits_per_sample'] = int(stream['bits_per_sample'])143 elif 'bits_per_raw_sample' in stream:144 s['bits_per_sample'] = int(stream['bits_per_raw_sample'])145 if s['bits_per_sample'] == 0:146 s.pop('bits_per_sample')147 if 'bit_rate' in stream:148 s['bit_rate'] = int(stream['bit_rate'])149 if 'tags' in stream:150 s['user_data'] = {}151 for t in stream['tags']:152 s['user_data'][t] = stream['tags'][t]153 elif stream['codec_type'] == 'video':154 s['@type'] = 'video'155 if 'codec_name' in stream:156 s['codec'] = stream['codec_name']157 elif 'codec_tag_string' in stream:158 s['codec'] = stream['codec_tag_string']159 # duration160 if 'duration' in stream:161 s['duration'] = float(stream['duration'])162 elif 'tags' in stream and 'DURATION' in stream['tags']:163 # matroska stores the duration in the tags in hh:mm:ss.sss format164 s['duration'] = MediaProbe.hhmmss2secs(stream['tags']['DURATION'])165 else:166 s['duration'] = data['container']['duration']167 168 # get video dimensions169 dims = {'width': stream['width'],170 'height': stream['height']}171 if stream['width'] > 0 and stream['height'] > 0:172 if 'sample_aspect_ratio' not in stream:173 # without any information, we'll assume 1:1174 stream['sample_aspect_ratio'] = '1:1'175 dims['sample_aspect_ratio'] = MediaProbe.ratio2fraction(stream['sample_aspect_ratio'])176 177 if 'display_aspect_ratio' not in stream:178 # without any information, the ratio is the sample aspect ratio and179 # the ratio of width and height...using some math.180 sar = fractions.Fraction(dims['sample_aspect_ratio'][0],181 dims['sample_aspect_ratio'][1])182 par = fractions.Fraction(stream['width'], stream['height'])183 stream['display_aspect_ratio'] = sar * par184 dims['display_aspect_ratio'] = MediaProbe.ratio2fraction(stream['display_aspect_ratio'])185 s['dimensions'] = dims186 if 'pix_fmt' in stream:187 s['pixel_format'] = stream['pix_fmt']188 if 'bit_rate' in stream:189 s['bit_rate'] = int(stream['bit_rate'])190 if 'bits_per_sample' in stream:191 s['bits_per_sample'] = int(stream['bits_per_sample'])192 elif 'bits_per_raw_sample' in stream:193 s['bits_per_sample'] = int(stream['bits_per_raw_sample'])194 s['frame_rate'] = round(float(fractions.Fraction(stream['r_frame_rate'])), 2)195 if 'color_space' in stream:196 s['color_space'] = stream['color_space']197 if 'color_transfer' in stream:198 s['color_transfer'] = stream['color_transfer']199 if 'profile' in stream:200 s['codec_profile'] = stream['profile']201 if 'tags' in stream:202 s['user_data'] = {}203 for t in stream['tags']:204 s['user_data'][t] = stream['tags'][t]205 else:206 # for a data stream, we're just going to dump the tags...we don't207 # it could be a timecode or subtitle stream, or something else.208 s['@type'] = 'data'209 if 'tags' in stream:210 s['user_data'] = {}211 for t in stream['tags']:212 s['user_data'][t] = stream['tags'][t]213 214 215 if s['@type'] not in data['streams']:216 data['streams'][s['@type']] = []217 s['@position'] = counter218 data['streams'][s['@type']].append(s)219 counter += 1 220 @staticmethod221 def hhmmss2secs(duration):222 "Convert a hh:mm:ss.sss string to seconds."223 parts = duration.split(":")224 return int(parts[0]) * 3600 + int(parts[1]) * 60 + float(parts[2])225 @staticmethod226 def ratio2fraction(ratio):227 if type(ratio) is str:228 if ':' in ratio:229 ratio = ratio.replace(':', "/")230 f = fractions.Fraction(ratio)231 return [f.numerator, f.denominator]232 def probe_image_media(self, file, data):233 """234 Use ImageMagick's identify to gather information about a file235 """236 if not re.match(r"image/", data['container']['mime_type']):237 return238 format_string = '''{239 "@type": "image",240 "dimensions": {241 "width": %[width],242 "height": %[height],243 "resolution": {244 "horizontal": %[resolution.x],245 "vertical": %[resolution.y],246 "unit": "%[units]"247 }248 },249 "pixel_type": "%[channels]",250 "codec": "%[magick]",251 "bit_depth": %[bit-depth],252 "compression": "%[compression]",253 "color_profile": "%[profile:icc]"254 },'''255 result = subprocess.run([self.get_tool_path('identify'), '-format', format_string, file], 256 stdout=subprocess.PIPE, check=True)257 data['streams'] = {'image': []}258 counter = 0259 for image in yaml.load('[' + str(result.stdout, 'utf-8').rstrip().rstrip(',') + "]", Loader=yaml.Loader):260 i = image261 r = i['dimensions']['resolution']262 if r['unit'].startswith("PixelsPer"):263 r['unit'] = r['unit'][9:].lower()264 else:265 r.pop('unit')266 if r['horizontal'] == 0 or r['vertical'] == 0:267 i['dimensions'].pop('resolution')268 if i['color_profile'] == "":269 i.pop('color_profile')270 271 for k in ('codec', 'compression', 'pixel_type'):272 i[k] = i[k].lower()273 i['@position'] = counter274 data['streams']['image'].append(i)275 counter += 1276 277 def probe_text(self, file, data):278 """ 279 Probe a text file for more information.280 """281 282 if (not re.match(r"text/", data['container']['mime_type'])283 and not re.match(r"application/xml", data['container']['mime_type'])):284 return285 s = {'@type': 'text'}286 result = subprocess.run([self.get_tool_path('file'), '--brief', '--mime-encoding', '--dereference', file], stdout=subprocess.PIPE, check=True)287 s['encoding'] = str(result.stdout, 'utf-8').rstrip()288 result = subprocess.run([self.get_tool_path('file'), '--brief', '--dereference', file], stdout=subprocess.PIPE, check=True)289 s['description'] = str(result.stdout, 'utf-8').rstrip()290 291 # If this is XML, then let's do a little more probing...basically get the root292 # node, and perhaps a better encoding. Don't actually parse the XML, just use293 # some regexes294 if data['container']['mime_type'].endswith("/xml"):295 with open(file, "r") as f:296 content = f.read()297 m = re.search(r"<\?xml[^>]+encoding=[\"\'](.+?)[\"\']", content)298 if m:299 s['encoding'] = m.group(1)300 m = re.search(r"<([a-zA-Z_].+?)[\s>]", content)301 if m:302 s['description'] = f"XML Document with '{m.group(1)}' root node"303 304 data['streams'] = {'text': []}305 s['@position'] = 0306 data['streams']['text'].append(s)307 def probe_document(self, file, data):308 """309 Handle office-type documents: pdf, doc, docx, xls, xlsx, ppt, pptx310 """311 s = {'@type': 'document'}312 mime_type = data['container']['mime_type']313 if mime_type == "application/pdf":314 result = subprocess.run([self.get_tool_path('pdfinfo'), file], stdout=subprocess.PIPE, check=True)315 for l in str(result.stdout, 'utf-8').rstrip().split("\n"):316 #print(l)317 k, v = [x.strip() for x in l.split(":", 1)]318 #print(f"K={k}, V={v}")319 if k == "PDF version":320 s['version'] = v321 elif k == "Pages":322 s['pages'] = int(v)323 elif k == "Page size":324 s['page_size'] = v325 elif k in ('Creator', 'Producer', 'Tagged'):326 if v != "":327 if 'user_data' not in s:328 s['user_data'] = {}...

Full Screen

Full Screen

shell_interface.py

Source:shell_interface.py Github

copy

Full Screen

...23 :param output_path: the path to send results.24 :type output_path: str25 :return: the response to the process being run26 """27 designite_config = config_interface.get_tool_path("designite")2829 cmd = [30 'java', f'-Xmx{designite_config["max_allocation"]}', '-jar', designite_config["path"],31 '-i', dir_to_analyze,32 '-o', output_path33 ]34 return _run_process(cmd)353637def run_ref_miner(repo_to_analyze, branch, output_file):38 """39 runs refactoring miner on a repo4041 :param repo_to_analyze: the repository to analyze42 :type repo_to_analyze: str43 :param branch: the branch to analyze44 :type branch: str45 :param output_file: the location of the output46 :type output_file: str47 :return: the response to the process being run48 """49 ref_miner_config = config_interface.get_tool_path("refactoring_miner")5051 cmd = [ref_miner_config["path"], '-a', repo_to_analyze, branch, '-json', output_file]52 return _run_process(cmd)535455def run_weka_filter(filter_method, filter_options, input_file, output_file):56 """57 Runs the weka filter on a file5859 :param filter_method: The filtering method60 :type filter_method: str61 :param filter_options: special options for the filter62 :type filter_options: str63 :param input_file: the file to run the filter on64 :type input_file: str65 :param output_file: the file containing the filtered results66 :type output_file: str67 :return: the response to the process being run68 """69 weka_config = config_interface.get_tool_path("weka")7071 cmd = [72 'java', '-classpath', f'"{weka_config["path"]}"', filter_method,73 '-i', f'"{input_file}"',74 '-o', f'"{output_file}"', filter_options75 ]76 return _run_process(cmd)777879def run_weka_classifier(classifier_method, classifier_options, input_path, model_output_path=None):80 """81 Runs the weka classifier on a file8283 :param classifier_method: The classifying method84 :type classifier_method: str85 :param classifier_options: special options for the classifier86 :type classifier_options: str87 :param input_path: the path of the file to be classified88 :type input_path: str89 :param model_output_path: the path for the output of the classification90 :type model_output_path: str91 :return: the response to the process being run92 """93 weka_config = config_interface.get_tool_path("weka")9495 model_arg = ''96 cmd = ['java', '-classpath', f'"{weka_config["path"]}"', classifier_method, '-t', f'"{input_path}"', '-o', '-v',97 classifier_options, model_arg]9899 if model_output_path is not None:100 cmd.extend(['-d', model_output_path])101 # model_arg = f'-d {model_output_path}'102103 return _run_process(cmd, check_out=True)104105106def load_and_run_weka_classifier(classifier_method, model_input_file, input_path):107 """108 Runs premade classifier on input file109110 :param classifier_method: the classifier to run111 :type classifier_method: str112 :param model_input_file: the model path113 :type model_input_file: str114 :param input_path: the file path115 :type input_path: str116 :return: the response to the process being run117 """118 weka_config = config_interface.get_tool_path("weka")119120 cmd = ['java', '-classpath', f'"{weka_config["path"]}"', classifier_method, '-l',121 f'"{model_input_file}"', '-T', f'"{input_path}"', '-p', '0']122 return _run_process(cmd, check_out=True)123124125def run_git_log_tags(repo_path):126 cmd = ['git', '-C', repo_path, 'log', '--no-walk', '--tags', '--reverse', '--date=iso-local', '--format=%H,%ad,%D']127 return _run_process(cmd, check_out=True)128129130def checkout_commit(repo_path, commit_hash):131 cmd = ['git', '-C', repo_path, 'checkout', '-f', commit_hash]132 return _run_process(cmd) ...

Full Screen

Full Screen

tools.py

Source:tools.py Github

copy

Full Screen

1from os.path import join, dirname, abspath2from linguard.common.utils.system import Command, CommandResult3def get_tools_folder():4 return join(dirname(dirname(abspath(__file__))), "tools")5def get_tool_path(name: str):6 return join(get_tools_folder(), name)7def run_tool(name: str, as_root: bool = False) -> CommandResult:8 return Command(get_tool_path(name)).run(as_root)9def run_tool_as_root(name: str) -> CommandResult:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run lisa automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful