How to use test_collection method in autotest

Best Python code snippet using autotest_python

test_collection.py

Source:test_collection.py Github

copy

Full Screen

import numpy as np
from numpy.testing import assert_array_equal
from pandas.testing import assert_frame_equal, assert_series_equal
from operator import and_
from functools import reduce
from mtg.collection import Collection
from pytest import raises


def test_collection_metadata(old_format_datafile):
    """Label and source passed to the constructor are exposed as attributes."""
    c = Collection(old_format_datafile, label="Islands", source="MtgManager")
    assert c.label == "Islands", f"Error in label: {c.label}"
    assert c.source == "MtgManager", f"Error in Source: {c.source}"


def test_old_format_layout(old_format_datafile):
    """Check the columns and dtypes of data loaded from an old-format file."""
    c = Collection(old_format_datafile)
    assert "Code" not in c.data.columns
    assert "ExpansionCode" in c.data.columns
    assert "ExpansionName" not in c.data.columns
    # Foil is Bool type
    assert c.data.Foil.dtype == bool
    # Condition is Categorical
    assert c.data.Condition.dtype.name == "category"
    # Language is Categorical
    assert c.data.Language.dtype.name == "category"
    # assert all ExpansionCode are lowercase
    # (lambda parameter is named `code` so it does not shadow the collection `c`)
    lowercase_codes = c.data.ExpansionCode.apply(lambda code: code.lower()).values
    assert_array_equal(c.data.ExpansionCode.values, lowercase_codes)


def test_new_format_layout(new_format_datafile):
    """Check the columns and dtypes of data loaded from a new-format file."""
    c = Collection(new_format_datafile)
    assert "ExpansionCode" in c.data.columns
    assert "ExpansionName" in c.data.columns
    # Foil is Bool type
    assert c.data.Foil.dtype == bool
    # Condition is Categorical
    assert c.data.Condition.dtype.name == "category"
    # Language is Categorical
    assert c.data.Language.dtype.name == "category"
    # assert all ExpansionCode are lowercase
    lowercase_codes = c.data.ExpansionCode.apply(lambda code: code.lower()).values
    assert_array_equal(c.data.ExpansionCode.values, lowercase_codes)


def test_lazy_data_loading(test_collection):
    """`_data` stays None until the `data` property is first accessed."""
    # No data property access yet
    assert test_collection._data is None
    # access data property - should not be None
    assert test_collection.data is not None
    assert_frame_equal(test_collection.data, test_collection._data)
    # they should be indeed the same object
    assert test_collection.data is test_collection._data


def test_len_collection_is_sum_of_quantities(test_collection):
    """len(collection) equals the sum of Quantity, not the number of rows."""
    assert len(test_collection) > 0
    assert len(test_collection) != len(test_collection.data)
    assert len(test_collection) == test_collection.data.Quantity.sum()


def test_collection_is_hashable(test_collection):
    """The hash of a collection is the hash of its source + label string."""
    assert hash(test_collection) is not None
    assert hash(test_collection) == hash(test_collection.source + test_collection.label)


def test_proxy_dataframe(test_collection):
    """Verify that DataFrame attribute access is proxied via Collection."""
    try:
        # try to call the head method of the dataframe underneath
        _ = test_collection.head()
    except AttributeError:
        raise AssertionError("Attribute head() not found in Collection")

    try:
        # try access single column
        _ = test_collection.Name
    except AttributeError:
        # The original message wrongly mentioned head() here.
        raise AssertionError("Attribute Name not found in Collection")


def test_non_existing_name_raises_attribute_error(test_collection):
    """Unknown methods and unknown columns both raise AttributeError."""
    # One `raises` block per access: with both statements in a single block
    # the first exception ends the block and the second is never evaluated.
    with raises(AttributeError):
        test_collection.headhead()  # this does not exist
    with raises(AttributeError):
        test_collection.NonExistingColumn


