Best Python code snippet using robotframework-pageobjects_python
test_highlevel_open_tcp_stream.py
Source: test_highlevel_open_tcp_stream.py
...
            assert ip in self.ip_dict
            if socket is not succeeded:
                assert socket.closed
            assert socket.port == self.port

async def run_scenario(
    # The port to connect to
    port,
    # A list of
    #  (ip, delay, result)
    # tuples, where delay is in seconds and result is "success" or "error"
    # The ip's will be returned from getaddrinfo in this order, and then
    # connect() calls to them will have the given result.
    ip_list,
    *,
    # If False, AF_INET4/6 sockets error out on creation, before connect is
    # even called.
    ipv4_supported=True,
    ipv6_supported=True,
    # Normally, we return (winning_sock, scenario object)
    # If this is True, we require there to be an exception, and return
    #   (exception, scenario object)
    expect_error=(),
    **kwargs,
):
    supported_families = set()
    if ipv4_supported:
        supported_families.add(trio.socket.AF_INET)
    if ipv6_supported:
        supported_families.add(trio.socket.AF_INET6)
    scenario = Scenario(port, ip_list, supported_families)
    trio.socket.set_custom_hostname_resolver(scenario)
    trio.socket.set_custom_socket_factory(scenario)
    try:
        stream = await open_tcp_stream("test.example.com", port, **kwargs)
        assert expect_error == ()
        scenario.check(stream.socket)
        return (stream.socket, scenario)
    except AssertionError:  # pragma: no cover
        raise
    except expect_error as exc:
        scenario.check(None)
        return (exc, scenario)

async def test_one_host_quick_success(autojump_clock):
    sock, scenario = await run_scenario(80, [("1.2.3.4", 0.123, "success")])
    assert sock.ip == "1.2.3.4"
    assert trio.current_time() == 0.123

async def test_one_host_slow_success(autojump_clock):
    sock, scenario = await run_scenario(81, [("1.2.3.4", 100, "success")])
    assert sock.ip == "1.2.3.4"
    assert trio.current_time() == 100

async def test_one_host_quick_fail(autojump_clock):
    exc, scenario = await run_scenario(
        82, [("1.2.3.4", 0.123, "error")], expect_error=OSError
    )
    assert isinstance(exc, OSError)
    assert trio.current_time() == 0.123

async def test_one_host_slow_fail(autojump_clock):
    exc, scenario = await run_scenario(
        83, [("1.2.3.4", 100, "error")], expect_error=OSError
    )
    assert isinstance(exc, OSError)
    assert trio.current_time() == 100

async def test_one_host_failed_after_connect(autojump_clock):
    exc, scenario = await run_scenario(
        83, [("1.2.3.4", 1, "postconnect_fail")], expect_error=KeyboardInterrupt
    )
    assert isinstance(exc, KeyboardInterrupt)

# With the default 0.250 second delay, the third attempt will win
async def test_basic_fallthrough(autojump_clock):
    sock, scenario = await run_scenario(
        80,
        [
            ("1.1.1.1", 1, "success"),
            ("2.2.2.2", 1, "success"),
            ("3.3.3.3", 0.2, "success"),
        ],
    )
    assert sock.ip == "3.3.3.3"
    # current time is default time + default time + connection time
    assert trio.current_time() == (0.250 + 0.250 + 0.2)
    assert scenario.connect_times == {
        "1.1.1.1": 0,
        "2.2.2.2": 0.250,
        "3.3.3.3": 0.500,
    }

async def test_early_success(autojump_clock):
    sock, scenario = await run_scenario(
        80,
        [
            ("1.1.1.1", 1, "success"),
            ("2.2.2.2", 0.1, "success"),
            ("3.3.3.3", 0.2, "success"),
        ],
    )
    assert sock.ip == "2.2.2.2"
    assert trio.current_time() == (0.250 + 0.1)
    assert scenario.connect_times == {
        "1.1.1.1": 0,
        "2.2.2.2": 0.250,
        # 3.3.3.3 was never even started
    }

# With a 0.450 second delay, the first attempt will win
async def test_custom_delay(autojump_clock):
    sock, scenario = await run_scenario(
        80,
        [
            ("1.1.1.1", 1, "success"),
            ("2.2.2.2", 1, "success"),
            ("3.3.3.3", 0.2, "success"),
        ],
        happy_eyeballs_delay=0.450,
    )
    assert sock.ip == "1.1.1.1"
    assert trio.current_time() == 1
    assert scenario.connect_times == {
        "1.1.1.1": 0,
        "2.2.2.2": 0.450,
        "3.3.3.3": 0.900,
    }

async def test_custom_errors_expedite(autojump_clock):
    sock, scenario = await run_scenario(
        80,
        [
            ("1.1.1.1", 0.1, "error"),
            ("2.2.2.2", 0.2, "error"),
            ("3.3.3.3", 10, "success"),
            # .25 is the default timeout
            ("4.4.4.4", 0.25, "success"),
        ],
    )
    assert sock.ip == "4.4.4.4"
    assert trio.current_time() == (0.1 + 0.2 + 0.25 + 0.25)
    assert scenario.connect_times == {
        "1.1.1.1": 0,
        "2.2.2.2": 0.1,
        "3.3.3.3": 0.1 + 0.2,
        "4.4.4.4": 0.1 + 0.2 + 0.25,
    }

async def test_all_fail(autojump_clock):
    exc, scenario = await run_scenario(
        80,
        [
            ("1.1.1.1", 0.1, "error"),
            ("2.2.2.2", 0.2, "error"),
            ("3.3.3.3", 10, "error"),
            ("4.4.4.4", 0.250, "error"),
        ],
        expect_error=OSError,
    )
    assert isinstance(exc, OSError)
    assert isinstance(exc.__cause__, trio.MultiError)
    assert len(exc.__cause__.exceptions) == 4
    assert trio.current_time() == (0.1 + 0.2 + 10)
    assert scenario.connect_times == {
        "1.1.1.1": 0,
        "2.2.2.2": 0.1,
        "3.3.3.3": 0.1 + 0.2,
        "4.4.4.4": 0.1 + 0.2 + 0.25,
    }

async def test_multi_success(autojump_clock):
    sock, scenario = await run_scenario(
        80,
        [
            ("1.1.1.1", 0.5, "error"),
            ("2.2.2.2", 10, "success"),
            ("3.3.3.3", 10 - 1, "success"),
            ("4.4.4.4", 10 - 2, "success"),
            ("5.5.5.5", 0.5, "error"),
        ],
        happy_eyeballs_delay=1,
    )
    assert not scenario.sockets["1.1.1.1"].succeeded
    assert (
        scenario.sockets["2.2.2.2"].succeeded
        or scenario.sockets["3.3.3.3"].succeeded
        or scenario.sockets["4.4.4.4"].succeeded
    )
    assert not scenario.sockets["5.5.5.5"].succeeded
    assert sock.ip in ["2.2.2.2", "3.3.3.3", "4.4.4.4"]
    assert trio.current_time() == (0.5 + 10)
    assert scenario.connect_times == {
        "1.1.1.1": 0,
        "2.2.2.2": 0.5,
        "3.3.3.3": 1.5,
        "4.4.4.4": 2.5,
        "5.5.5.5": 3.5,
    }

async def test_does_reorder(autojump_clock):
    sock, scenario = await run_scenario(
        80,
        [
            ("1.1.1.1", 10, "error"),
            # This would win if we tried it first...
            ("2.2.2.2", 1, "success"),
            # But in fact we try this first, because of section 5.4
            ("::3", 0.5, "success"),
        ],
        happy_eyeballs_delay=1,
    )
    assert sock.ip == "::3"
    assert trio.current_time() == 1 + 0.5
    assert scenario.connect_times == {
        "1.1.1.1": 0,
        "::3": 1,
    }

async def test_handles_no_ipv4(autojump_clock):
    sock, scenario = await run_scenario(
        80,
        # Here the ipv4 addresses fail at socket creation time, so the connect
        # configuration doesn't matter
        [
            ("::1", 10, "success"),
            ("2.2.2.2", 0, "success"),
            ("::3", 0.1, "success"),
            ("4.4.4.4", 0, "success"),
        ],
        happy_eyeballs_delay=1,
        ipv4_supported=False,
    )
    assert sock.ip == "::3"
    assert trio.current_time() == 1 + 0.1
    assert scenario.connect_times == {
        "::1": 0,
        "::3": 1.0,
    }

async def test_handles_no_ipv6(autojump_clock):
    sock, scenario = await run_scenario(
        80,
        # Here the ipv6 addresses fail at socket creation time, so the connect
        # configuration doesn't matter
        [
            ("::1", 0, "success"),
            ("2.2.2.2", 10, "success"),
            ("::3", 0, "success"),
            ("4.4.4.4", 0.1, "success"),
        ],
        happy_eyeballs_delay=1,
        ipv6_supported=False,
    )
    assert sock.ip == "4.4.4.4"
    assert trio.current_time() == 1 + 0.1
    assert scenario.connect_times == {
        "2.2.2.2": 0,
        "4.4.4.4": 1.0,
    }

async def test_no_hosts(autojump_clock):
    exc, scenario = await run_scenario(80, [], expect_error=OSError)
    assert "no results found" in str(exc)

async def test_cancel(autojump_clock):
    with trio.move_on_after(5) as cancel_scope:
        exc, scenario = await run_scenario(
            80,
            [
                ("1.1.1.1", 10, "success"),
                ("2.2.2.2", 10, "success"),
                ("3.3.3.3", 10, "success"),
                ("4.4.4.4", 10, "success"),
            ],
            expect_error=trio.MultiError,
        )
        # What comes out should be 1 or more Cancelled errors that all belong
        # to this cancel_scope; this is the easiest way to check that
        raise exc
    assert cancel_scope.cancelled_caught
    assert trio.current_time() == 5
...
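For context on what these scenarios exercise: open_tcp_stream implements RFC 8305 "happy eyeballs", staggering connection attempts across the addresses returned by getaddrinfo. Below is a minimal usage sketch of the public trio API, assuming a reachable host; the hostname, port, and delay are illustrative values, not taken from the test file.

import trio

async def main():
    # Start a fallback connection attempt every 0.1 s instead of the
    # default 0.25 s that the tests above rely on.
    stream = await trio.open_tcp_stream("example.com", 80, happy_eyeballs_delay=0.1)
    async with stream:
        await stream.send_all(b"HEAD / HTTP/1.0\r\nHost: example.com\r\n\r\n")
        print(await stream.receive_some(1024))

trio.run(main)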
transforms_test.py
Source: transforms_test.py
...
Nested = typing.NamedTuple(
    'Nested', [('id', int), ('animal_speed', AnimalSpeed)])
coders.registry.register_coder(Nested, coders.RowCoder)

class TransformTest(unittest.TestCase):
  def run_scenario(self, input, func):
    expected = func(input)
    empty = input.iloc[0:0]
    input_placeholder = expressions.PlaceholderExpression(empty)
    input_deferred = frame_base.DeferredFrame.wrap(input_placeholder)
    actual_deferred = func(input_deferred)._expr.evaluate_at(
        expressions.Session({input_placeholder: input}))
    check_correct(expected, actual_deferred)
    with beam.Pipeline() as p:
      input_pcoll = p | beam.Create([input.iloc[::2], input.iloc[1::2]])
      input_df = convert.to_dataframe(input_pcoll, proxy=empty)
      output_df = func(input_df)
      output_proxy = output_df._expr.proxy()
      if isinstance(output_proxy, pd.core.generic.NDFrame):
        self.assertTrue(
            output_proxy.iloc[:0].equals(expected.iloc[:0]),
            (
                'Output proxy is incorrect:\n'
                f'Expected:\n{expected.iloc[:0]}\n\n'
                f'Actual:\n{output_proxy.iloc[:0]}'))
      else:
        self.assertEqual(type(output_proxy), type(expected))
      output_pcoll = convert.to_pcollection(output_df, yield_elements='pandas')
      assert_that(
          output_pcoll, lambda actual: check_correct(expected, concat(actual)))

  def test_identity(self):
    df = pd.DataFrame({
        'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
        'Speed': [380., 370., 24., 26.]
    })
    self.run_scenario(df, lambda x: x)

  def test_groupby_sum_mean(self):
    df = pd.DataFrame({
        'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
        'Speed': [380., 370., 24., 26.]
    })
    self.run_scenario(df, lambda df: df.groupby('Animal').sum())
    with expressions.allow_non_parallel_operations():
      self.run_scenario(df, lambda df: df.groupby('Animal').mean())
    self.run_scenario(
        df, lambda df: df.loc[df.Speed > 25].groupby('Animal').sum())

  def test_groupby_apply(self):
    df = pd.DataFrame({
        'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
        'foo': [None if i % 11 == 0 else i for i in range(100)],
        'bar': [None if i % 7 == 0 else 99 - i for i in range(100)],
        'baz': [None if i % 13 == 0 else i * 2 for i in range(100)],
    })

    def median_sum_fn(x):
      return (x.foo + x.bar).median()

    describe = lambda df: df.describe()
    self.run_scenario(df, lambda df: df.groupby('group').foo.apply(describe))
    self.run_scenario(
        df, lambda df: df.groupby('group')[['foo', 'bar']].apply(describe))
    self.run_scenario(df, lambda df: df.groupby('group').apply(median_sum_fn))
    self.run_scenario(
        df,
        lambda df: df.set_index('group').foo.groupby(level=0).apply(describe))
    self.run_scenario(df, lambda df: df.groupby(level=0).apply(median_sum_fn))
    self.run_scenario(
        df, lambda df: df.groupby(lambda x: x % 3).apply(describe))

  def test_filter(self):
    df = pd.DataFrame({
        'Animal': ['Aardvark', 'Ant', 'Elephant', 'Zebra'],
        'Speed': [5, 2, 35, 40]
    })
    self.run_scenario(df, lambda df: df.filter(items=['Animal']))
    self.run_scenario(df, lambda df: df.filter(regex='Anim.*'))
    self.run_scenario(
        df, lambda df: df.set_index('Animal').filter(regex='F.*', axis='index'))
    with expressions.allow_non_parallel_operations():
      a = pd.DataFrame({'col': [1, 2, 3]})
      self.run_scenario(a, lambda a: a.agg(sum))
      self.run_scenario(a, lambda a: a.agg(['mean', 'min', 'max']))

  def test_scalar(self):
    with expressions.allow_non_parallel_operations():
      a = pd.Series([1, 2, 6])
      self.run_scenario(a, lambda a: a.agg(sum))
      self.run_scenario(a, lambda a: a / a.agg(sum))
      # Tests scalar being used as an input to a downstream stage.
      df = pd.DataFrame({'key': ['a', 'a', 'b'], 'val': [1, 2, 6]})
      self.run_scenario(
          df, lambda df: df.groupby('key').sum().val / df.val.agg(sum))

  def test_getitem_projection(self):
    df = pd.DataFrame({
        'Animal': ['Aardvark', 'Ant', 'Elephant', 'Zebra'],
        'Speed': [5, 2, 35, 40],
        'Size': ['Small', 'Extra Small', 'Large', 'Medium']
    })
    self.run_scenario(df, lambda df: df[['Speed', 'Size']])

  def test_offset_elementwise(self):
    s = pd.Series(range(10)).astype(float)
    df = pd.DataFrame({'value': s, 'square': s * s, 'cube': s * s * s})
    # Only those values that are both squares and cubes will intersect.
    self.run_scenario(
        df,
        lambda df: df.set_index('square').value + df.set_index('cube').value)

  def test_batching_named_tuple_input(self):
    with beam.Pipeline() as p:
      result = (
          p | beam.Create([
              AnimalSpeed('Aardvark', 5),
              AnimalSpeed('Ant', 2),
              AnimalSpeed('Elephant', 35),
              AnimalSpeed('Zebra', 40)
          ]).with_output_types(AnimalSpeed)
          | transforms.DataframeTransform(lambda df: df.filter(regex='Anim.*')))
      assert_that(
          result,
          equal_to([('Aardvark', ), ('Ant', ), ('Elephant', ), ('Zebra', )]))

  def test_batching_beam_row_input(self):
    with beam.Pipeline() as p:
      result = (
          p
          | beam.Create([(u'Falcon', 380.), (u'Falcon', 370.), (u'Parrot', 24.),
                         (u'Parrot', 26.)])
          | beam.Map(lambda tpl: beam.Row(Animal=tpl[0], Speed=tpl[1]))
          | transforms.DataframeTransform(
              lambda df: df.groupby('Animal').mean(), include_indexes=True))
      assert_that(result, equal_to([('Falcon', 375.), ('Parrot', 25.)]))

  def test_batching_beam_row_to_dataframe(self):
    with beam.Pipeline() as p:
      df = convert.to_dataframe(
          p
          | beam.Create([(u'Falcon', 380.), (u'Falcon', 370.), (
              u'Parrot', 24.), (u'Parrot', 26.)])
          | beam.Map(lambda tpl: beam.Row(Animal=tpl[0], Speed=tpl[1])))
      result = convert.to_pcollection(
          df.groupby('Animal').mean(), include_indexes=True)
      assert_that(result, equal_to([('Falcon', 375.), ('Parrot', 25.)]))

  def test_batching_passthrough_nested_schema(self):
    with beam.Pipeline() as p:
      nested_schema_pc = (
          p | beam.Create([Nested(1, AnimalSpeed('Aardvark', 5))
                           ]).with_output_types(Nested))
      result = nested_schema_pc | transforms.DataframeTransform(  # pylint: disable=expression-not-assigned
          lambda df: df.filter(items=['animal_speed']))
      assert_that(result, equal_to([(('Aardvark', 5), )]))

  def test_batching_passthrough_nested_array(self):
    Array = typing.NamedTuple(
        'Array', [('id', int), ('business_numbers', typing.Sequence[int])])
    coders.registry.register_coder(Array, coders.RowCoder)
    with beam.Pipeline() as p:
      array_schema_pc = (p | beam.Create([Array(1, [7, 8, 9])]))
      result = array_schema_pc | transforms.DataframeTransform(  # pylint: disable=expression-not-assigned
          lambda df: df.filter(items=['business_numbers']))
      assert_that(result, equal_to([([7, 8, 9], )]))

  def test_unbatching_series(self):
    with beam.Pipeline() as p:
      result = (
          p
          | beam.Create([(u'Falcon', 380.), (u'Falcon', 370.), (u'Parrot', 24.),
                         (u'Parrot', 26.)])
          | beam.Map(lambda tpl: beam.Row(Animal=tpl[0], Speed=tpl[1]))
          | transforms.DataframeTransform(lambda df: df.Animal))
      assert_that(result, equal_to(['Falcon', 'Falcon', 'Parrot', 'Parrot']))

  def test_input_output_polymorphism(self):
    one_series = pd.Series([1])
    two_series = pd.Series([2])
    three_series = pd.Series([3])
    proxy = one_series[:0]

    def equal_to_series(expected):
      def check(actual):
        actual = pd.concat(actual)
        if not expected.equals(actual):
          raise AssertionError(
              'Series not equal: \n%s\n%s\n' % (expected, actual))
      return check

    with beam.Pipeline() as p:
      one = p | 'One' >> beam.Create([one_series])
      two = p | 'Two' >> beam.Create([two_series])
      assert_that(
          one | 'PcollInPcollOut' >> transforms.DataframeTransform(
              lambda x: 3 * x, proxy=proxy, yield_elements='pandas'),
          equal_to_series(three_series),
          label='CheckPcollInPcollOut')
      assert_that(
          (one, two)
          | 'TupleIn' >> transforms.DataframeTransform(
              lambda x, y: (x + y), (proxy, proxy), yield_elements='pandas'),
          equal_to_series(three_series),
          label='CheckTupleIn')
      assert_that(
          dict(x=one, y=two)
          | 'DictIn' >> transforms.DataframeTransform(
              lambda x, y: (x + y),
              proxy=dict(x=proxy, y=proxy),
              yield_elements='pandas'),
          equal_to_series(three_series),
          label='CheckDictIn')
      double, triple = one | 'TupleOut' >> transforms.DataframeTransform(
          lambda x: (2 * x, 3 * x), proxy, yield_elements='pandas')
      assert_that(double, equal_to_series(two_series), 'CheckTupleOut0')
      assert_that(triple, equal_to_series(three_series), 'CheckTupleOut1')
      res = one | 'DictOut' >> transforms.DataframeTransform(
          lambda x: {'res': 3 * x}, proxy, yield_elements='pandas')
      assert_that(res['res'], equal_to_series(three_series), 'CheckDictOut')

  def test_cat(self):
    # verify that cat works with a List[Series] since this is
    # missing from doctests
    df = pd.DataFrame({
        'one': ['A', 'B', 'C'],
        'two': ['BB', 'CC', 'A'],
        'three': ['CCC', 'AA', 'B'],
    })
    self.run_scenario(df, lambda df: df.two.str.cat([df.three], join='outer'))
    self.run_scenario(
        df, lambda df: df.one.str.cat([df.two, df.three], join='outer'))

  def test_repeat(self):
    # verify that repeat works with a Series since this is
    # missing from doctests
    df = pd.DataFrame({
        'strings': ['A', 'B', 'C', 'D', 'E'],
        'repeats': [3, 1, 4, 5, 2],
    })
    self.run_scenario(df, lambda df: df.strings.str.repeat(df.repeats))

  def test_rename(self):
    df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
    self.run_scenario(
        df, lambda df: df.rename(columns={'B': 'C'}, index={
            0: 2, 2: 0
        }))
    with expressions.allow_non_parallel_operations():
      self.run_scenario(
          df,
          lambda df: df.rename(
              columns={'B': 'C'}, index={
                  0: 2, 2: 0
              }, errors='raise'))

class TransformPartsTest(unittest.TestCase):
  def test_rebatch(self):
    with beam.Pipeline() as p:
      sA = pd.Series(range(1000))
      sB = sA * sA
      pcA = p | 'CreatePCollA' >> beam.Create([('k0', sA[::3]),
                                               ('k1', sA[1::3]),
                                               ('k2', sA[2::3])])
      pcB = p | 'CreatePCollB' >> beam.Create([('k0', sB[::3]),
...
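As a usage reference, here is a minimal standalone pipeline built on the same DataframeTransform pattern that test_batching_beam_row_input exercises above; this is a sketch, and printing the output is illustrative only.

import apache_beam as beam
from apache_beam.dataframe.transforms import DataframeTransform

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([('Falcon', 380.), ('Falcon', 370.),
                       ('Parrot', 24.), ('Parrot', 26.)])
        # Give elements a schema so they can be batched into dataframes.
        | beam.Map(lambda tpl: beam.Row(Animal=tpl[0], Speed=tpl[1]))
        # Apply pandas operations to the deferred dataframe; include_indexes
        # keeps the 'Animal' grouping key in the output rows.
        | DataframeTransform(lambda df: df.groupby('Animal').mean(),
                             include_indexes=True)
        | beam.Map(print))  # e.g. rows like ('Falcon', 375.0), ('Parrot', 25.0)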
test_unitconv.py
Source: test_unitconv.py
...
        self.assertDictEqual(
            unitconv.parse_unitname(''),
            {'multiplier': 1, 'unit_class': None, 'primary_unit': '',
             'base_unit': ''})

def run_scenario(user_asked_for, data_exists_as, allow_derivation=True,
                 allow_integration=False, allow_prefixes_in_denominator=False,
                 round_result=6):
    userunit = unitconv.parse_unitname(user_asked_for, fold_scale_prefix=False)
    prefixclass = unitconv.prefix_class_for(userunit['scale_multiplier'])
    use_unit = userunit['base_unit']
    compatibles = unitconv.determine_compatible_units(
            allow_derivation=allow_derivation,
            allow_integration=allow_integration,
            allow_prefixes_in_denominator=allow_prefixes_in_denominator,
            **userunit)
    try:
        scale, extra_op = compatibles[data_exists_as]
    except KeyError:
        return
    if round_result is not None:
        scale = round(scale, round_result)
    return (data_exists_as, use_unit, scale, extra_op, prefixclass)

class TestDetermineCompatible(unittest.TestCase):
    def test_compatible_to_simple_primary_type(self):
        all_time_units = [pair[0] for pair in unitconv.unit_classes_by_name['time']]
        u = unitconv.determine_compatible_units('s', 'time', allow_integration=False)
        compatunits = u.keys()
        for timeunit in all_time_units:
            self.assertIn(timeunit, compatunits)
        self.assertEqual(u['MM'], (60000000.0, None))
        self.assertEqual(u['h'], (3600.0, None))
        self.assertEqual([extra_op for (_multiplier, extra_op) in u.values()],
                         [None] * len(u))

    def test_allow_derivation(self):
        u = unitconv.determine_compatible_units('b', 'datasize', 1, 's', 'time', allow_integration=False)
        self.assertEqual(u['b'], (1.0, 'derive'))
        self.assertEqual(u['B'], (8.0, 'derive'))
        self.assertEqual(u['b/s'], (1.0, None))
        self.assertAlmostEqual(u['B/d'][0], 9.26e-05)
        self.assertIsNone(u['B/d'][1])
        self.assertNotIn('h', u)

    def test_allow_integration(self):
        u = unitconv.determine_compatible_units('Eggnog', None, 0.125, allow_integration=True)
        self.assertEqual(u['Eggnog'], (8.0, None))
        self.assertAlmostEqual(u['Eggnog/h'][0], 0.0022222)
        self.assertEqual(u['Eggnog/h'][1], 'integrate')
        self.assertNotIn('derive', [extra_op for (_multiplier, extra_op) in u.values()])

class TestUnitconv(unittest.TestCase):
    # in the comments explaining results, X(t) represents a data series in
    # graphite with the "data_exists_as" unit, and Y(t) represents the data
    # series we want to graph, in the "user_asked_for" unit. the results of
    # run_scenario should give the necessary steps to convert X(t) to Y(t).

    def test_straightforward_conversion(self):
        self.assertEqual(run_scenario(user_asked_for='B', data_exists_as='b'),
                         ('b', 'B', 0.125, None, 'si'))
        # 0.125 * X(t) b = Y(t) B

    def test_esoteric_conversion_with_derive(self):
        self.assertEqual(run_scenario(user_asked_for='MiB/d', data_exists_as='kb'),
                         ('kb', 'B/d', 10800000, 'derive', 'binary'))
        # d(X(t) kb)/dt kb/s * 86400 s/d * 1B/8b * 1000 B/kB = Y(t) B/d
        # 86400 * 1000 / 8 = 10800000

    def test_unrecognized_unit_derive(self):
        self.assertEqual(run_scenario(user_asked_for='Cheese/w', data_exists_as='Cheese'),
                         ('Cheese', 'Cheese/w', 604800.0, 'derive', 'si'))
        # d(604800.0 * X(t) Cheese)/dt = Y(t) Cheese/w

    def test_integration(self):
        self.assertEqual(run_scenario(user_asked_for='b', data_exists_as='MB/s',
                                      allow_integration=True),
                         ('MB/s', 'b', 8000000.0, 'integrate', 'si'))
        # Integral(8000000.0 * X(t) MB/s, dt) = Y(t) b

    def test_conversion_between_unrecognized_units(self):
        self.assertIsNone(run_scenario(user_asked_for='pony', data_exists_as='coal'))
        # can't convert

    def test_conversion_between_units_of_different_class(self):
        self.assertIsNone(run_scenario(user_asked_for='d', data_exists_as='Mb'))
        # we know what they are but we can't convert days to megabits

    def test_straightforward_conversion_with_compound_units(self):
        self.assertEqual(run_scenario(user_asked_for='kb/s', data_exists_as='TiB/w'),
                         ('TiB/w', 'b/s', 14543804.600212, None, 'si'))
        # X(t) TiB/w * (1024**4 B/TiB) * (8 b/B) * (1 w/604800 s) = Y(t) kb/s
        # 1024**4 * 8 / 604800 =~ 14543804.600212

    def test_straightforward_conversion_between_iec_data_rates(self):
        self.assertEqual(run_scenario(user_asked_for='KiB', data_exists_as='TiB/w',
                                      allow_integration=True),
                         ('TiB/w', 'B', 1817975.575026, 'integrate', 'binary'))
        # X(t) TiB/w * (1024**4 B/TiB) * (1 w/604800 s) = Z(t) B/s
...
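The expected scale factors in these tests are plain unit arithmetic. As a worked check of test_esoteric_conversion_with_derive, assuming SI kilobits (1 kb = 1000 b):

seconds_per_day = 86400   # converts a per-second rate to per-day
bits_per_kb = 1000        # SI prefix, not the binary KiB
bits_per_byte = 8
scale = seconds_per_day * bits_per_kb // bits_per_byte
assert scale == 10800000  # matches the expected tuple above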
test_combine.py
Source: test_combine.py
...
    # block 6 Includes back-dated activity to last item of block 1
    add_block(test_scope, entity_code, bs, '2020-01-05', '2020-01-15', '2020-02-02')
    # block 7 Includes back-dated activity to exclude block 1 entirely
    add_block(test_scope, entity_code, bs, '2012-12-31', '2020-01-15', '2020-02-03')

    def run_scenario(entity_scope, entity_code, to_date, asat_date,
                     expected_open_tmv, expected_locked_tmv):
        blocks = bs.find_blocks(entity_scope, entity_code, '2020-01-03', to_date, asat_date)
        df = pd.DataFrame.from_records(
                [(o.date, o.tmv)
                 for o in
                 combine(blocks, locked, '2020-01-03', to_date, asat_date)],
                columns=['date', 'tmv']
            )
        total = df['tmv'].sum()
        expected = expected_locked_tmv if locked else expected_open_tmv
        if debug:
            print(nicer(df))
            print(f"Expected: {expected}, Actual: {total}")
        assert (total == pytest.approx(expected, 0.001))

    # View on 01/10 for 01/09
    run_scenario(test_scope, entity_code, '2020-01-09', '2020-01-10', 6061.34, 6061.34)
    # View on 01/10 for 01/10
    run_scenario(test_scope, entity_code, '2020-01-10', '2020-01-10', 7082.17, 7082.17)
    # View on 01/11
    run_scenario(test_scope, entity_code, '2020-01-11', '2020-01-11', 7963.82, 7963.82)
    # View on 01/15 for 01/11
    run_scenario(test_scope, entity_code, '2020-01-11', '2020-01-15', 8295.58, 7963.82)
    # View on 01/15 for 01/15
    run_scenario(test_scope, entity_code, '2020-01-15', '2020-01-15', 12158.41, 11826.65)
    # View on 02/01
    run_scenario(test_scope, entity_code, '2020-01-15', '2020-02-01', 2425.39, 11826.65)
    # View on 02/02
    run_scenario(test_scope, entity_code, '2020-01-15', '2020-02-02', 1606.25, 11826.65)
    # View on 02/03
...
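Note that run_scenario compares totals with pytest.approx(expected, 0.001), whose second positional argument is the relative tolerance, so each assertion allows the combined TMV to drift by 0.1%. A rough self-contained illustration:

import pytest

assert 6061.34 == pytest.approx(6061.00, 0.001)   # within 0.1%: passes
assert 6061.34 != pytest.approx(6000.00, 0.001)   # off by ~1%: not equal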
