Best Python code snippet using autotest_python
dataPipeline.py
Source:dataPipeline.py  
1import json2import urllib3import urllib24import requests5import zipfile6import csv7import sys8import difflib9import string10import time11import re12import tmdbsimple as tmdb  # pip install tmdbsimple13import rtsimple as rt  # pip install rtsimple14from pymongo import MongoClient15from functools import partial16# Set some constants17omdbFile     = "omdb/omdbMovies.txt"18rawOmdbFile  = "raw_omdb.json"19rawTmdbFile  = "raw_tmdb.json"20rawRtFile    = "raw_rt.json"21movieFile    = "movie_data.json"22meteorPath   = "../meteorApp/private/movie_data.json"23uri          = 'mongodb://localhost/appdb'24tmdb.API_KEY = '2a50430216cd531275198faae3531bb4'25tmdbImages = tmdb.Configuration().info()['images']26rt.API_KEY = 'dbbykgj6pbm3wnfrnvhdbdym'27# readcsv function28def readcsv(path):29    f = open(path, "rb")30    rawdata = csv.reader(f, delimiter="\t")31    data = []32    keys = rawdata.next()33    for row in rawdata:34        item = dict()35        for i in range(len(keys)):36            if len(row) > i:37                item[keys[i]] = row[i].decode("utf-8", "ignore").strip()38            else:39                item[keys[i]] = None40        data.append(item)41    return dat42def loadFromTmdb(filePath, hardReset, omdbMovies):43    44    if not hardReset:45        tmdbMovies = json.load(open(filePath, 'r'))46    else:47        tmdbMovies = dict()48        49    50    i = 051    for m in omdbMovies:52        i += 153   54        imdbID = m["ImdbID"]55        if (imdbID in tmdbMovies): continue56        print("tmdb" + str(i))57        sys.stdout.flush()58        data = keywords = videos = None59        try:60            data = tmdb.Movies("tt" + imdbID).info()61            keywords = tmdb.Movies(data['id']).keywords()['keywords']62            videos = tmdb.Movies(data['id']).videos()['results']63            # titles = tmdb.Movies(data['id']).alternative_titles()64            # cast = tmdb.Movies(fromTmdb['id']).credits()['cast']65            # crew = 
tmdb.Movies(fromTmdb['id']).credits()['crew']66            # images = tmdb.Movies(fromTmdb['id']).images()67        except requests.HTTPError as e:68            if e.message == '404 Client Error: Not Found':69                tmdbMovies[imdbID] = False70                continue71            else:72                print('HTTP Error')73                return74        tmdbMovies[imdbID] = dict({75            "Adult":      data["adult"] or None,76            "Backdrop":   tmdbImages['base_url'] + 'original' + str(data['backdrop_path']) if data["backdrop_path"] else None,77            "Budget":     data["budget"] or None,78            "Genres":     [g["name"] for g in data["genres"]],79            "Homepage":   data["homepage"] or None,80            "Languages":  [g["name"] for g in data["spoken_languages"]],81            "Tagline":    data["tagline"] or None,82            "Title":      data["title"] or None,83            "Poster":     tmdbImages['base_url'] + 'w185' + data["poster_path"] if data["poster_path"] else None,84                          # can use 'original', u'w92', 'w154', 'w185', 'w342', 'w500', 'w780'85            "Studios":    [g["name"] for g in data["production_companies"]],86            "Overview":   data["overview"] or None,87            "Popularity": data["popularity"] or None,88            "Released":   data["release_date"] or None,89            "Revenue":    data["revenue"] or None,90            "Runtime":    data["runtime"] or None,91            "TmdbRating": data["vote_average"] or None,92            "TmdbVotes":  data["vote_count"] or None,93            "Keywords":   [k["name"] for k in keywords],94            "Videos":     [dict({ "key": v["key"], "type": v["type"], "site": v["site"] }) for v in videos]95        });96    return tmdbMovies97def saveTmdb(tmdbMovies, filePath):98    json.dump(tmdbMovies, open(filePath, 'w'), indent=4)99def loadFromRt(filePath, hardReset, omdbMovies):100    101    if not hardReset:102        rtMovies = 
json.load(open(filePath, 'r'))103    else:104        rtMovies = dict()105        106    i = 0107    errorCount = 0108    for m in omdbMovies:109        i += 1110        111        if errorCount >= 10:112            return rtMovies113        114        imdbID = m["ImdbID"]115        if (imdbID in rtMovies and rtMovies[imdbID] != False): continue116        print("Rotten Tomatoes " + str(i))117        sys.stdout.flush()118        119        try:120            data = rt.Alias().movie(type='imdb', id=imdbID)121        except requests.HTTPError as e:122            rtMovies[imdbID] = False123            print("HTTPError")124            errorCount = errorCount + 1125            continue126        rtID = data['id'];127        128        # videos = rt.Movies(rtID).clips()['clips']129        # reviews = rt.Movies(fromRt['id']).reviews()130        # cast = rt.Movies(fromRt['id']).cast()['cast']131        132        rtMovies[imdbID] = dict({133            "Cast":        [dict({ "name": c["name"], "character": c["characters"] if "characters" in c else None })134                            for c in data["abridged_cast"]],135            "Directors":   [g["name"] for g in data["abridged_directors"]] if "abridged_directors" in data else [],136            "Genres":      data["genres"] or [],137            "RtID":        data["id"],138            "Rating":      data["mpaa_rating"].replace("N/A", "") or None,139            "Consensus":   (data["critics_consensus"] or None) if "critics_consensus" in data else None,140            "Posters":     data["posters"],141            "LinkRt":      data['links']['alternate'] if 'alternate' in data['links'] else None,142            "RtAudience":  data["ratings"]["audience_score"] if data["ratings"]["audience_score"] > 0 else None,143            "RtCritics":   data["ratings"]["critics_score"] if data["ratings"]["critics_score"] > 0 else None,144            "ReleasedDVD": data["release_dates"]["dvd"] if "dvd" in data["release_dates"] else None,145   
         "Released":    data["release_dates"]["theater"] if "theater" in data["release_dates"] else None,146            "Runtime":     data["runtime"] or None,147            "Studio":      data["studio"] if "studio" in data else None,148            "Synopsis":    data["synopsis"] or None,149            "Title":       data["title"] or None,150            "Year":        data["year"] or None151        });152    rtMovies['0088199'] = False #automagic153    return rtMovies154def saveRt(fromRt, filePath):155    json.dump(fromRt, open(filePath, 'w'), indent=4)156def sanitize(fromOmdb, fromTmdb, fromRt):157    movies = []158    for m in fromOmdb:159        imdbID = m['ImdbID']160        if imdbID in fromTmdb and imdbID in fromRt and fromRt[imdbID] and fromTmdb[imdbID]:161            movies.append(formatMovie(imdbID, m, fromRt[imdbID], fromTmdb[imdbID]))162    return movies        163def saveMovies(movies, filePath, meteorPath):164    json.dump(movies, open(filePath, 'w'), indent=4)165    json.dump(movies, open(meteorPath, 'w'), indent=4)166    167def updateMovies(movies):168    #Don't run this, use Meteor instead!169    client = MongoClient(uri)170    171    for movie in movies:172        cursor = client.appdb.movies.find({"link_imdb": movie['link_imdb']})173        if cursor.count() == 0:174            movie['firstInserted'] = time.time()175            movie['lastUpdated'] = time.time()176            client.appdb.movies.insert(movie)177        else:178        	continue179            #movie['lastUpdated'] = time.time()180            #client.appdb.movies.replace_one({"link_imdb" : movie['link_imdb']}, movie)181        182def formatMovie(imdbID, fromOmdb, fromRt, fromTmdb):183    result = dict()184    result['title'] = fromTmdb['Title']185    result['plot'] = fromTmdb['Overview'] or fromOmdb['Plot'] or fromRt['Synopsis']186    result['keywords'] = fromTmdb['Keywords']187    result['runtime'] = fromTmdb['Runtime'] or fromRt['Runtime']188    result['budget'] = 
