Best Python code snippet using localstack_python
RDDOpSample.py
Source: RDDOpSample.py
...
    for v in values:
        print("Value Side Effect: %s" % v)
class RDDOpSample():
    def doCollect(self, sc):
        rdd = sc.parallelize(range(1, 11))
        result = rdd.collect()
        print(result)
    def doCount(self, sc):
        rdd = sc.parallelize(range(1, 11))
        result = rdd.count()
        print(result)
    def doMap(self, sc):
        rdd1 = sc.parallelize(range(1, 6))
        rdd2 = rdd1.map(lambda v: v + 1)
        print(rdd2.collect())
    def doFlatMap(self, sc):
        rdd1 = sc.parallelize(["apple,orange", "grape,apple,mango", "blueberry,tomato,orange"])
        rdd2 = rdd1.flatMap(lambda s: s.split(","))
        print(rdd2.collect())
    def doMapPartitions(self, sc):
        rdd1 = sc.parallelize(range(1, 11), 3)
        rdd2 = rdd1.mapPartitions(increase)
        print(rdd2.collect())
    def doMapPartitionsWithIndex(self, sc):
        rdd1 = sc.parallelize(range(1, 11), 3)
        rdd2 = rdd1.mapPartitionsWithIndex(increaseWithIndex)
        print(rdd2.collect())
    def doMapValues(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c"])
        rdd2 = rdd1.map(lambda v: (v, 1))
        rdd3 = rdd2.mapValues(lambda i: i + 1)
        print(rdd3.collect())
    def doFlatMapValues(self, sc):
        rdd1 = sc.parallelize([(1, "a,b"), (2, "a,c"), (1, "d,e")])
        rdd2 = rdd1.flatMapValues(lambda s: s.split(","))
        print(rdd2.collect())
    def doZip(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c"])
        rdd2 = sc.parallelize([1, 2, 3])
        result = rdd1.zip(rdd2)
        print(result.collect())
    def doGroupBy(self, sc):
        rdd1 = sc.parallelize(range(1, 11))
        rdd2 = rdd1.groupBy(lambda v: "even" if v % 2 == 0 else "odd")
        for x in rdd2.collect():
            print(x[0], list(x[1]))
    def doGroupByKey(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c", "b", "c"]).map(lambda v: (v, 1))
        rdd2 = rdd1.groupByKey()
        for x in rdd2.collect():
            print(x[0], list(x[1]))
    def doCogroup(self, sc):
        rdd1 = sc.parallelize([("k1", "v1"), ("k2", "v2"), ("k1", "v3")])
        rdd2 = sc.parallelize([("k1", "v4")])
        result = rdd1.cogroup(rdd2)
        for x in result.collect():
            print(x[0], list(x[1][0]), list(x[1][1]))
    def doDistinct(self, sc):
        rdd = sc.parallelize([1, 2, 3, 1, 2, 3, 1, 2, 3])
        result = rdd.distinct()
        print(result.collect())
    def doCartesian(self, sc):
        rdd1 = sc.parallelize([1, 2, 3])
        rdd2 = sc.parallelize(["a", "b", "c"])
        result = rdd1.cartesian(rdd2)
        print(result.collect())
    def doSubtract(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c", "d", "e"])
        rdd2 = sc.parallelize(["d", "e"])
        result = rdd1.subtract(rdd2)
        print(result.collect())
    def doUnion(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c"])
        rdd2 = sc.parallelize(["d", "e", "f"])
        result = rdd1.union(rdd2)
        print(result.collect())
    def doIntersection(self, sc):
        rdd1 = sc.parallelize(["a", "a", "b", "c"])
        rdd2 = sc.parallelize(["a", "a", "c", "c"])
        result = rdd1.intersection(rdd2)
        print(result.collect())
    def doJoin(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c", "d", "e"]).map(lambda v: (v, 1))
        rdd2 = sc.parallelize(["b", "c"]).map(lambda v: (v, 2))
        result = rdd1.join(rdd2)
        print(result.collect())
    def doLeftOuterJoin(self, sc):
        rdd1 = sc.parallelize(["a", "b", "c"]).map(lambda v: (v, 1))
        rdd2 = sc.parallelize(["b", "c"]).map(lambda v: (v, 2))
        result1 = rdd1.leftOuterJoin(rdd2)
        result2 = rdd1.rightOuterJoin(rdd2)
        print("Left: %s" % result1.collect())
        print("Right: %s" % result2.collect())
    def doSubtractByKey(self, sc):
        rdd1 = sc.parallelize(["a", "b"]).map(lambda v: (v, 1))
        rdd2 = sc.parallelize(["b"]).map(lambda v: (v, 1))
        result = rdd1.subtractByKey(rdd2)
        print(result.collect())
    def doReduceByKey(self, sc):
        rdd = sc.parallelize(["a", "b", "b"]).map(lambda v: (v, 1))
        result = rdd.reduceByKey(lambda v1, v2: v1 + v2)
        print(result.collect())
    def doFoldByKey(self, sc):
        rdd = sc.parallelize(["a", "b", "b"]).map(lambda v: (v, 1))
        result = rdd.foldByKey(0, lambda v1, v2: v1 + v2)
        print(result.collect())
    def doCombineByKey(self, sc):
        rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
        result = rdd.combineByKey(lambda v: createCombiner(v), lambda c, v: mergeValue(c, v),
                                  lambda c1, c2: mergeCombiners(c1, c2))
        print('Math', result.collectAsMap()['Math'], 'Eng', result.collectAsMap()['Eng'])
    def doAggregateByKey(self, sc):
        rdd = sc.parallelize([("Math", 100), ("Eng", 80), ("Math", 50), ("Eng", 70), ("Eng", 90)])
        result = rdd.aggregateByKey(Record(0, 0), lambda c, v: mergeValue(c, v), lambda c1, c2: mergeCombiners(c1, c2))
        print('Math', result.collectAsMap()['Math'], 'Eng', result.collectAsMap()['Eng'])
    def doPipe(self, sc):
        rdd = sc.parallelize(["1,2,3", "4,5,6", "7,8,9"])
        result = rdd.pipe("cut -f 1,3 -d ,")
        print(result.collect())
    def doCoalesceAndRepartition(self, sc):
        rdd1 = sc.parallelize(list(range(1, 11)), 10)
        rdd2 = rdd1.coalesce(5)
        rdd3 = rdd2.repartition(10)
        print("partition size: %d" % rdd1.getNumPartitions())
        print("partition size: %d" % rdd2.getNumPartitions())
        print("partition size: %d" % rdd3.getNumPartitions())
    def doRepartitionAndSortWithinPartitions(self, sc):
        data = [random.randrange(1, 100) for i in range(0, 10)]
        rdd1 = sc.parallelize(data).map(lambda v: (v, "-"))
        rdd2 = rdd1.repartitionAndSortWithinPartitions(3, lambda x: x)
        rdd2.foreachPartition(lambda values: print(list(values)))
    def doPartitionBy(self, sc):
        rdd1 = sc.parallelize([("apple", 1), ("mouse", 1), ("monitor", 1)], 5)
        rdd2 = rdd1.partitionBy(3)
        print("rdd1: %d, rdd2: %d" % (rdd1.getNumPartitions(), rdd2.getNumPartitions()))
    def doFilter(self, sc):
        rdd1 = sc.parallelize(range(1, 6))
        rdd2 = rdd1.filter(lambda i: i > 2)
        print(rdd2.collect())
    def doSortByKey(self, sc):
        rdd = sc.parallelize([("q", 1), ("z", 1), ("a", 1)])
        result = rdd.sortByKey()
        print(result.collect())
    def doKeysAndValues(self, sc):
        rdd = sc.parallelize([("k1", "v1"), ("k2", "v2"), ("k3", "v3")])
        print(rdd.keys().collect())
        print(rdd.values().collect())
    def doSample(self, sc):
        rdd = sc.parallelize(range(1, 101))
        result1 = rdd.sample(False, 0.5, 100)
        result2 = rdd.sample(True, 1.5, 100)
        print(result1.take(5))
        print(result2.take(5))
    def doFirst(self, sc):
        rdd = sc.parallelize([5, 4, 1])
        result = rdd.first()
        print(result)
    def doTake(self, sc):
        rdd = sc.parallelize(range(1, 100))
        result = rdd.take(5)
        print(result)
    def doTakeSample(self, sc):
        rdd = sc.parallelize(range(1, 100))
        result = rdd.takeSample(False, 20)
        print(len(result))
    def doCountByValue(self, sc):
        rdd = sc.parallelize([1, 1, 2, 3, 3])
        result = rdd.countByValue()
        for k, v in result.items():
            print(k, "->", v)
    def doReduce(self, sc):
        rdd = sc.parallelize(range(1, 11), 3)
        result = rdd.reduce(lambda v1, v2: v1 + v2)
        print(result)
    def doFold(self, sc):
        rdd = sc.parallelize(range(1, 11), 3)
        result = rdd.fold(0, lambda v1, v2: v1 + v2)
        print(result)
    def doAggregate(self, sc):
        rdd = sc.parallelize([100, 80, 75, 90, 95])
        result = rdd.aggregate(Record(0, 0), seqOp, combOp)
        print(result)
    def doSum(self, sc):
        rdd = sc.parallelize(range(1, 11))
        result = rdd.sum()
        print(result)
    def doForeach(self, sc):
        rdd = sc.parallelize(range(1, 11))
        result = rdd.foreach(lambda v: print("Value Side Effect: %s" % v))
    def doForeachPartition(self, sc):
        rdd = sc.parallelize(range(1, 11), 3)
        result = rdd.foreachPartition(sideEffect)
    def doDebugString(self, sc):
        rdd1 = sc.parallelize(range(1, 100), 10)
        rdd2 = rdd1.map(lambda v: v * 2)
        rdd3 = rdd2.map(lambda v: v + 1)
        rdd4 = rdd3.coalesce(2)
        print(rdd4.toDebugString())
    def doCache(self, sc):
        rdd = sc.parallelize(range(1, 100), 10)
        rdd.cache()
        rdd.persist(StorageLevel.MEMORY_ONLY)
        print(rdd.persist().is_cached)
    def doGetPartitions(self, sc):
        rdd = sc.parallelize(range(1, 100), 10)
        print(rdd.getNumPartitions())
    def saveAndLoadTextFile(self, sc):
        rdd = sc.parallelize(range(1, 1000), 3)
        codec = "org.apache.hadoop.io.compress.GzipCodec"
        # save
        rdd.saveAsTextFile("<path_to_save>/sub1")
        # save(gzip)
        rdd.saveAsTextFile("<path_to_save>/sub2", codec)
        # load
        rdd2 = sc.textFile("<path_to_save>/sub1")
        print(rdd2.take(10))
    def saveAndLoadObjectFile(self, sc):
        rdd = sc.parallelize(range(1, 1000), 3)
        # save
        # Change the path below to your actual save path before testing
        rdd.saveAsPickleFile("data/sample/saveAsObjectFile/python")
        # load
        # Change the path below to your actual save path before testing
        rdd2 = sc.pickleFile("data/sample/saveAsObjectFile/python")
        print(rdd2.take(10))
    def saveAndLoadSequenceFile(self, sc):
        # Change the path below to your actual save path before testing
        path = "data/sample/saveAsSeqFile/python"
        outputFormatClass = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"
        inputFormatClass = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat"
        keyClass = "org.apache.hadoop.io.Text"
        valueClass = "org.apache.hadoop.io.IntWritable"
        conf = "org.apache.hadoop.conf.Configuration"
        rdd1 = sc.parallelize(["a", "b", "c", "b", "c"])
        rdd2 = rdd1.map(lambda x: (x, 1))
        # save
        rdd2.saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass, valueClass)
        # load
        rdd3 = sc.newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass)
        for k, v in rdd3.collect():
            print(k, v)
    def testBroadcaset(self, sc):
        bu = sc.broadcast(set(["u1", "u2"]))
        rdd = sc.parallelize(["u1", "u3", "u3", "u4", "u5", "u6"], 3)
        result = rdd.filter(lambda v: v in bu.value)
        print(result.collect())
if __name__ == "__main__":
    conf = SparkConf()
    conf.set("spark.driver.host", "127.0.0.1")
    sc = SparkContext(master="local[*]", appName="RDDOpSample", conf=conf)
    obj = RDDOpSample()
    # [How to run an example] Uncomment the example you want below and run!!
    # ex) obj.testBroadcaset(sc)
    # obj.doCollect(sc)
    # obj.doCount(sc)
    # obj.doMap(sc)
    # obj.doFlatMap(sc)
    # obj.doMapPartitions(sc)
...
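The listing opens partway through the file, so the imports and helper definitions that the methods above rely on (increase, increaseWithIndex, sideEffect, createCombiner, mergeValue, mergeCombiners, Record, seqOp, combOp) are elided. The sketch below is one plausible preamble, inferred only from how those names are called in the listing; the bodies are assumptions, not the original source:

# Hypothetical reconstruction of the elided preamble. The names come from the
# calls in RDDOpSample; the bodies are guesses and may differ from the original file.
import random
from pyspark import SparkConf, SparkContext, StorageLevel

class Record:
    # small (amount, number) accumulator used by the aggregate/combine examples
    def __init__(self, amount, number=1):
        self.amount = amount
        self.number = number
    def __str__(self):
        return "avg: %s" % (self.amount / self.number)

def increase(numbers):
    # mapPartitions helper: receives an iterator over one partition
    for i in numbers:
        yield i + 1

def increaseWithIndex(idx, numbers):
    # mapPartitionsWithIndex helper: partition index plus the partition iterator
    for i in numbers:
        yield (idx, i + 1)

def sideEffect(values):
    # foreachPartition helper; its last two lines are visible at the top of the listing
    for v in values:
        print("Value Side Effect: %s" % v)

def createCombiner(v):
    return Record(v)

def mergeValue(c, v):
    c.amount += v
    c.number += 1
    return c

def mergeCombiners(c1, c2):
    c1.amount += c2.amount
    c1.number += c2.number
    return c1

# seqOp/combOp pair used by rdd.aggregate(Record(0, 0), seqOp, combOp)
def seqOp(record, v):
    return mergeValue(record, v)

def combOp(r1, r2):
    return mergeCombiners(r1, r2)

With a preamble like this in place, the sample can be run locally (for example via spark-submit RDDOpSample.py, or plain python once pyspark is installed) after uncommenting one of the example calls in the main block.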
parallelize_repy.py
Source: parallelize_repy.py
# -*- coding: utf-8 -*-
### Automatically generated by repyhelper.py ### C:\Dropbox\uni\y1p2\dist\lab\demokit\parallelize.repy
### THIS FILE WILL BE OVERWRITTEN!
### DO NOT MAKE CHANGES HERE, INSTEAD EDIT THE ORIGINAL SOURCE FILE
###
### If changes to the src aren't propagating here, try manually deleting this file.
### Deleting this file forces regeneration of a repy translation
from repyportability import *
import repyhelper
mycontext = repyhelper.get_shared_context()
callfunc = 'import'
callargs = []
"""
Author: Justin Cappos
Module: A parallelization module.   It performs actions in parallel to make it
        easy for a user to call a function with a list of tasks.
Start date: November 11th, 2008
This module is adapted from code in seash which had similar functionality.
NOTE (for the programmer using this module).   It's really important to
write concurrency safe code for the functions they provide us.  It will not
work to write:
def foo(...):
  mycontext['count'] = mycontext['count'] + 1
YOU MUST PUT A LOCK AROUND SUCH ACCESSES.
"""
# I use this to get unique identifiers.
repyhelper.translate_and_import('uniqueid.repy')
class ParallelizeError(Exception):
  """An error occurred when operating on a parallelized task"""
# This has information about all of the different parallel functions.
# The keys are unique integers and the entries look like this:
# {'abort':False, 'callfunc':callfunc, 'callargs':callargs,
# 'targetlist':targetlist, 'availabletargetpositions':positionlist,
# 'runninglist':runninglist, 'result':result}
#
# abort is used to determine if future events should be aborted.
# callfunc is the function to call
# callargs are extra arguments to pass to the function
# targetlist is the list of items to call the function with
# runninglist is used to track which events are executing
# result is a dictionary that contains information about completed functions.
#    The format of result is:
#      {'exception':list of tuples with (target, exception string),
#       'aborted':list of targets,
#       'returned':list of tuples with (target, return value)}
#
parallelize_info_dict = {}
def parallelize_closefunction(parallelizehandle):
  """
   <Purpose>
      Clean up the state created after calling parallelize_initfunction.
   <Arguments>
      parallelizehandle:
         The handle returned by parallelize_initfunction

   <Exceptions>
      None
   <Side Effects>
      Will try to abort future functions if possible
   <Returns>
      True if the parallelizehandle was recognized or False if the handle is
      invalid or already closed.
  """
  # There is no sense trying to check then delete, since there may be a race
  # with multiple calls to this function.
  try:
    del parallelize_info_dict[parallelizehandle]
  except KeyError:
    return False
  else:
    return True

def parallelize_abortfunction(parallelizehandle):
  """
   <Purpose>
      Cause pending events for a function to abort.   Events will finish
      processing their current event.
   <Arguments>
      parallelizehandle:
         The handle returned by parallelize_initfunction

   <Exceptions>
      ParallelizeError is raised if the handle is unrecognized
   <Side Effects>
      None
   <Returns>
      True if the function was not previously aborting and is now, or False if
      the function was already set to abort before the call.
  """

  try:
    if parallelize_info_dict[parallelizehandle]['abort'] == False:
      parallelize_info_dict[parallelizehandle]['abort'] = True
      return True
    else:
      return False
  except KeyError:
    raise ParallelizeError("Cannot abort the parallel execution of a non-existent handle:"+str(parallelizehandle))
def parallelize_isfunctionfinished(parallelizehandle):
  """
   <Purpose>
      Indicate if a function is finished
   <Arguments>
      parallelizehandle:
         The handle returned by parallelize_initfunction

   <Exceptions>
      ParallelizeError is raised if the handle is unrecognized
   <Side Effects>
      None
   <Returns>
      True if the function has finished, False if it still has events running
  """

  try:
    if parallelize_info_dict[parallelizehandle]['runninglist']:
      return False
    else:
      return True
  except KeyError:
    raise ParallelizeError("Cannot get status for the parallel execution of a non-existent handle:"+str(parallelizehandle))
def parallelize_getresults(parallelizehandle):
  """
   <Purpose>
      Get information about a parallelized function
   <Arguments>
      parallelizehandle:
         The handle returned by parallelize_initfunction

   <Exceptions>
      ParallelizeError is raised if the handle is unrecognized
   <Side Effects>
      None
   <Returns>
      A dictionary with the results.   The format is
        {'exception':list of tuples with (target, exception string),
         'aborted':list of targets, 'returned':list of tuples with (target,
         return value)}
  """

  try:
    # I copy so that the user doesn't have to deal with the fact I may still
    # be modifying it
    return parallelize_info_dict[parallelizehandle]['result'].copy()
  except KeyError:
    raise ParallelizeError("Cannot get results for the parallel execution of a non-existent handle:"+str(parallelizehandle))


def parallelize_initfunction(targetlist, callerfunc, concurrentevents=5, *extrafuncargs):
  """
   <Purpose>
      Call a function with each argument in a list in parallel
   <Arguments>
      targetlist:
          The list of arguments the function should be called with.   Each
          argument is passed once to the function.   Items may appear in the
          list multiple times
      callerfunc:
          The function to call

      concurrentevents:
          The number of events to issue concurrently (default 5).   No more
          than len(targetlist) events will be concurrently started.
      extrafuncargs:
          Extra arguments the function should be called with (every function
          is passed the same extra args).
   <Exceptions>
      ParallelizeError is raised if there isn't at least one free event.
      However, if there aren't at least concurrentevents number of free events,
      this is not an error (instead this is reflected in parallelize_getstatus)
      in the status information.
   <Side Effects>
      Starts events, etc.
   <Returns>
      A handle used for status information, etc.
  """
  parallelizehandle = uniqueid_getid()
  # set up the dict locally one line at a time to avoid a ginormous line
  handleinfo = {}
  handleinfo['abort'] = False
  handleinfo['callfunc'] = callerfunc
  handleinfo['callargs'] = extrafuncargs
  # make a copy of the target list because the worker events pop items off of it
  handleinfo['targetlist'] = targetlist[:]
  handleinfo['availabletargetpositions'] = range(len(handleinfo['targetlist']))
  handleinfo['result'] = {'exception':[],'returned':[],'aborted':[]}
  handleinfo['runninglist'] = []

  parallelize_info_dict[parallelizehandle] = handleinfo
  # don't start more threads than there are targets (duh!)
  threads_to_start = min(concurrentevents, len(handleinfo['targetlist']))
  for workercount in range(threads_to_start):
    # we need to append the workercount here because we can't return until
    # this is scheduled without having race conditions
    parallelize_info_dict[parallelizehandle]['runninglist'].append(workercount)
    try:
      settimer(0.0, parallelize_execute_function, (parallelizehandle,workercount))
    except:
      # If I'm out of resources, stop
      # remove this worker (they didn't start)
      parallelize_info_dict[parallelizehandle]['runninglist'].remove(workercount)
      if not parallelize_info_dict[parallelizehandle]['runninglist']:
        parallelize_closefunction(parallelizehandle)
        raise Exception, "No events available!"
      break

  return parallelizehandle

def parallelize_execute_function(handle, myid):
  # This is internal only.   It's used to execute the user function...
  # No matter what, an exception in me should not propagate up!   Otherwise,
  # we might result in the program's termination!
  try:
    while True:
      # separate this from below functionality to minimize scope of try block
      thetargetlist = parallelize_info_dict[handle]['targetlist']
      try:
        mytarget = thetargetlist.pop()
      except IndexError:
        # all items are gone, let's return
        return
      # if they want us to abort, put this in the aborted list
      if parallelize_info_dict[handle]['abort']:
        parallelize_info_dict[handle]['result']['aborted'].append(mytarget)
      else:
        # otherwise process this normally
        # limit the scope of the below try block...
        callfunc = parallelize_info_dict[handle]['callfunc']
        callargs = parallelize_info_dict[handle]['callargs']
        try:
          retvalue = callfunc(mytarget,*callargs)
        except Exception, e:
          # always log on error.   We need to report what happened
          parallelize_info_dict[handle]['result']['exception'].append((mytarget,str(e)))
        else:
          # success, add it to the dict...
          parallelize_info_dict[handle]['result']['returned'].append((mytarget,retvalue))
  except KeyError:
    # A KeyError is normal if they've closed the handle
    return
  except Exception, e:
    print 'Internal Error: Exception in parallelize_execute_function',e
  finally:
    # remove my entry from the list of running worker threads...
    try:
      parallelize_info_dict[handle]['runninglist'].remove(myid)
    except (ValueError, KeyError):
      pass

...
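The docstrings above spell out the intended call pattern: start a batch with parallelize_initfunction, poll parallelize_isfunctionfinished while the worker events drain the target list, read parallelize_getresults, and release the bookkeeping with parallelize_closefunction. Below is a minimal sketch of that pattern in the module's own repy/Python 2 style; the worker function, target list, and sleep interval are illustrative, not part of the original module:

# Illustrative usage only. Assumes the parallelize module has been pulled in via
# repyhelper (as this file itself does for uniqueid.repy), so the parallelize_*
# functions and repy's sleep() are in scope.
def lookup(target, suffix):
  # user-supplied worker; per the module notes, it must be concurrency safe
  return target + suffix

def run_lookups():
  targets = ['u1', 'u2', 'u3']    # illustrative task list
  handle = parallelize_initfunction(targets, lookup, 2, '.example')

  # block until every worker event has finished its share of the targets
  while not parallelize_isfunctionfinished(handle):
    sleep(0.1)

  results = parallelize_getresults(handle)
  for target, returnvalue in results['returned']:
    print target, '->', returnvalue
  for target, errorstring in results['exception']:
    print 'failed:', target, errorstring

  # drop the bookkeeping for this run
  parallelize_closefunction(handle)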
