Best Python code snippet using localstack_python
fetch.py
Source:fetch.py  
1# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>2#3# This file is part of Ansible4#5# Ansible is free software: you can redistribute it and/or modify6# it under the terms of the GNU General Public License as published by7# the Free Software Foundation, either version 3 of the License, or8# (at your option) any later version.9#10# Ansible is distributed in the hope that it will be useful,11# but WITHOUT ANY WARRANTY; without even the implied warranty of12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the13# GNU General Public License for more details.14#15# You should have received a copy of the GNU General Public License16# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.17import os18import pwd19import random20import traceback21import tempfile22import base6423import ansible.constants as C24from ansible import utils25from ansible import errors26from ansible import module_common27from ansible.runner.return_data import ReturnData28class ActionModule(object):29    def __init__(self, runner):30        self.runner = runner31    def run(self, conn, tmp, module_name, module_args, inject, complex_args=None, **kwargs):32        ''' handler for fetch operations '''33        if self.runner.noop_on_check(inject):34            return ReturnData(conn=conn, comm_ok=True, result=dict(skipped=True, msg='check mode not (yet) supported for this module'))35        # load up options36        options = {}37        if complex_args:38            options.update(complex_args)39        options.update(utils.parse_kv(module_args))40        source = options.get('src', None)41        dest = options.get('dest', None)42        flat = options.get('flat', False)43        flat = utils.boolean(flat)44        fail_on_missing = options.get('fail_on_missing', False)45        fail_on_missing = utils.boolean(fail_on_missing)46        validate_checksum = options.get('validate_checksum', None)47        if validate_checksum is not None:48            validate_checksum = 
utils.boolean(validate_checksum)49        # Alias for validate_checksum (old way of specifying it)50        validate_md5 = options.get('validate_md5', None)51        if validate_md5 is not None:52            validate_md5 = utils.boolean(validate_md5)53        if validate_md5 is None and validate_checksum is None:54            # Default55            validate_checksum = True56        elif validate_checksum is None:57            validate_checksum = validate_md558        elif validate_md5 is not None and validate_checksum is not None:59            results = dict(failed=True, msg="validate_checksum and validate_md5 cannot both be specified")60            return ReturnData(conn, result=results)61        if source is None or dest is None:62            results = dict(failed=True, msg="src and dest are required")63            return ReturnData(conn=conn, result=results)64        source = conn.shell.join_path(source)65        source = self.runner._remote_expand_user(conn, source, tmp)66        # calculate checksum for the remote file67        remote_checksum = self.runner._remote_checksum(conn, tmp, source, inject)68        # use slurp if sudo and permissions are lacking69        remote_data = None70        if remote_checksum in ('1', '2') or self.runner.become:71            slurpres = self.runner._execute_module(conn, tmp, 'slurp', 'src=%s' % source, inject=inject)72            if slurpres.is_successful():73                if slurpres.result['encoding'] == 'base64':74                    remote_data = base64.b64decode(slurpres.result['content'])75                if remote_data is not None:76                    remote_checksum = utils.checksum_s(remote_data)77                # the source path may have been expanded on the78                # target system, so we compare it here and use the79                # expanded version if it's different80                remote_source = slurpres.result.get('source')81                if remote_source and remote_source != source:82          
          source = remote_source83        # calculate the destination name84        if os.path.sep not in conn.shell.join_path('a', ''):85            source_local = source.replace('\\', '/')86        else:87            source_local = source88        dest = os.path.expanduser(dest)89        if flat:90            if dest.endswith("/"):91                # if the path ends with "/", we'll use the source filename as the92                # destination filename93                base = os.path.basename(source_local)94                dest = os.path.join(dest, base)95            if not dest.startswith("/"):96                # if dest does not start with "/", we'll assume a relative path97                dest = utils.path_dwim(self.runner.basedir, dest)98        else:99            # files are saved in dest dir, with a subdir for each host, then the filename100            dest = "%s/%s/%s" % (utils.path_dwim(self.runner.basedir, dest), inject['inventory_hostname'], source_local)101        dest = dest.replace("//","/")102        if remote_checksum in ('0', '1', '2', '3', '4'):103            # these don't fail because you may want to transfer a log file that possibly MAY exist104            # but keep going to fetch other log files105            if remote_checksum == '0':106                result = dict(msg="unable to calculate the checksum of the remote file", file=source, changed=False)107            elif remote_checksum == '1':108                if fail_on_missing:109                    result = dict(failed=True, msg="the remote file does not exist", file=source)110                else:111                    result = dict(msg="the remote file does not exist, not transferring, ignored", file=source, changed=False)112            elif remote_checksum == '2':113                result = dict(msg="no read permission on remote file, not transferring, ignored", file=source, changed=False)114            elif remote_checksum == '3':115                result = dict(msg="remote file is a 
directory, fetch cannot work on directories", file=source, changed=False)116            elif remote_checksum == '4':117                result = dict(msg="python isn't present on the system.  Unable to compute checksum", file=source, changed=False)118            return ReturnData(conn=conn, result=result)119        # calculate checksum for the local file120        local_checksum = utils.checksum(dest)121        if remote_checksum != local_checksum:122            # create the containing directories, if needed123            if not os.path.isdir(os.path.dirname(dest)):124                os.makedirs(os.path.dirname(dest))125            # fetch the file and check for changes126            if remote_data is None:127                conn.fetch_file(source, dest)128            else:129                f = open(dest, 'w')130                f.write(remote_data)131                f.close()132            new_checksum = utils.secure_hash(dest)133            # For backwards compatibility.  We'll return None on FIPS enabled134            # systems135            try:136                new_md5 = utils.md5(dest)137            except ValueError:138                new_md5 = None139            if validate_checksum and new_checksum != remote_checksum:140                result = dict(failed=True, md5sum=new_md5, msg="checksum mismatch", file=source, dest=dest, remote_md5sum=None, checksum=new_checksum, remote_checksum=remote_checksum)141                return ReturnData(conn=conn, result=result)142            result = dict(changed=True, md5sum=new_md5, dest=dest, remote_md5sum=None, checksum=new_checksum, remote_checksum=remote_checksum)143            return ReturnData(conn=conn, result=result)144        else:145            # For backwards compatibility.  
We'll return None on FIPS enabled146            # systems147            try:148                local_md5 = utils.md5(dest)149            except ValueError:150                local_md5 = None151            result = dict(changed=False, md5sum=local_md5, file=source, dest=dest, checksum=local_checksum)...framevalidator.py
Source:framevalidator.py  
...33        if not validate_checksum:        34            logging.debug("Checksum validation is off; IMU data is not validated")35        else:36            logging.debug("Checksum validation is on; IMU data is validated")37    def set_validate_checksum(self, validate_checksum):38        """39        Small helper method to switch checksum validation on/off40        41        Arguments42        ---------43        validate_checksum -- boolean argument to set if checksum validation occurs44        """45        if not isinstance(validate_checksum, bool):46            raise ValueError("validate_checksum flag must be a boolean")47        self.validate_checksum = validate_checksum48        logging.debug("Checksum validation is now set to:{}".format(str(validate_checksum)))49    def validate(self, frame, chunk_id, byte_loc, run_id):50        """51        Method to validate is a provided frame from the camera stream is a...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