fromTmdb['Budget']189    result['revenue'] = fromTmdb['Revenue']190    result['tagline'] = fromTmdb['Tagline']191    result['year'] = fromOmdb['Year']192    result['released'] = fromTmdb['Released']193    result['languages'] = fromTmdb['Languages']194    result['country'] = fromOmdb['Country']195    fromTmdb['Studios'].append(None)196    result['studio'] = fromRt['Studio'] or fromTmdb['Studios'][0]  # TODO197    result['cast'] = fromOmdb['Cast'] or fromOmdb['Cast']198    result['directors'] = fromOmdb['Directors'] or fromOmdb['Directors']199    result['homepage'] = fromTmdb['Homepage']200    201    result['awards'] = fromOmdb['Awards']202    result['rating'] = fromRt['Rating'] or fromOmdb['Rating']203    result['score_rtaudience'] = fromRt['RtAudience']204    result['score_rtcritics'] = fromRt['RtCritics']205    result['score_metacritic'] = fromOmdb['Metacritic']206    result['score_imdb'] = fromOmdb['ImdbRating']207    result['votes_imdb'] = fromOmdb['ImdbVotes']208    result['score'] = (fromRt['RtCritics'] or 0) + (fromRt['RtAudience'] or 0) + (fromOmdb['ImdbRating'] or 0)*10209    result['poster'] = fromOmdb['Poster'] or fromTmdb['Poster']210    result['background'] = fromTmdb['Backdrop']211    result['link_rt'] = fromRt['LinkRt']212    result['link_imdb'] = 'tt' + imdbID213    # Genres214    genres = set(fromTmdb['Genres'])215    if 'Kids' in genres:216        genres.remove('Kids')217        genres.add('Family')218    if 'TV Movie' in genres:219        genres.remove('TV Movie')220    result['genres'] = list(genres)221    # Trailer222    trailers = [a for a in fromTmdb['Videos'] if a['type'] == 'Trailer']223    if len(trailers) and trailers[-1]['site'] == 'YouTube':224        result['trailer_youtube'] = trailers[-1]['key']225    else :226        result['trailer_youtube'] = None227    # Search String228    result['search_str'] = (result['title'] + ' ' + ' '.join(result['cast']) + ' '.join(result['directors']) + ' '.join(result['keywords'])).lower()229        230  
  # Streaming231    result['streaming'] = dict()232    233    return result234    235# We first get the latest data from OMDb236def loadData(hardReset, redownloadOmdb):237    fromOmdb = json.load(open(rawOmdbFile, 'r'))238    239    # OMDB240    print("Downloading the movies from OMDB...")241    if redownloadOmdb:242    	downloadOmdbData()243    print("OMDB movies downloaded and extracted to "+omdbFile+".")244    print("Loading and filtering the movies from OMDB...")245    fromOmdb = loadFromOmdb(omdbFile)246    print(len(fromOmdb), "Movies loaded from OMDB.")247    print("Saving the OMDB filtered movies to "+rawOmdbFile+" ...")248    saveOmdb(fromOmdb, rawOmdbFile)249    print("OMDB movies saved.")250    # Tmdb251    print("Loading TMDB movies, hardReset:"+str(hardReset)+" ...")252    fromTmdb = loadFromTmdb(rawTmdbFile, hardReset, fromOmdb)253    print(len(fromTmdb), " movies loaded from TMDB.")254    print("Saving the TMDB movies to "+rawTmdbFile+" ...")255    saveTmdb(fromTmdb, rawTmdbFile)256    print("TMDB Movies saved.")257    # Rt258    print("Loading Rotten Tomatoes movies, hardReset:"+str(hardReset)+" ...")259    fromRt = loadFromRt(rawRtFile, hardReset, fromOmdb)260    print(len(fromRt), " movies loaded from Rotten Tomatoes.")261    print("Saving the Rotten Tomatoes movies to "+rawRtFile+" ...")262    saveRt(fromRt, rawRtFile)263    print("Rotten Tomatoes movies saved.")264    265    # Netflix266    267    # Amazon268    # Saving and updating the movies269    print("Combining data from OMDB, TMDB, RT...")270    movies = sanitize(fromOmdb, fromTmdb, fromRt)271    print(len(movies)," movies combined.")272    print("Saving movies to "+movieFile+" and "+meteorPath+" ...")273    saveMovies(movies, movieFile, meteorPath)274    print("Movies saved.")275    print("Updating the mongo db with the latest information...")276    updateMovies(movies)277    print("Mongodb updated.")278################# Start of the main programm ##################279if __name__ == 