def test_collection_is_subscriptable(test_collection):
    """Boolean-mask indexing on the Collection matches indexing on `.data`."""
    try:
        result = test_collection[test_collection.Name == "Island"]
    except TypeError:
        raise AssertionError("Collection is Not Subscriptable")

    assert len(result) > 0
    res_via_data = test_collection.data[test_collection.data.Name == "Island"]
    assert_frame_equal(result, res_via_data)


def test_collection_is_writable(test_collection):
    """Item assignment through the Collection updates the underlying data."""
    try:
        test_collection[test_collection.Name == "Island", ["Name", "Quantity"]] = (
            "Forest",
            1,
        )
    except RuntimeError:
        raise AssertionError("Assignment not permitted")

    # check that all values have been actually changed
    assert len(test_collection[test_collection.Quantity > 1]) == 0
    # every entry should count as for 1
    assert len(test_collection) == len(test_collection.data)
    # check no island exists anymore
    assert len(test_collection[test_collection.Name == "Island"]) == 0


def test_add_new_column_to_collection(test_collection):
    """Assigning to a new key adds a column holding the assigned values."""
    n_cols_before_change = len(test_collection.columns)
    test_collection["NewColumn"] = test_collection.Quantity.apply(lambda v: v + 1)
    assert len(test_collection.columns) == n_cols_before_change + 1
    assert "NewColumn" in test_collection.columns
    qs = test_collection.Quantity.values.tolist()
    nqs = test_collection.NewColumn.values.tolist()
    # new == old + 1 already implies new > old, so a single comparison suffices
    assert all(new == old + 1 for new, old in zip(nqs, qs))


def test_diff_collections_nodiffs(new_format_datafile):
    """Subtracting collections built from the same file yields an empty diff."""
    c1 = Collection(new_format_datafile, label="Islands", source="Other NEW")
    c2 = Collection(new_format_datafile, label="Islands", source="NEW")
    assert len(c1 - c1) == 0
    assert len(c2 - c2) == 0
    assert len(c1 - c2) == 0
    assert len(c2 - c1) == 0
    assert "PurchaseDate" not in (c2 - c1).columns


def test_diff_collections_with_different_columns(
    old_format_datafile, new_format_datafile
):
    """Diffing an old-format against a new-format collection raises ValueError."""
    c1 = Collection(old_format_datafile, label="Islands")
    c2 = Collection(new_format_datafile, label="Islands")
    with raises(ValueError):
        c2 - c1


def test_diff_two_collections_different_label(old_format_datafile, new_format_datafile):
    """Diffing collections with different labels raises ValueError."""
    c1 = Collection(old_format_datafile, label="Islands_old")
    c2 = Collection(new_format_datafile, label="Islands_new")
    with raises(ValueError):
        c2 - c1


def test_reorder_columns(new_format_datafile):
    """Columns can be re-ordered to an arbitrary permutation."""
    c = Collection(new_format_datafile, label="Islands", source="Test New Format")
    cols = [
        "Quantity",
        "Name",
        "ExpansionCode",
        "ExpansionName",
        "PurchasePrice",
        "Foil",
        "Condition",
        "Language",
        "PurchaseDate",
    ]
    assert all(col in c.columns for col in cols)
    assert_array_equal(c.columns, np.asarray(cols))
    cols_reversed = cols[::-1]
    # NOTE(review): method name is spelled `resort_colunns` in the source;
    # confirm it matches the Collection API before renaming.
    c.resort_colunns(cols_reversed)
    assert_array_equal(c.columns, np.asarray(cols_reversed))


def test_reorder_non_existing_columns(new_format_datafile):
    """Re-ordering with unknown column names is expected to raise ValueError."""
    c = Collection(new_format_datafile, label="Islands", source="Test New Format")
    cols = [
        "Quantity",
        "Name",
        "ExpansionCode",
        "ExpansionName",
        "PurchasePrice",
        "Foil",
        "Condition",
        "Language",
        "PurchaseDate",
    ]
    assert all(col in c.columns for col in cols)
    assert_array_equal(c.columns, np.asarray(cols))
    with raises(ValueError):
        non_existing_cols = ["NonExistingCols"]
        # ... the remainder of this test is truncated in the source snippet

