How to use _get_new_files method in autotest

Best Python code snippet using autotest_python Github


Full Screen

...12 Abstract Base Class for the source classes.13 """14 def __init__(self, database):15 self.database = database16 def _get_new_files(self, files):17 """18 Return a copy of "files" after filtering out known old files19 from "files".20 """21 old_files = self.database.get_dictionary()22 return dict(filter(lambda x: x[0] not in old_files, files.iteritems()))23 def get_new_files(self):24 raise NotImplementedError('get_new_files not implemented')25 def store_files(self, files):26 self.database.merge_dictionary(files)27class rsync_source(source):28 _cmd_template = '/usr/bin/rsync -rltz --no-motd %s %s/%s'29 def __init__(self, database, prefix, excludes=[]):30 super(rsync_source, self).__init__(database)31 self.prefix = prefix32 self.exclude = ' '.join(['--exclude "' + x + '"' for x in excludes])33 self.sources = []34 def _parse_output(self, output, prefix):35 """36 Parse rsync's "ls -l" style output and return a dictionary of37 database.item indexed by the "name" field.38 """39 regex = re.compile(40 '-[rwx-]{9} +(\d+) (\d{4}/\d\d/\d\d \d\d:\d\d:\d\d) (.*)')41 res = {}42 for line in output.splitlines():43 match = regex.match(line)44 if match:45 groups = match.groups()46 timestamp = time.mktime(time.strptime(groups[1],47 '%Y/%m/%d %H:%M:%S'))48 if prefix:49 fname = '%s/%s' % (prefix, groups[2])50 else:51 fname = groups[2]52 item = database.item(fname, int(groups[0]), int(timestamp))53 res[] = item54 return res55 def add_path(self, src, prefix=''):56 """57 Add paths to synchronize from the source.58 """59 self.sources.append((src, prefix))60 def get_new_files(self):61 """62 Implement source.get_new_files by using rsync listing feature.63 """64 files = {}65 for src, prefix in self.sources:66 output = utils.system_output(self._cmd_template %67 (self.exclude, self.prefix, src))68 files.update(self._parse_output(output, prefix))69 return self._get_new_files(files)70class _ahref_parser(HTMLParser.HTMLParser):71 def reset(self, url=None, pattern=None):72 
HTMLParser.HTMLParser.reset(self)73 self.url = url74 self.pattern = pattern75 self.links = []76 def handle_starttag(self, tag, attrs):77 if tag == 'a':78 for name, value in attrs:79 if name == 'href':80 # compose absolute URL if relative "href" found81 url = urlparse.urljoin(self.url, value)82 if self.pattern.match(url):83 self.links.append(url)84 def get_ahref_list(self, url, pattern):85 self.reset(url, pattern)86 self.feed(urllib2.urlopen(url).read())87 self.close()88 return self.links89class url_source(source):90 """91 A simple URL based source that parses HTML to find references to92 kernel files.93 """94 _extension_pattern = re.compile(r'.*\.[^/.]+$')95 def __init__(self, database, prefix):96 super(url_source, self).__init__(database)97 self.prefix = prefix98 self.urls = []99 def add_url(self, url, pattern):100 """101 Add a URL path to a HTML document with links to kernel files.102 :param url: URL path to a HTML file with links to kernel files103 (can be either an absolute URL or one relative to self.prefix)104 :param pattern: regex pattern to filter kernel files links out of105 all othe links found in the HTML document106 """107 # if it does not have an extension then it's a directory and it needs108 # a trailing '/'. 
NOTE: there are some false positives such as109 # directories named "v2.6" where ".6" will be assumed to be extension.110 # In order for these to work the caller must provide a trailing /111 if url[-1:] != '/' and not self._extension_pattern.match(url):112 url = url + '/'113 self.urls.append((url, re.compile(pattern)))114 @staticmethod115 def _get_item(url):116 """117 Get a database.item object by fetching relevant HTTP information118 from the document pointed to by the given url.119 """120 try:121 info = urllib2.urlopen(url).info()122 except IOError, err:123 # file is referenced but does not exist124 print 'WARNING: %s' % err125 return None126 size = info.get('content-length')127 if size:128 size = int(size)129 else:130 size = -1131 timestamp = int(time.mktime(info.getdate('date')))132 if not timestamp:133 timestamp = 0134 return database.item(url, size, timestamp)135 def get_new_files(self):136 parser = _ahref_parser()137 files = {}138 for url, pattern in self.urls:139 links = parser.get_ahref_list(urlparse.urljoin(self.prefix, url),140 pattern)141 for link in links:142 item = self._get_item(link)143 if item:144 files[] = item145 return self._get_new_files(files)146class directory_source(source):147 """148 Source that finds kernel files by listing the contents of a directory.149 """150 def __init__(self, database, path):151 """152 Initialize a directory_source instance.153 :param database: Persistent database with known kernels information.154 :param path: Path to the directory with the kernel files found by155 this source.156 """157 super(directory_source, self).__init__(database)158 self._path = path159 def get_new_files(self, _stat_func=os.stat):160 """161 Main function, see source.get_new_files().162 :param _stat_func: Used for unit testing, if we stub os.stat in the163 unit test then unit test failures get reported confusingly164 because the unit test framework tries to stat() the unit test165 file.166 """167 all_files = {}168 for filename in 
os.listdir(self._path):169 full_filename = os.path.join(self._path, filename)170 try:171 stat_data = _stat_func(full_filename)172 except OSError:173 # File might have been removed/renamed since we listed the174 # directory so skip it.175 continue176 item = database.item(full_filename, stat_data.st_size,177 int(stat_data.st_mtime))178 all_files[filename] = item...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:


You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?