Best Python code snippet using pandera_python
serializers.py
Source:serializers.py  
from django.contrib.auth.models import User
from rest_framework import serializers

from .models import Company, CompanyPolicy, LeaveCategory, Leave, Hierarchy, Holiday
from accounts.serializers import UserSerializer
from company.models import TaxTable


class LicenseCodeActivationSerializer(serializers.Serializer):
    """Plain (non-model) serializer carrying a single license ``code`` string."""

    code = serializers.CharField(max_length=255)


class CompanySerializer(serializers.ModelSerializer):
    """Serialize every ``Company`` field; ``admin`` is computed by ``get_admin``."""

    admin = serializers.SerializerMethodField()

    class Meta:
        model = Company
        fields = '__all__'
        read_only_fields = ('created_by',)

    def get_admin(self, obj):
        """Return the serialized ``obj.admin.user``, or ``{}`` when ``obj.admin`` is falsy."""
        if obj.admin:
            return UserSerializer(instance=obj.admin.user).data
        return {}


class ReferenceCompanySerializer(serializers.ModelSerializer):
    """Compact ``Company`` reference exposing only ``id`` and ``name``."""

    class Meta:
        model = Company
        fields = ('id', 'name',)


class ReferencePolicySerializer(serializers.ModelSerializer):
    """Compact ``CompanyPolicy`` reference exposing only ``id`` and ``name``."""

    class Meta:
        model = CompanyPolicy
        fields = ('id', 'name')


class RecursiveSerializer(serializers.Serializer):
    """Render a value with the class of the serializer this field is attached to.

    Used below for ``HierarchySerializer.parent`` so a parent node is nested
    using the same serializer as the node itself.
    """

    def to_representation(self, value):
        """Serialize ``value`` with ``self.parent``'s class, reusing the current context."""
        serializer = self.parent.__class__(value, context=self.context)
        return serializer.data


class HierarchySerializer(serializers.ModelSerializer):
    """Serialize ``Hierarchy`` rows, nesting ``parent`` recursively."""

    parent = RecursiveSerializer(read_only=True)
    # Read-only value taken from the model attribute named ``get_header_items``.
    items_count = serializers.ReadOnlyField(source="get_header_items")

    class Meta:
        model = Hierarchy
        fields = ('id', 'name', 'description', 'parent', 'is_active', 'is_header',
                  'created_by', 'company_policy', 'date_created', 'date_updated', 'items_count',)
        read_only_fields = ('created_by',)

    def to_representation(self, instance):
        """Render ``company_policy`` as nested ``{id, name}`` references on output.

        The field is swapped on ``self.fields``, so the replacement stays in
        place for any later use of this serializer instance.
        """
        self.fields['company_policy'] = ReferencePolicySerializer(read_only=True, many=True)
        return super(HierarchySerializer, self).to_representation(instance)

    def validate(self, attrs):
        """Reject a name already used by a hierarchy in any of the submitted policies.

        The instance being updated (if any) is excluded from the lookup.

        Raises:
            serializers.ValidationError: when a conflicting hierarchy exists.
        """
        company_policy = attrs['company_policy']
        name = attrs['name']
        base_check = Hierarchy.objects.filter(company_policy__in=company_policy, name=name)
        if self.instance:
            base_check = base_check.exclude(id=self.instance.id)
        if base_check.exists():
            raise serializers.ValidationError("Company structure: '{}' exists. Use a different name.".format(name))
        return attrs


class LeaveCategorySerializer(serializers.ModelSerializer):
    """Serialize ``LeaveCategory`` rows together with a read-only ``leave_count``."""

    # Read-only value taken from the model attribute named ``get_leave_count``.
    leave_count = serializers.ReadOnlyField(source="get_leave_count")

    class Meta:
        model = LeaveCategory
        fields = ('id',
                  'name',
                  'description',
                  'is_active',
                  'created_by', 'company_policy', 'date_created', 'date_updated', 'leave_count')
        read_only_fields = ('created_by',)

    def to_representation(self, instance):
        """Render ``company_policy`` as nested ``{id, name}`` references on output."""
        self.fields['company_policy'] = ReferencePolicySerializer(read_only=True, many=True)
        return super(LeaveCategorySerializer, self).to_representation(instance)

    def validate(self, attrs):
        """Reject a name already used by a leave category in any of the submitted policies.

        The instance being updated (if any) is excluded from the lookup.

        Raises:
            serializers.ValidationError: when a conflicting category exists.
        """
        company_policy = attrs['company_policy']
        name = attrs['name']
        base_check = LeaveCategory.objects.filter(company_policy__in=company_policy, name=name)
        if self.instance:
            base_check = base_check.exclude(id=self.instance.id)
        if base_check.exists():
            raise serializers.ValidationError("Leave Category: '{}' exists. Use a different name.".format(name))
        return attrs