Full Screen

Full Screen

tasks.py

Source:tasks.py Github

copy

Full Screen

from operator import add
import os
from os import name
from threading import Lock
import time
from celery import Celery
import requests
import json
import celeryconfig
from pymongo import MongoClient
from random import randint
import logging
from logging.handlers import RotatingFileHandler
import stacktracer
import uuid

# Application logger writing to a rotating file.
logger = logging.getLogger('app')
logger.setLevel(logging.DEBUG)
handler = RotatingFileHandler('log/app.log', maxBytes=1024000*1024000, backupCount=2)
logger.addHandler(handler)

env = os.environ
# No trailing comma here: the original `... ,` turned the broker URL into a
# one-element tuple instead of a string.
CELERY_BROKER_URL = env.get('CELERY_BROKER_URL', 'redis://localhost:6379')
CELERY_RESULT_BACKEND = env.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379')

celery = Celery(
    'tasks',
    broker=CELERY_BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
)

client = MongoClient(
    env.get('MONGO_DB_NAME', 'localhost'),
    27017,
    username=env.get('MONGO_INITDB_ROOT_USERNAME', 'admin'),
    password=env.get('MONGO_INITDB_ROOT_PASSWORD', 'admin'),
)
db = client['tutorial']
test_collection = db['test-collection']

celery.config_from_object(celeryconfig)
stacktracer.trace_start("trace.html")

# Serializes the read-modify-write in add_task.
# NOTE(review): a threading.Lock only covers threads inside one process;
# presumably separate Celery worker processes can still race -- confirm, and
# consider an atomic update ({"$inc": ...} with a `count < 30` filter).
lock = Lock()


@celery.task(name='mytasks.add')
def add_task(id, y):
    """Increment the ``count`` field of document ``id`` while it is below 30.

    Args:
        id: ``_id`` of the document in ``test_collection``; ``None`` means 1.
        y: Unused by this task.

    Returns:
        The (possibly defaulted) document id.
    """
    if id is None:
        id = 1

    # Random delay of 0.8 s or 0.5 s before touching the document. The
    # original also issued a find_one() here whose result was never used.
    time.sleep(0.8 if randint(0, 1) == 1 else 0.5)

    # if id == 1:
    #     temp_document = test_collection.find_one({"_id": id})
    #     if temp_document['count'] < 30:
    with lock:
        temp_document = test_collection.find_one({"_id": id})
        logger.info(temp_document)
        time.sleep(0.05)
        if temp_document['count'] < 30:
            currentCount = temp_document['count']
            temp_document['count'] = currentCount + 1
            test_collection.update_one({"_id": id}, {"$set": {"count": temp_document['count']}})
    # else:
    #     temp_document = test_collection.find_one({"_id": id})
    #     logger.info(temp_document)
    #     time.sleep(0.5)
    #     if temp_document['count'] < 30:
    #         currentCount = temp_document['count']
    #         temp_document['count'] = currentCount + 1
    #         test_collection.update_one({"_id": id}, {"$set": {"count": temp_document['count']}})
    # with client.start_session() as session:
    #     session.start_transaction()
    #     try:
    #         document = test_collection.find_one({"_id": id})
    #         currentCount = document['count']
    #         document['count'] = currentCount + 1
    #         test_collection.update_one({"_id": id}, {"$set": {"count": document['count']}})
    #         # session.commit_transaction()
    #         # session.endSession()
    #     except Exception as ex:
    #         logger.info(ex)
    return id


@celery.task(name='mytasks.periodic')
def periodic():
    ...  # body is truncated in the source snippet

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful