Best Python code snippet using lisa_python
__init__.py
Source:__init__.py  
...41        "A hacky way of finding the metadata schema path"42        parent = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))43        path = os.path.join(parent, "media_probe_schema.json")44        return path45    def get_tool_path(self, tool):46        "Get the executable path for the tool in question"47        if tool not in self.paths or not os.access(self.paths[tool], os.X_OK):48            # try to find the tool and update the path49            p = shutil.which(tool)50            if p is None:51                raise FileNotFoundError(f"Cannot find executable for {tool}")52            self.paths[tool] = p53        return self.paths[tool]54    def probe(self, file):55        "Run all of the probes on a file"56        data = {'container': {'name': os.path.basename(file),57                            'size': os.path.getsize(file),58                            'time': os.path.getmtime(file)59                        },60                'streams': {},61                'tags': {}62                }63        if os.path.isfile(file):64            data['container']['type'] = 'file'65        elif os.path.isdir(file):66            data['container']['type'] = 'dir'67        else:68            data['container']['type'] = 'other'69        mime = self.get_mime_type(file)70        data['container']['mime_type'] = mime71        72        # run the probes73        self.probe_time_based_media(file, data)74        self.probe_image_media(file, data)75        self.probe_text(file, data)76        self.probe_document(file, data)77        return data78    def get_mime_type(self, file):79        """80        Use the file command to get the mime type for a file81        """    82        result = subprocess.run([self.get_tool_path('file'), '--brief', '--mime-type', '--dereference', file], stdout=subprocess.PIPE, check=True)83        mime = str(result.stdout, 'utf-8').rstrip()84        # There are some weird corner cases we'd like to pick up...85        # * pick up yaml files, since they really just look like plain text.                86        if file.endswith(".yaml"):87            mime = "application/x-yaml"88        # * It appears that MPEG program streams look like targa image files,89        # we we can detect them by looking a little harder... (plus, nobody uses targa)90        if mime == 'image/x-tga':91            result = subprocess.run([self.get_tool_path('file'), '-k', '--dereference', file], stdout=subprocess.PIPE, check=True)92            extended = str(result.stdout, 'utf-8').rstrip()93            if 'MPEG' in extended:94                mime = 'video/mpeg'95        return mime96    def probe_time_based_media(self, file, data):97        """98        Use ffprobe to get information about a file and update the data99        structure with that information.100        """101        if (not re.match(r"(audio|video)/", data['container']['mime_type'])) and not file.endswith(".mkv"):102            return103        result = subprocess.run([self.get_tool_path('ffprobe'), '-v', '0', '-print_format', 'json', '-show_format', '-show_streams', file],104                                check=True, stdout=subprocess.PIPE)105        f = yaml.load(str(result.stdout, 'utf-8'), Loader=yaml.Loader)106        107        data['container']['duration'] = float(f['format']['duration'])108        data['container']['format'] = f['format']['format_name']109        if 'tags' in f['format']:110            data['container']['tags'] = {}111            for t in f['format']['tags']:112                data['container']['tags'][t] = f['format']['tags'][t]113        counter = 0114        data['streams'] = {}115        for stream in f['streams']:116            s = {}117            if stream['codec_type'] == 'audio':118                s['@type'] = 'audio'119                if 'codec_name' in  stream:120                    s['codec'] = stream['codec_name']121                elif 'codec_tag_string' in stream:122                    s['codec'] = stream['codec_tag_string']123                if 'duration' in stream:124                    s['duration'] = float(stream['duration'])125                elif 'tags' in stream and 'DURATION' in stream['tags']:126                    # matroska stores the duration in the tags in hh:mm:ss.sss format127                    s['duration'] = MediaProbe.hhmmss2secs(stream['tags']['DURATION'])128                else:129                    s['duration'] = data['container']['duration']130                s['sample_format'] = stream['sample_fmt']131                s['channels'] = stream['channels']132                if 'channel_layout' in stream:133                    s['channel_layout'] = stream['channel_layout']134                else:135                    # make some educated guesses136                    if s['channels'] == 1:137                        s['channel_layout'] = 'mono'138                    elif s['channels'] == 2:139                        s['channel_layout'] = 'stereo'140                s['sample_rate'] = int(stream['sample_rate'])141                if 'bits_per_sample' in stream:142                    s['bits_per_sample'] = int(stream['bits_per_sample'])143                elif 'bits_per_raw_sample' in stream:144                    s['bits_per_sample'] = int(stream['bits_per_raw_sample'])145                if s['bits_per_sample'] == 0:146                    s.pop('bits_per_sample')147                if 'bit_rate' in stream:148                    s['bit_rate'] = int(stream['bit_rate'])149                if 'tags' in stream:150                    s['user_data'] = {}151                    for t in stream['tags']:152                        s['user_data'][t] = stream['tags'][t]153            elif stream['codec_type'] == 'video':154                s['@type'] = 'video'155                if 'codec_name' in  stream:156                    s['codec'] = stream['codec_name']157                elif 'codec_tag_string' in stream:158                    s['codec'] = stream['codec_tag_string']159                # duration160                if 'duration' in stream:161                    s['duration'] = float(stream['duration'])162                elif 'tags' in stream and 'DURATION' in stream['tags']:163                    # matroska stores the duration in the tags in hh:mm:ss.sss format164                    s['duration'] = MediaProbe.hhmmss2secs(stream['tags']['DURATION'])165                else:166                    s['duration'] = data['container']['duration']167                168                # get video dimensions169                dims = {'width': stream['width'],170                        'height': stream['height']}171                if stream['width'] > 0 and stream['height'] > 0:172                    if 'sample_aspect_ratio' not in stream:173                        # without any information, we'll assume 1:1174                        stream['sample_aspect_ratio'] = '1:1'175                    dims['sample_aspect_ratio'] = MediaProbe.ratio2fraction(stream['sample_aspect_ratio'])176                    177                    if 'display_aspect_ratio' not in stream:178                        # without any information, the ratio is the sample aspect ratio and179                        # the ratio of width and height...using some math.180                        sar = fractions.Fraction(dims['sample_aspect_ratio'][0],181                                                dims['sample_aspect_ratio'][1])182                        par = fractions.Fraction(stream['width'], stream['height'])183                        stream['display_aspect_ratio'] = sar * par184                    dims['display_aspect_ratio'] = MediaProbe.ratio2fraction(stream['display_aspect_ratio'])185                s['dimensions'] = dims186                if 'pix_fmt' in stream:187                    s['pixel_format'] = stream['pix_fmt']188                if 'bit_rate' in stream:189                    s['bit_rate'] = int(stream['bit_rate'])190                if 'bits_per_sample' in stream:191                    s['bits_per_sample'] = int(stream['bits_per_sample'])192                elif 'bits_per_raw_sample' in stream:193                    s['bits_per_sample'] = int(stream['bits_per_raw_sample'])194                s['frame_rate'] = round(float(fractions.Fraction(stream['r_frame_rate'])), 2)195                if 'color_space' in stream:196                    s['color_space'] = stream['color_space']197                if 'color_transfer' in stream:198                    s['color_transfer'] = stream['color_transfer']199                if 'profile' in stream:200                    s['codec_profile'] = stream['profile']201                if 'tags' in stream:202                    s['user_data'] = {}203                    for t in stream['tags']:204                        s['user_data'][t] = stream['tags'][t]205            else:206                # for a data stream, we're just going to dump the tags...we don't207                # it could be a timecode or subtitle stream, or something else.208                s['@type'] = 'data'209                if 'tags' in stream:210                    s['user_data'] = {}211                    for t in stream['tags']:212                        s['user_data'][t] = stream['tags'][t]213            214            215            if s['@type'] not in data['streams']:216                data['streams'][s['@type']] = []217            s['@position'] = counter218            data['streams'][s['@type']].append(s)219            counter += 1 220    @staticmethod221    def hhmmss2secs(duration):222        "Convert a hh:mm:ss.sss string to seconds."223        parts = duration.split(":")224        return int(parts[0]) * 3600 + int(parts[1]) * 60 + float(parts[2])225    @staticmethod226    def ratio2fraction(ratio):227        if type(ratio) is str:228            if ':' in ratio:229                ratio = ratio.replace(':', "/")230        f = fractions.Fraction(ratio)231        return [f.numerator, f.denominator]232    def probe_image_media(self, file, data):233        """234        Use ImageMagick's identify to gather information about a file235        """236        if not re.match(r"image/", data['container']['mime_type']):237            return238        format_string = '''{239    "@type": "image",240    "dimensions": {241        "width": %[width],242        "height": %[height],243        "resolution": {244        "horizontal": %[resolution.x],245        "vertical": %[resolution.y],246        "unit": "%[units]"247        }248    },249    "pixel_type": "%[channels]",250    "codec": "%[magick]",251    "bit_depth": %[bit-depth],252    "compression": "%[compression]",253    "color_profile": "%[profile:icc]"254    },'''255        result = subprocess.run([self.get_tool_path('identify'), '-format', format_string, file], 256                                stdout=subprocess.PIPE, check=True)257        data['streams'] = {'image': []}258        counter = 0259        for image in yaml.load('[' + str(result.stdout, 'utf-8').rstrip().rstrip(',') + "]", Loader=yaml.Loader):260            i = image261            r = i['dimensions']['resolution']262            if r['unit'].startswith("PixelsPer"):263                r['unit'] = r['unit'][9:].lower()264            else:265                r.pop('unit')266            if r['horizontal'] == 0 or r['vertical'] == 0:267                i['dimensions'].pop('resolution')268            if i['color_profile'] == "":269                i.pop('color_profile')270            271            for k in ('codec', 'compression', 'pixel_type'):272                i[k] = i[k].lower()273            i['@position'] = counter274            data['streams']['image'].append(i)275            counter += 1276 277    def probe_text(self, file, data):278        """ 279        Probe a text file for more information.280        """281        282        if (not re.match(r"text/", data['container']['mime_type'])283            and not re.match(r"application/xml", data['container']['mime_type'])):284            return285        s = {'@type': 'text'}286        result = subprocess.run([self.get_tool_path('file'), '--brief', '--mime-encoding', '--dereference', file], stdout=subprocess.PIPE, check=True)287        s['encoding'] = str(result.stdout, 'utf-8').rstrip()288        result = subprocess.run([self.get_tool_path('file'), '--brief', '--dereference', file], stdout=subprocess.PIPE, check=True)289        s['description'] = str(result.stdout, 'utf-8').rstrip()290        291        # If this is XML, then let's do a little more probing...basically get the root292        # node, and perhaps a better encoding.  Don't actually parse the XML, just use293        # some regexes294        if data['container']['mime_type'].endswith("/xml"):295            with open(file, "r") as f:296                content = f.read()297                m = re.search(r"<\?xml[^>]+encoding=[\"\'](.+?)[\"\']", content)298                if m:299                    s['encoding'] = m.group(1)300                m = re.search(r"<([a-zA-Z_].+?)[\s>]", content)301                if m:302                    s['description'] = f"XML Document with '{m.group(1)}' root node"303        304        data['streams'] = {'text': []}305        s['@position'] = 0306        data['streams']['text'].append(s)307    def probe_document(self, file, data):308        """309        Handle office-type documents:  pdf, doc, docx, xls, xlsx, ppt, pptx310        """311        s = {'@type': 'document'}312        mime_type = data['container']['mime_type']313        if mime_type == "application/pdf":314            result = subprocess.run([self.get_tool_path('pdfinfo'), file], stdout=subprocess.PIPE, check=True)315            for l in str(result.stdout, 'utf-8').rstrip().split("\n"):316                #print(l)317                k, v = [x.strip() for x in l.split(":", 1)]318                #print(f"K={k}, V={v}")319                if k == "PDF version":320                    s['version'] = v321                elif k == "Pages":322                    s['pages'] = int(v)323                elif k == "Page size":324                    s['page_size'] = v325                elif k in ('Creator', 'Producer', 'Tagged'):326                    if v != "":327                        if 'user_data' not in s:328                            s['user_data'] = {}...shell_interface.py
Source:shell_interface.py  
...23    :param output_path: the path to send results.24    :type output_path: str25    :return: the response to the process being run26    """27    designite_config = config_interface.get_tool_path("designite")2829    cmd = [30        'java', f'-Xmx{designite_config["max_allocation"]}', '-jar', designite_config["path"],31        '-i', dir_to_analyze,32        '-o', output_path33    ]34    return _run_process(cmd)353637def run_ref_miner(repo_to_analyze, branch, output_file):38    """39    runs refactoring miner on a repo4041    :param repo_to_analyze: the repository to analyze42    :type repo_to_analyze: str43    :param branch: the branch to analyze44    :type branch: str45    :param output_file: the location of the output46    :type output_file: str47    :return: the response to the process being run48    """49    ref_miner_config = config_interface.get_tool_path("refactoring_miner")5051    cmd = [ref_miner_config["path"], '-a', repo_to_analyze, branch, '-json', output_file]52    return _run_process(cmd)535455def run_weka_filter(filter_method, filter_options, input_file, output_file):56    """57    Runs the weka filter on a file5859    :param filter_method: The filtering method60    :type filter_method: str61    :param filter_options: special options for the filter62    :type filter_options: str63    :param input_file: the file to run the filter on64    :type input_file: str65    :param output_file: the file containing the filtered results66    :type output_file: str67    :return: the response to the process being run68    """69    weka_config = config_interface.get_tool_path("weka")7071    cmd = [72        'java', '-classpath', f'"{weka_config["path"]}"', filter_method,73        '-i', f'"{input_file}"',74        '-o', f'"{output_file}"', filter_options75    ]76    return _run_process(cmd)777879def run_weka_classifier(classifier_method, classifier_options, input_path, model_output_path=None):80    """81    Runs the weka classifier on a file8283    :param classifier_method: The classifying method84    :type classifier_method: str85    :param classifier_options: special options for the classifier86    :type classifier_options: str87    :param input_path: the path of the file to be classified88    :type input_path: str89    :param model_output_path: the path for the output of the classification90    :type model_output_path: str91    :return: the response to the process being run92    """93    weka_config = config_interface.get_tool_path("weka")9495    model_arg = ''96    cmd = ['java', '-classpath', f'"{weka_config["path"]}"', classifier_method, '-t', f'"{input_path}"', '-o', '-v',97           classifier_options, model_arg]9899    if model_output_path is not None:100        cmd.extend(['-d', model_output_path])101        # model_arg = f'-d {model_output_path}'102103    return _run_process(cmd, check_out=True)104105106def load_and_run_weka_classifier(classifier_method, model_input_file, input_path):107    """108    Runs premade classifier on input file109110    :param classifier_method: the classifier to run111    :type classifier_method: str112    :param model_input_file: the model path113    :type model_input_file: str114    :param input_path: the file path115    :type input_path: str116    :return: the response to the process being run117    """118    weka_config = config_interface.get_tool_path("weka")119120    cmd = ['java', '-classpath', f'"{weka_config["path"]}"', classifier_method, '-l',121           f'"{model_input_file}"', '-T', f'"{input_path}"', '-p', '0']122    return _run_process(cmd, check_out=True)123124125def run_git_log_tags(repo_path):126    cmd = ['git', '-C', repo_path, 'log', '--no-walk', '--tags', '--reverse', '--date=iso-local', '--format=%H,%ad,%D']127    return _run_process(cmd, check_out=True)128129130def checkout_commit(repo_path, commit_hash):131    cmd = ['git', '-C', repo_path, 'checkout', '-f', commit_hash]132    return _run_process(cmd)
...tools.py
Source:tools.py  
1from os.path import join, dirname, abspath2from linguard.common.utils.system import Command, CommandResult3def get_tools_folder():4    return join(dirname(dirname(abspath(__file__))), "tools")5def get_tool_path(name: str):6    return join(get_tools_folder(), name)7def run_tool(name: str, as_root: bool = False) -> CommandResult:8    return Command(get_tool_path(name)).run(as_root)9def run_tool_as_root(name: str) -> CommandResult:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