class LeaveSerializer(serializers.ModelSerializer):
    """Serialize every ``Leave`` field; ``created_by`` and ``company`` are read-only."""

    class Meta:
        model = Leave
        fields = '__all__'
        read_only_fields = ('created_by', 'company',)

    def to_representation(self, instance):
        """Render ``category`` as a nested ``LeaveCategorySerializer`` payload on output."""
        self.fields['category'] = LeaveCategorySerializer(read_only=True)
        return super(LeaveSerializer, self).to_representation(instance)

    def validate(self, attrs):
        """Reject duplicate leave names, and duplicate sick leaves, within a category.

        Raises:
            serializers.ValidationError: when a conflicting leave exists.
        """
        category = attrs['category']
        name = attrs['name']
        base_check = Leave.objects.filter(category__company_policy__in=category.company_policy.values_list('id'),
                                          name=name,
                                          category=category)
        # Exclude the row being edited BEFORE any conflict check. Previously the
        # sick-leave check ran first, so saving an existing sick leave without
        # renaming it matched itself and was always rejected.
        if self.instance:
            base_check = base_check.exclude(id=self.instance.id)
        # NOTE(review): this check reuses base_check and is therefore limited to
        # leaves with the same name -- confirm whether it should instead cover
        # every leave in the category.
        if attrs['is_sick_leave'] and base_check.filter(is_sick_leave=True).exists():
            raise serializers.ValidationError(
                "A sick leave already exists. You cannot have more than one sick leave in the same category")
        if base_check.exists():
            raise serializers.ValidationError("Leave Type: '{}' exists for category '{}'. Use a different name.".format(name, category.name))
        return attrs


class PolicySerializer(serializers.ModelSerializer):
    """Serialize every ``CompanyPolicy`` field; ``created_by`` is read-only."""

    class Meta:
        model = CompanyPolicy
        fields = '__all__'
        read_only_fields = ('created_by',)

    def to_representation(self, instance):
        """Render ``company`` as nested ``{id, name}`` references on output."""
        self.fields['company'] = ReferenceCompanySerializer(read_only=True, many=True)
        return super(PolicySerializer, self).to_representation(instance)

    def validate(self, attrs):
        """Reject a name already used by a policy of any of the submitted companies.

        The instance being updated (if any) is excluded from the lookup.

        Raises:
            serializers.ValidationError: when a conflicting policy exists.
        """
        company = attrs['company']
        name = attrs['name']
        base_check = CompanyPolicy.objects.filter(company__in=company, name=name)
        if self.instance:
            base_check = base_check.exclude(id=self.instance.id)
        if base_check.exists():
            raise serializers.ValidationError("Company Policy: '{}' exists. Use a different name.".format(name))
        return attrs


class HolidaySerializer(serializers.ModelSerializer):
    """Serialize every ``Holiday`` field; ``created_by`` is read-only."""

    class Meta:
        model = Holiday
        fields = '__all__'
        read_only_fields = ('created_by',)

    def to_representation(self, instance):
        """Render ``company_policy`` as nested ``{id, name}`` references on output."""
        self.fields['company_policy'] = ReferencePolicySerializer(read_only=True, many=True)
        return super(HolidaySerializer, self).to_representation(instance)

    def validate(self, attrs):
        """Reject a name already used by a holiday in any of the submitted policies.

        The instance being updated (if any) is excluded from the lookup.

        Raises:
            serializers.ValidationError: when a conflicting holiday exists.
        """
        company_policy = attrs['company_policy']
        name = attrs['name']
        base_check = Holiday.objects.filter(company_policy__in=company_policy, name=name)
        if self.instance:
            base_check = base_check.exclude(id=self.instance.id)
        if base_check.exists():
            raise serializers.ValidationError("Holiday: '{}' exists. Use a different name.".format(name))
        return attrs


