Best Python code snippet using autotest_python
resources.py
Source:resources.py  
# -*- coding: utf-8 -*-
"""
(c) 2012-2022 Martin Wendt; see https://github.com/mar10/pyftpsync
Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""
import os
from datetime import datetime
from posixpath import join as join_url
from posixpath import normpath as normpath_url
from posixpath import relpath as relpath_url

from ftpsync.util import DEBUG_FLAGS, eps_compare, write

#: Classifications that `_Resource.classify()` may assign to a single entry.
ENTRY_CLASSIFICATIONS = frozenset(
    ["existing", "unmodified", "modified", "new", "deleted"]
)

# PAIR_CLASSIFICATIONS = frozenset([
#     "conflict", "equal", "other"
#     ])

#: Operations that `EntryPair.classify()` / `override_operation()` accept.
PAIR_OPERATIONS = frozenset(
    [
        "conflict",
        "copy_local",
        "copy_remote",
        "delete_local",
        "delete_remote",
        "equal",
        "need_compare",
    ]
)

#: Maps a (local, remote) classification pair to the resulting operation.
operation_map = {
    # (local, remote) => operation
    ("missing", "missing"): None,  # Not allowed
    ("missing", "new"): "copy_remote",
    ("missing", "unmodified"): "copy_remote",
    ("missing", "modified"): "copy_remote",
    ("missing", "deleted"): True,  # Nothing to do (only update metadata)
    ("new", "missing"): "copy_local",
    ("new", "new"): "need_compare",
    ("new", "unmodified"): "need_compare",
    ("new", "modified"): "need_compare",
    ("new", "deleted"): "conflict",
    ("unmodified", "missing"): "copy_local",
    ("unmodified", "new"): "need_compare",
    ("unmodified", "unmodified"): "equal",
    ("unmodified", "modified"): "copy_remote",
    ("unmodified", "deleted"): "delete_local",
    ("modified", "missing"): "copy_local",
    ("modified", "new"): "need_compare",
    ("modified", "unmodified"): "copy_local",
    ("modified", "modified"): "conflict",
    ("modified", "deleted"): "conflict",
    ("deleted", "missing"): True,  # Nothing to do (only update metadata)
    ("deleted", "new"): "conflict",
    ("deleted", "unmodified"): "delete_remote",
    ("deleted", "modified"): "conflict",
    ("deleted", "deleted"): True,  # Nothing to do (only update metadata)
    # No meta data available: treat as 'unmodified' in general:
    ("existing", "missing"): "copy_local",
    ("missing", "existing"): "copy_remote",
    ("existing", "existing"): "need_compare",
}


# ===============================================================================
# EntryPair
# ===============================================================================
class EntryPair:
    """A local entry and a remote entry that share the same name and path.

    Either side may be missing (None), but not both. After `classify()` was
    called, `operation` holds the operation derived from `operation_map`.
    """

    def __init__(self, local, remote):
        """Store both entries and cache name, rel_path, and is_dir.

        Args:
            local: local entry or None
            remote: remote entry or None
        """
        self.local = local
        self.remote = remote
        any_entry = local or remote
        assert any_entry
        if local and remote:
            # Both sides must describe the same resource
            assert local.name == remote.name
            assert local.get_rel_path() == remote.get_rel_path()
            assert local.is_dir() == remote.is_dir()
        #: str:
        self.name = any_entry.name
        #: str:
        self.rel_path = any_entry.get_rel_path()
        #: bool:
        self.is_dir = any_entry.is_dir()
        #: str:
        self.local_classification = None
        #: str:
        self.remote_classification = None
        #: str:
        self.operation = None
        #: str:
        self.re_class_reason = None
        # #: bool:
        # self.was_skipped = None

    def __str__(self):
        s = "<EntryPair({})>: ({}, {}) => {}".format(
            "[{}]".format(self.rel_path) if self.is_dir else self.rel_path,
            self.local_classification,
            self.remote_classification,
            self.operation,
        )
        return s

    @property
    def any_entry(self):
        """Return the local entry (or the remote entry if it is None)."""
        return self.local or self.remote

    def is_conflict(self):
        """Return True if the pair was classified as 'conflict'.

        Must only be called after an operation was assigned.
        """
        assert self.operation
        return self.operation == "conflict"

    def is_same_time(self):
        """Return True if local.mtime == remote.mtime.

        Returns False if one of the entries is missing. The result is always
        a bool (never None or an entry object).
        """
        return bool(
            self.local
            and self.remote
            and FileEntry._eps_compare(self.local.mtime, self.remote.mtime) == 0
        )

    def override_operation(self, operation, reason):
        """Re-Classify entry pair.

        Args:
            operation (str): new operation; must be one of PAIR_OPERATIONS and
                differ from the current one
            reason (str): stored in `re_class_reason`
        """
        # prev_class = (self.local_classification, self.remote_classification)
        prev_op = self.operation
        assert operation != prev_op
        assert operation in PAIR_OPERATIONS
        if "classify" in DEBUG_FLAGS:
            write(
                "override_operation {} -> {} (reason: '{}')".format(
                    self, operation, reason
                ),
                debug=True,
            )
        self.operation = operation
        self.re_class_reason = reason

    def classify(self, peer_dir_meta):
        """Classify entry pair.

        Classifies both sides, then looks up the resulting operation in
        `operation_map`.

        Args:
            peer_dir_meta (dict): metadata of the directory, keyed by entry
                name, or a falsy value if no metadata is available
        Returns:
            str: the operation that was assigned to `self.operation`
        Raises:
            RuntimeError: if `operation_map` yields no operation for the pair
        """
        assert self.operation is None

        # Note: We pass False if the entry is not listed in the metadata.
        #       We pass None if we don't have metadata at all.
        peer_entry_meta = peer_dir_meta.get(self.name, False) if peer_dir_meta else None

        if self.local:
            self.local.classify(peer_dir_meta)
            self.local_classification = self.local.classification
        elif peer_entry_meta:
            # Not present, but listed in the metadata
            self.local_classification = "deleted"
        else:
            self.local_classification = "missing"

        if self.remote:
            self.remote.classify(peer_dir_meta)
            self.remote_classification = self.remote.classification
        elif peer_entry_meta:
            # Not present, but listed in the metadata
            self.remote_classification = "deleted"
        else:
            self.remote_classification = "missing"

        c_pair = (self.local_classification, self.remote_classification)
        self.operation = operation_map.get(c_pair)
        if not self.operation:
            raise RuntimeError(
                "Undefined operation for pair classification {}".format(c_pair)
            )

        if "classify" in DEBUG_FLAGS:
            write(
                "Classified pair {}, meta={}".format(self, peer_entry_meta),
                debug=True,
            )
        # if not entry.meta:
        # assert self.classification in PAIR_CLASSIFICATIONS
        assert self.operation in PAIR_OPERATIONS
        return self.operation


# ===============================================================================
# _Resource
# ===============================================================================
class _Resource:
    """Common base class for files and directories."""

    def __init__(self, target, rel_path, name, size, mtime, unique):
        """
        Args:
            target:
            rel_path (str):
            name (str): base name
            size (int): file size in bytes
            mtime (float): modification time as UTC stamp
            unique (str): unique id of the file/directory
        """
        #: :class:`_Target`: Parent target object.
        self.target = target
        #: str: Path relative to :attr:`target`
        self.rel_path = rel_path
        #: str: File name.
        self.name = name
        #: int: Current file size
        self.size = size
        #: float: Current file modification time stamp
        #: (for FTP targets adjusted using metadata information).
        self.mtime = mtime
        # #: datetime: Converted version of :attr:`mtime`.
        # self.dt_modified = datetime.fromtimestamp(self.mtime)
        #: float: Modification time stamp (as reported by source FTP server).
        self.mtime_org = mtime
        # #: datetime: Converted version of :attr:`mtime_org`.
        # self.dt_modified_org = self.mtime_org
        #: str: Unique id of file/directory.
        self.unique = unique
        # #: dict: Additional metadata (set by target.get_dir()).
        # self.meta = None
        #: int: File size at the time of last sync operation
        self.ps_size = None
        #: float: File modification time stamp at the time of last sync operation
        self.ps_mtime = None
        #: float: Time stamp of last sync operation
        self.ps_utime = None
        #: str: (set by synchronizer._classify_entry()).
        self.classification = None
        #: bool: May be set to true by synchronizer
        self.was_deleted = None

    def __str__(self):
        dt_modified = datetime.fromtimestamp(self.mtime)
        path = os.path.join(self.rel_path, self.name)
        if self.is_dir():
            res = "{}([{}])".format(self.__class__.__name__, path)
        else:
            res = "{}('{}', size:{}, modified:{})".format(
                self.__class__.__name__,
                path,
                "{:,}".format(self.size) if self.size is not None else self.size,
                dt_modified,
            )
            # + " ## %s, %s" % (self.mtime, time.asctime(time.gmtime(self.mtime)))
        if self.classification:
            res += " => {}".format(self.classification)
        return res

    def as_string(self, other_resource=None):
        """Return a 'date, size' string, optionally compared to another resource.

        Args:
            other_resource: if passed, a suffix like '(older, smaller)' is
                appended that relates this resource's mtime/size to it
        """
        # dt = datetime.fromtimestamp(self.get_adjusted_mtime())
        dt = datetime.fromtimestamp(self.mtime)
        res = "{}, {:>8,} bytes".format(dt.strftime("%Y-%m-%d %H:%M:%S"), self.size)
        if other_resource:
            comp = []
            if self.mtime < other_resource.mtime:
                comp.append("older")
            elif self.mtime > other_resource.mtime:
                comp.append("newer")
            if self.size < other_resource.size:
                comp.append("smaller")
            elif self.size > other_resource.size:
                comp.append("larger")
            if comp:
                res += " ({})".format(", ".join(comp))
        return res

    def __eq__(self, other):
        raise NotImplementedError

    def get_rel_path(self):
        """Return the normalized path of this entry, relative to the target root."""
        path = relpath_url(self.target.cur_dir, self.target.root_dir)
        return normpath_url(join_url(path, self.name))

    def is_file(self):
        return False

    def is_dir(self):
        return False

    def is_local(self):
        return self.target.is_local()

    def get_sync_info(self, key=None):
        return None

    def set_sync_info(self, local_file):
        raise NotImplementedError

    def classify(self, peer_dir_meta):
        """Classify this entry as 'new', 'unmodified', or 'modified'.

        If no metadata is available, directories are classified as
        'unmodified' and files as 'existing'.

        Args:
            peer_dir_meta (dict): metadata of the directory, keyed by entry
                name, or a falsy value if no metadata is available
        Returns:
            str: the assigned classification
        """
        assert self.classification is None, "{}, {}".format(self, peer_dir_meta)
        peer_entry_meta = None
        if peer_dir_meta:
            # Metadata is generally available, so we can detect 'new' or 'modified'
            peer_entry_meta = peer_dir_meta.get(self.name, False)

            if self.is_dir():
                # Directories are considered 'unmodified' (would require deep
                # traversal to check otherwise)
                if peer_entry_meta:
                    self.classification = "unmodified"
                else:
                    self.classification = "new"
            elif peer_entry_meta:
                # File entries can be classified as modified/unmodified
                self.ps_size = peer_entry_meta.get("s")
                self.ps_mtime = peer_entry_meta.get("m")
                self.ps_utime = peer_entry_meta.get("u")
                if (
                    self.size == self.ps_size
                    and FileEntry._eps_compare(self.mtime, self.ps_mtime) == 0
                ):
                    self.classification = "unmodified"
                else:
                    self.classification = "modified"
            else:
                # A new file entry
                self.classification = "new"
        else:
            # No metadata available:
            if self.is_dir():
                # Directories are considered 'unmodified' (would require deep
                # traversal to check otherwise)
                self.classification = "unmodified"
            else:
                # That's all we know, but EntryPair.classify() may adjust this
                self.classification = "existing"

        if "classify" in DEBUG_FLAGS:
            write("Classified {}, meta={}".format(self, peer_entry_meta), debug=True)
        assert self.classification in ENTRY_CLASSIFICATIONS
        return self.classification


# ===============================================================================
# FileEntry
# ===============================================================================
class FileEntry(_Resource):
    # 2 seconds difference is considered equal.
    # mtime stamp resolution depends on filesystem: FAT32: 2 seconds, NTFS: ms, OSX: 1 sec.
    EPS_TIME = 2.01
    #     EPS_TIME = 0.1

    def __init__(self, target, rel_path, name, size, mtime, unique):
        super().__init__(target, rel_path, name, size, mtime, unique)

    @staticmethod
    def _eps_compare(date_1, date_2):
        """Compare two time stamps using `eps_compare()` with EPS_TIME as tolerance."""
        return eps_compare(date_1, date_2, FileEntry.EPS_TIME)

    def is_file(self):
        return True

    def __eq__(self, other):
        """Return True if `other` is the same class with equal name, size, and mtime.

        Comparing with None or an object of another class returns False.
        """
        # Check the type first: `other` may be None or lack an `mtime` attribute.
        if other is None or other.__class__ != self.__class__:
            return False
        return (
            other.name == self.name
            and other.size == self.size
            and self._eps_compare(self.mtime, other.mtime) == 0
        )

    def __gt__(self, other):
        """Return True if `other` is the same class and name, and self is newer.

        Comparing with None or an object of another class returns False.
        """
        # Check the type first: `other` may be None or lack an `mtime` attribute.
        if other is None or other.__class__ != self.__class__:
            return False
        return (
            other.name == self.name
            and self._eps_compare(self.mtime, other.mtime) > 0
        )

    def get_sync_info(self, key=None):
        """Get mtime/size when this resource was last synchronized with remote."""
        return self.target.get_sync_info(self.name, key)

    def was_modified_since_last_sync(self):
        """Return True if this resource was modified since last sync.

        None is returned if we don't know (because of missing meta data).
        """
        info = self.get_sync_info()
        if not info:
            return None
        if self.size != info["s"]:
            return True
        if self.mtime > info["m"]:
            return True
        return False


# ===============================================================================
# DirectoryEntry
# ===============================================================================
class DirectoryEntry(_Resource):
    def __init__(self, target, rel_path, name, size, mtime, unique):
        super().__init__(target, rel_path, name, size, mtime, unique)
        # Directories don't have a size (that we could reasonably use for classification)
        self.size = 0

    def is_dir(self):
        ...  # NOTE(review): the excerpt is truncated here; the original body is not visible


# google.py
Source:google.py  
# ... (the excerpt starts at line 9 of google.py; `os`, `subprocess`, and
# `util` are used below but their imports are not visible here)
# Deployment - Interface
#
config_files = ["update_script.sh"]
machines_in_cluster = ['google_compute']


def copy_remote():
    """
    Copies a script needed on the server for executing the update of the repo!

    Every file in `config_files` is copied with `scp` to every machine in
    `machines_in_cluster`. The exit status of `scp` is not checked; only a
    failure to launch the command is reported.
    """
    source = os.path.join(util.OWN_PATH, 'cloud_specific_files', 'google', 'compute')
    for machine in machines_in_cluster:
        for file_0 in config_files:
            # scp update_script.sh google_compute:update_script.sh
            command = ["scp", os.path.join(source, file_0), machine + ":" + file_0]
            try:
                subprocess.call(command)
            except Exception:
                # Best effort: report and continue, but let
                # KeyboardInterrupt / SystemExit propagate.
                print("> > > Copy of Deploy_script failed...")


def apply_remote():
    """
    Get repository at remote and commits our changes to the repo.

    For every machine, the remote repository is cloned to `repo_google`, its
    `.git` folder replaces the one of our `repo` copy, everything is committed
    and pushed, and both working copies are removed afterwards.
    The working directory that was current on entry is always restored.
    The exit status of the individual commands is not checked.
    """
    source = os.path.join(util.OWN_PATH, 'cloud_specific_files', 'google', 'compute')
    remote_repo = "/home/sys/environments/amos/repository"

    # Save old path
    old_path = os.getcwd()

    try:
        for machine in machines_in_cluster:
            try:
                os.chdir(source)

                # pull remote repository
                command = ["git", "clone", machine + ":" + remote_repo, os.path.join(source, "repo_google")]
                subprocess.call(command)

                os.chdir(os.path.join(source, "repo"))

                # clean our copy of repository
                command = ["rm", "-r", ".git"]
                subprocess.call(command)

                # copy configfiles of google_remote to our repo
                command = ["cp", "-f", "-r", os.path.join(source, "repo_google", ".git"), os.path.join(source, "repo", ".git")]
                subprocess.call(command)

                # add everything, I mean really everything in this repo to our next commit
                command = ["git", "add", "*"]
                subprocess.call(command)

                # Commit all changes
                command = ["git", "commit", "-a", "-m\"I am log.\""]
                subprocess.call(command)

                command = ["git", "push"]
                subprocess.call(command)

                os.chdir(source)

                # remove garbage repos
                command = ["rm", "-r", "repo"]
                subprocess.call(command)

                command = ["rm", "-r", "repo_google"]
                subprocess.call(command)
            except Exception:
                # Best effort: report and continue with the next machine, but
                # let KeyboardInterrupt / SystemExit propagate.
                print("> > > Deploy_script failed at " + machine + "...")
    finally:
        # Restore the caller's working directory even if we are interrupted
        os.chdir(old_path)


def deploy_remote():
    """
    Executes Remote script, which we copied via copy_remote

    Runs `sudo bash <config_files[0]>` over `ssh` on every machine in
    `machines_in_cluster`. The exit status is not checked.
    """
    print('\n> Deploying on systems...')
    for machine in machines_in_cluster:
        # ssh google_compute "sudo bash update_script.sh branch"
        command = ["ssh", machine, "sudo bash " + config_files[0]]
        try:
            subprocess.call(command)
        except Exception:
            # Best effort: report and continue, but let
            # KeyboardInterrupt / SystemExit propagate.
            print("> > > Deploy of Deploy_script failed...")


# Implement this function
def deploy():
    """Run the complete deployment: apply, copy, and execute on the remotes."""
    util.copy_repo_to_specific('google/compute')

    print("Pulling Remote and add changes...")
    apply_remote()

    print("Copy scripts to remote ...")
    copy_remote()

    print("Deploy changes...")
    deploy_remote()


# Implement this function
def all_requirements_available():
    available = True
    OWN_FOLDER = os.path.join(util.OWN_PATH, 'cloud_specific_files', 'google','compute')
    for k in config_files :
        if not os.path.exists(os.path.join(OWN_FOLDER, k)):
            available = False
    # ... NOTE(review): the excerpt is truncated here; the rest of this
    # function (including its return statement) is not visible.


# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