"__main__":280	parser = argparse.ArgumentParser(description='Refresh data omdb imdb rotten tomatoes')281	parser.add_argument('--uri', default='mongodb://localhost/appdb',282	                   help='The uri of the mongo connection')283	parser.add_argument('--hardReset', type=int, help='Refresh completely the files', default=0)284	args = parser.parse_args()285	hardReset = args.hardReset...ed.py
Source:ed.py  
...19                self.printInfo(_("You can't set negative limits for %s") % pqkey, "error")20        if reset :21            pqentry.reset()22        if hardreset :    23            pqentry.hardreset()24        if suffix == "User" :25            if used :26                pqentry.setUsage(used)27    28    def main(self, names, options) :29        """Edit user or group quotas."""30        names = self.sanitizeNames(options, names)31        suffix = (options["groups"] and "Group") or "User"        32        printernames = options["printer"].split(",")33            34        if not options["list"] :35            percent = Percent(self)36            percent.display("%s..." % _("Extracting datas"))37        printers = self.storage.getMatchingPrinters(options["printer"])...manage_repository_gitpull.py
Source:manage_repository_gitpull.py  
1from Products.zms import standard2import os3def manage_repository_gitpull(self, request=None):4	printed = []5	request = self.REQUEST6	RESPONSE =  request.RESPONSE7	btn = request.form.get('btn')8	came_from = request.get('came_from',request['HTTP_REFERER'])9	if came_from.find('?') > 0:10		came_from = came_from[:came_from.find('?')]11	base_path = self.get_conf_basepath()12	branch = self.getConfProperty('ZMSRepository.git.server.branch','master').replace('"','').replace(';','')13	hardreset_cmd = 'git reset --hard origin/%s'%(branch)14	printed.append('<!DOCTYPE html>')15	printed.append('<html lang="en">')16	printed.append(self.zmi_html_head(self,request))17	printed.append('<body class="repository_manager_main %s">'%(' '.join(['zmi',request['lang'],self.meta_id])))18	# printed.append(self.zmi_body_header(self,request,options=[{'action':'#','label':'%s...'%self.getZMILangStr('BTN_GITPULL')}]))19	printed.append(self.zmi_body_header(self,request,options=self.repository_manager.customize_manage_options()))20	printed.append('<div id="zmi-tab">')21	printed.append(self.zmi_breadcrumbs(self,request,extra=[self.manage_sub_options()[0]]))22	printed.append('<div class="card">')23	printed.append('<form class="form-horizontal" method="post" enctype="multipart/form-data">')24	printed.append('<input type="hidden" name="lang" value="%s"/>'%request['lang'])25	printed.append('<input type="hidden" name="came_from" value="%s"/>'%came_from)26	printed.append('<legend>%s, Current Branch %s</legend>'%(self.getZMILangStr('BTN_GITPULL'),branch))27	# --- PULL. 
+++IMPORTANT+++: Use SSH/cert and git credential manager28	# ---------------------------------29	if btn=='BTN_GITPULL':30		message = []31		### update from repository32		# userid = self.getConfProperty('ZMSRepository.git.server.userid')33		# password = self.getConfProperty('ZMSRepository.git.server.password') # TODO: decrypt34		# url = self.getConfProperty('ZMSRepository.git.server.url')35		if len([x for x in request['AUTHENTICATED_USER'].getRolesInContext(self) if x in ['Manager','ZMSAdminstrator']]) > 0:36			os.chdir(base_path)37			if request.get('hardreset'):38				result = os.system(hardreset_cmd)39				message.append('<code class="d-block">%s [%s]</code>'%(hardreset_cmd, str(result)))40			command = 'git checkout %s;git pull'%(branch)41			if request.get('revision')!='HEAD' and request.get('revision') is not None:42				command = 'git checkout %s'%(request.get('revision').replace('"','').replace(';',''))43			result = os.system(command)44			message.append('<code class="d-block mb-3">%s [%s]</code>'%(command, str(result)))45			### import from working-copy46			# success = self.updateChanges(REQUEST.get('ids',[]),btn=='override')47			# message.append(self.getZMILangStr('MSG_IMPORTED')%('<em>%s</em>'%' '.join(success)))48		else:49			message.append('Error: To execute this function a user role Manager or ZMSAdministrator is needed.')50		### return with message51		request.response.redirect(self.url_append_params('manage_main',{'lang':request['lang'],'manage_tabs_message':''.join(message)}))52	# --- Cancel.53	# ---------------------------------54	elif btn=='BTN_CANCEL':55		request.response.redirect(self.url_append_params(came_from,{'lang':request['lang']}))56	# --- Display initial form.57	# -------------------------58	else:59		printed.append('<div class="card-body">')60		printed.append('<div class="form-group row">')61		printed.append('<label for="revision" class="col-sm-2 control-label mandatory">Revision</label>')62		printed.append('<div class="col-sm-10"><input 
class="form-control" name="revision" type="text" size="25" value="HEAD" title="Default value HEAD pulls the latest revision. Please, enter the hexadecimal ID for checking out a specific revision." placeholder="Enter HEAD or Revision-ID"></div>')63		printed.append('</div><!-- .form-group -->')64		printed.append('<div class="form-group row">')65		printed.append('<label for="hardreset" class="col-sm-2 control-label mandatory">Use Hard Reset</label>')66		printed.append('<div class="col-sm-10"><input type="checkbox" name="hardreset" value="hardreset" title="git reset --hard origin/%s" /></div>'%(branch))67		printed.append('</div><!-- .form-group -->')68		printed.append('<div class="form-group">')69		printed.append('<div class="controls save">')70		printed.append('<button type="submit" name="btn" class="btn btn-primary" value="BTN_GITPULL">%s</button>'%(self.getZMILangStr('BTN_GITPULL')))71		printed.append('<button type="submit" name="btn" class="btn btn-secondary" value="BTN_CANCEL">%s</button>'%(self.getZMILangStr('BTN_CANCEL')))72		printed.append('</div>')73		printed.append('</div><!-- .form-group -->')74		# printed.append(self.manage_main_diff(self,request))75		printed.append('</div><!-- .card-body -->')76	# ---------------------------------77	printed.append('</form><!-- .form-horizontal -->')78	printed.append('</div><!-- .card -->')79	printed.append('</div><!-- #zmi-tab -->')80	printed.append(self.zmi_body_footer(self,request))81	printed.append('<script>$ZMI.registerReady(function(){ $(\'#tabs_items li a\').removeClass(\'active\');$(\'#tabs_items li[data-action*=\"repository_manager\"] a\').addClass(\'active\'); })</script>')82	printed.append('</body>')83	printed.append('</html>')...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. 
LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