class TaxTableSerializer(serializers.ModelSerializer):
    """Serialize every ``TaxTable`` field; ``created_by`` is read-only."""

    class Meta:
        model = TaxTable
        fields = '__all__'
        read_only_fields = ('created_by',)

    def to_representation(self, instance):
        self.fields['company_policy'] = ReferencePolicySerializer(read_only=True)
        ...  # the listing is cut off here; the rest of this method is not shown


listprocessor.py
Source:listprocessor.py  
import csv
import re
from arrpy import _checks as checks
from arrpy._basicrows import appender, extractor


def avg_col(arr, base_check=True, dim_check=False):
    """Return the element-wise arithmetic mean across the subarrays of ``arr``.

    Entry ``y`` of the result is the mean of ``arr[x][y]`` over every
    subarray ``x``; the result has ``len(arr[0])`` entries.

    Returns ``None`` when ``checks.ValidCheck(arr).run_checks`` reports failure.
    """
    checker = checks.ValidCheck(arr)
    if not checker.run_checks(base_check, dim_check):
        return
    width, height = len(arr), len(arr[0])
    return [sum(arr[x][y] for x in range(width)) / width for y in range(height)]


def avg_row(arr, base_check=True, dim_check=False):
    """Return the arithmetic mean of each subarray of ``arr``.

    Entry ``x`` of the result is the mean of the first ``len(arr[0])`` values
    of ``arr[x]``; the result has ``len(arr)`` entries.

    Returns ``None`` when ``checks.ValidCheck(arr).run_checks`` reports failure.
    """
    checker = checks.ValidCheck(arr)
    if not checker.run_checks(base_check, dim_check):
        return
    width, height = len(arr), len(arr[0])
    return [sum(arr[x][y] for y in range(height)) / height for x in range(width)]


def append_row(arr, row, base_check=True, deep=True):
    """Return a new list of lists with the indicated row (list) appended.

    Delegates to ``appender(arr, row, deep)``; returns ``None`` when the
    validity check fails.
    """
    checker = checks.ValidCheck(arr)
    if not checker.run_checks(base_check):
        return
    return appender(arr, row, deep)


def del_row(arr, row, base_check=True):
    """Return a new list of lists with the indicated row deleted.

    ``row`` is coerced with ``int()``. A message is printed and ``None`` is
    returned when the coercion raises ``ValueError``, when the index falls
    outside ``0 <= row < len(arr[0])``, or when the validity check fails.
    """
    checker = checks.ValidCheck(arr)
    if not checker.run_checks(base_check):
        return
    width, height = len(arr), len(arr[0])
    try:
        row = int(row)
    except ValueError:
        print(f"input row index {row} not coercible to type int. Exiting")
        return
    if not 0 <= row < height:
        print(f"row index {row} is invalid for an input array with {width} columns and {height} rows. Exiting")
        return
    arr_out = [[] for _ in range(width)]
    for y in range(height):
        if y != row:
            arr_row = extractor(arr, y)
            # appender's return value is ignored, so this relies on
            # appender(..., deep=False) extending arr_out in place -- TODO confirm.
            appender(arr_out, arr_row, deep=False)
    return arr_out


def extract_row(arr, row, base_check=True):
    """Return the indicated row (the values at the given index of all subarrays).

    Delegates to ``extractor(arr, row)``; returns ``None`` when the validity
    check fails.
    """
    checker = checks.ValidCheck(arr)
    if not checker.run_checks(base_check):
        return
    return extractor(arr, row)


def open_df(file, delim_type="csv", subarray="col", e='utf-8-sig'):
    """Open the tab or csv file and return it as a list of lists of strings.

    Args:
        file: path of the file to read.
        delim_type: ``"csv"`` (comma separated) or ``"tab"`` (tab separated).
        subarray: ``"col"`` to return one list per column; any other value
            returns one list per line of the file.
        e: text encoding used to open the file.

    Returns:
        The parsed list of lists, or ``None`` (after printing a message) when
        ``delim_type`` is unknown, the file has no lines, or a line has more
        fields than the first line while building columns.
    """
    with open(file, 'r', encoding=e) as f:
        file_text = f.read()
    delims = {"csv": ",", "tab": "\t"}
    rows = file_text.split("\n")
    # A trailing newline leaves one empty final element, which would otherwise
    # add a false extra value to the first column; drop it.
    if rows[-1] == "":
        rows.pop()
    try:
        delim = delims[delim_type]
        if subarray == "col":
            # One output column per field of the first line.
            arr = [[] for _ in range(rows[0].count(delim) + 1)]
            for row in rows:
                for i, col in enumerate(row.split(delim)):
                    arr[i].append(col)
        else:
            arr = [row.split(delim) for row in rows]
    except (KeyError, IndexError):
        # KeyError: unknown delim_type. IndexError: empty file or a ragged line.
        # Anything else (e.g. KeyboardInterrupt) is no longer swallowed.
        print("Failed during file parsing. Check that delimitor type and encoding are suitable. Default file type is csv.")
        return
    return arr


def write_csv(arr, file, subarray="col"):
    """Write the given list of lists as a csv file."""
    with open(file, 'w') as outfile:
        arr_writer = csv.writer(outfile)
        if subarray == "col":
            for i in range(len(arr[0])):
                row = extract_row(arr, i, base_check=False)
                arr_writer.writerow(row)
        else:
            for row in arr:
                ...  # the listing is cut off here; the loop body is not shown


main.py
Source:main.py  
1import os2import sys3try:4    from base_check.common import utils5except ImportError:6    filename = os.path.abspath(__file__)7    _index = filename.rfind('/base_check/')8    dirname = filename[:_index] if _index > 0 \9        else os.path.dirname(filename)10    sys.path.insert(0, dirname)11    from base_check.common import utils12from base_check.common import log as logging13from base_check.common.cfg import CONF14# import the module to check15check_modules = [16    # 'base_check.checks.network.network_check.device_set_ip',17    # 'base_check.checks.network.network_check.network_base_check',18    # 'base_check.checks.network.network_check.bond_device_check',19    # 'base_check.checks.network.network_check.phy_device_set_off',20    # 'base_check.checks.network.network_check.connection_check',21    # 'base_check.checks.network.network_check.phy_device_set_up',22    # 'base_check.checks.network.network_check.phy_device_set_off',23    # 'base_check.checks.network.network_check.connection_check',24    # 'base_check.checks.network.network_check.phy_device_set_up',25    # 'base_check.checks.network.network_check.mac_check',26    # 'base_check.checks.network.network_check.speed_pressure_check',27    # 'base_check.checks.network.network_check.singlethread_speed_pressure_check',28    # 'base_check.checks.network.network_check.device_del_ip',29    # 'base_check.checks.system.system_check.system_port_check',30    # 'base_check.checks.system.system_check.numa_check',31     'base_check.checks.system.system_check.system_port_post_check',32    # 'base_check.checks.system.system_check.system_base_check',33    # ###########------esxi_base_network_status_test--------################34    # 'base_check.checks.network.esxi_network.esxi_network_base_check',35    # 'base_check.checks.network.esxi_network.esxi_vswitch_check'36    # 'base_check.checks.network.esxi_network.esxi_uplink_down',37    # 'base_check.checks.network.esxi_network.esxi_connection_check',38    # 
'base_check.checks.network.esxi_network.esxi_uplink_up',39    # 'base_check.checks.network.esxi_network.esxi_uplink_down',40    # 'base_check.checks.network.esxi_network.esxi_connection_check',41    # 'base_check.checks.network.esxi_network.esxi_uplink_up',42]43def main():44    CONF()45    logging.setup(CONF)46    LOG = logging.getLogger(__name__)47    utils.import_module_list(check_modules)48    utils.run_all_checks(check_modules)49    if utils.check_all_passed():50        return 051    else:52        for info in utils.get_all_fails():53            LOG.error(info)54        return 155if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
