Best Python code snippet using hypothesis
loader.py
Source:loader.py  
import re
import json
import logging

from django.utils import timezone
from dateutil.parser import parse as dt_parse

from names_translator.name_utils import parse_fullname
from abstract.loaders import FileLoader
from abstract.exc import ParsingError

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("importer:vkks")

# NOTE(review): the Cyrillic literals below were mis-encoded (UTF-8 bytes shown
# as Latin-1) in the listing this block was recovered from. They have been
# restored letter by letter; the apostrophe is presumed to be U+2019 and the
# dash in the title regex an en/em dash -- confirm against the upstream file.

# "<declaration type> <YYYY><dash><YYYY>"; accepts hyphen, en dash and em dash.
_TITLE_RE = re.compile(r"(.*)\s(\d{4})[\-–—](\d{4})")

# Placeholder stored as the end date when a career row has no end date.
_STILL_IN_OFFICE = "По теперішній час"

# For each group: (field suffix, stored label). The first truthy field wins.
_COLIVING_LABELS = (
    ("321", "Спільно не проживаємо"),
    ("322", "Спільно проживаємо"),
    ("323", "Тимчасово спільно не проживаємо"),
    ("324", "Тимчасово спільно проживаємо"),
)
_COHABITING_LABELS = (
    ("331", "Спільним побутом не пов’язані"),
    ("332", "Пов’язані спільним побутом"),
    ("333", "Тимчасово не пов’язані спільним побутом"),
    ("334", "Тимчасово пов’язані спільним побутом"),
)
_MUTUAL_LIABILITIES_LABELS = (
    ("341", "Існують взаємні права та/чи обов’язки"),
    ("342", "Відсутні взаємні права та обов’язки"),
)


def _split_cells(raw, delimiter):
    """Split *raw* on *delimiter*, drop empty pieces, then trim each piece.

    Empty pieces are removed *before* trimming, so a piece made only of
    trimmed characters survives as an empty string (as in the original code).
    """
    return [piece.strip(" \n,_") for piece in raw.split(delimiter) if piece]


def _first_checked_label(values, row_prefix, options):
    """Return the label of the first truthy ``row_prefix + suffix`` field.

    Fields are read in the order given by *options*; reading stops at the
    first truthy one. Returns ``None`` when none of them is truthy.
    """
    for suffix, label in options:
        if values[row_prefix + suffix]:
            return label
    return None


class VKKSLoader(FileLoader):
    """Loader that normalizes "electronic" and "paper" declaration records."""

    filetype = "jsonlines"

    @property
    def model(self):
        """Return the model class (imported lazily from ``.models``)."""
        from .models import VKKSModel

        return VKKSModel

    def process_e_declaration(self, record):
        """Convert one electronic declaration record into a normalized dict.

        Reads ``record["values"]``, ``record["fields"]["name"]``,
        ``record["submitDate"]``, ``record["ID"]`` and the optional
        ``record["date_and_type"]`` title, and returns a dict with the
        ``source``, ``ID``, ``intro``, ``general`` and ``declaration`` keys.

        Raises:
            ParsingError: when the career columns of a family member cannot
                be aligned into rows.
            KeyError: when a field that is read unconditionally is missing.
        """
        values = record["values"]

        # Title example (translated from the original comment):
        # "... of family ties of a judge for 2013–2017".
        year_from = values.get("11")
        year_to = values.get("12")
        title = record.get("date_and_type") or ""

        # TODO: Sane default?
        declaration_type = ""
        parsed_title = _TITLE_RE.match(title)
        if parsed_title:
            # Drop the trailing " за" ("for") left over before the year range.
            declaration_type = re.sub(r"\sза$", "", parsed_title.group(1))
            # Explicit year fields take precedence over the years in the title.
            year_from = year_from or parsed_title.group(2)
            year_to = year_to or parsed_title.group(3)
        else:
            declaration_type = title
            if not year_from or not year_to:
                logger.warning("Cannot parse title '{}/{}'".format(year_from, year_to))

        last_name, first_name, patronymic, _ = parse_fullname(record["fields"]["name"])
        submit_date = dt_parse(record["submitDate"]).astimezone(
            timezone.get_current_timezone()
        )

        res = {
            "source": "electronic",
            "ID": record["ID"],
            "intro": {
                "declaration_year_from": year_from,
                "declaration_year_to": year_to,
                "declaration_type": declaration_type,
            },
            "general": {
                "post": {
                    "office": "{}, {}".format(
                        values.get("114", {}).get("label", ""),
                        values["104"],
                    ).strip(" ,"),
                    "office_id": values.get("114", {}).get("value"),
                    "post": values["105"],
                },
                "family": [],
                "family_conflicts": [],
                "last_name": last_name,
                "name": first_name,
                "patronymic": patronymic,
                "family_comment": values["292"],
                "has_information": values["211"],
                "family_conflicts_comment": values["392"],
            },
            "declaration": {
                "date_day": str(submit_date.day),
                "date_month": str(submit_date.month),
                "date_year": str(submit_date.year),
                "date_time": str(submit_date.time()),
            },
        }

        for pos in range(0, int(values["220"]) + 1):
            row = "220_{}-".format(pos)
            # TODO: process cases where some records are reusing the name above.
            if not values[row + "221"]:
                continue

            fam = {}
            last_name, first_name, patronymic, _ = parse_fullname(values[row + "221"])
            fam["last_name"] = last_name
            fam["name"] = first_name
            fam["patronymic"] = patronymic
            fam["relation"] = values[row + "222"]
            fam["career"] = []

            # The career of a family member arrives as four free-text columns
            # (office, position, from, to) that are only visually split into
            # rows. The heuristics below look for the rows in every column and
            # work around the well-known corner cases (for example, a missing
            # end date usually means the person still holds the office).
            separator = "\n"
            if "\n" not in values[row + "224"]:
                separator = "__" if "__" in values[row + "224"] else ","

            years_from = [chunk for chunk in values[row + "225"].split("\n") if chunk]
            years_to = [chunk for chunk in values[row + "226"].split("\n") if chunk]

            raw_offices = values[row + "223"]
            raw_positions = values[row + "224"]
            # Try the separator repeated 4, 3, 2, then 1 times and keep the
            # first split whose number of positions matches the 'from' dates.
            for repeat in range(4, 0, -1):
                delimiter = separator * repeat
                offices = _split_cells(raw_offices, delimiter)
                positions = _split_cells(raw_positions, delimiter)
                if len(positions) == len(years_from):
                    break

            if len(positions) != len(years_from):
                raise ParsingError(
                    "Number of positions doesn't correspond to number of 'from' dates for {}".format(
                        res["ID"]
                    )
                )
            if (len(years_from) - len(years_to)) > 1:
                raise ParsingError(
                    "Number of 'from' dates doesn't correspond to number of 'to' dates for {}".format(
                        res["ID"]
                    )
                )
            if len(positions) != len(offices) and len(offices) > 1:
                raise ParsingError(
                    "Number of 'positions' doesn't correspond to number of 'offices' for {}".format(
                        res["ID"]
                    )
                )
            if len(years_to) - len(years_from) > 1:
                raise ParsingError(
                    "Number of 'to' dates has much more lines than number of 'from' dates for {}".format(
                        res["ID"]
                    )
                )

            _parsing_quality = []
            if len(years_from) == len(years_to) == len(offices):
                _parsing_quality.append("ideal")
            else:
                # Analyze the corner cases and apply fixes for known problems,
                # recording every fix for later visual examination.
                if len(years_from) - len(years_to) == 1:
                    _parsing_quality.append("no_end_date")
                    years_to.append(_STILL_IN_OFFICE)
                if len(positions) != len(offices) and len(offices) == 1:
                    _parsing_quality.append("one_office")
                    offices = [offices[0]] * len(positions)
                if len(offices) == 0:
                    _parsing_quality.append("no_public_office")
                    offices = [""] * len(positions)
                if len(years_to) - len(years_from) == 1:
                    _parsing_quality.append("last_date_has_two_lines")
                    years_to = years_to[:-2] + [years_to[-2] + " " + years_to[-1]]

            if not (len(years_from) == len(years_to) == len(offices)):
                raise ParsingError(
                    "Something went very wrong when normalizing record {}".format(res["ID"])
                )

            for position, office, date_from, date_to in zip(
                positions, offices, years_from, years_to
            ):
                # Set only when at least one career row exists (original behavior).
                fam["_parsing_quality"] = _parsing_quality
                fam["career"].append({
                    "position": position,
                    "workplace": office,
                    "from": date_from,
                    "to": date_to,
                })
            res["general"]["family"].append(fam)

        for pos in range(0, int(values["300"]) + 1):
            row = "300_{}-".format(pos)
            if not values[row + "311"]:
                continue

            fam = {}
            last_name, first_name, patronymic, _ = parse_fullname(values[row + "311"])
            fam["last_name"] = last_name
            fam["name"] = first_name
            fam["patronymic"] = patronymic

            # Each key is stored only when one of its fields is truthy.
            for key, options in (
                ("coliving", _COLIVING_LABELS),
                ("cohabiting", _COHABITING_LABELS),
                ("mutual_liabilities", _MUTUAL_LIABILITIES_LABELS),
            ):
                label = _first_checked_label(values, row, options)
                if label is not None:
                    fam[key] = label

            res["general"]["family_conflicts"].append(fam)

        return res

    def process_paper_declaration(self, record):
        """Normalize a paper declaration record in place and return it.

        Uses only ``record[0]``: its ``"answer"`` dict is tagged with
        ``source="paper"``, a boolean ``general.has_information`` and the
        ``ID`` / ``url`` taken from ``record[0]["task"]``.

        Raises:
            ParsingError: when *record* is empty. The original code fell
                through to ``return rec`` with ``rec`` unbound, which raised
                ``UnboundLocalError``.
        """
        if not record:
            raise ParsingError("Empty paper declaration record")

        entry = record[0]
        rec = entry["answer"]
        rec["source"] = "paper"

        general = rec["general"]
        if "has_information" in general:
            general["has_information"] = general["has_information"] == "1"
        else:
            # No explicit flag: infer it from the presence of family entries.
            general["has_information"] = bool(general["family"])

        rec["ID"] = entry["task"]["id"]
        rec["url"] = entry["task"]["data"]["file"]
        return rec

    def preprocess(self, record, options):
        """Dispatch *record* to the converter selected by ``options["source"]``.

        Records with any other source value are returned unchanged.
        """
        if options["source"] == "electronic":
            return self.process_e_declaration(record)
        if options["source"] == "paper":
            return self.process_paper_declaration(record)
        return record

    def inject_params(self, parser):
        """Register the required ``--source`` option on *parser*."""
        super().inject_params(parser)
        parser.add_argument(
            "--source",
            choices=("paper", "electronic"),
            required=True,
            help="Source of the data (affects the conversion/parsing)",
        )

    def get_dedup_fields(self):
        # The body of this method is truncated in the listing this snippet
        # was taken from; it is intentionally left elided here.
        ...

# arnetwork.py
Source:arnetwork.py  
# Copyright (c) 2011 Bastian Venthur
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
This module provides access to the data provided by the AR.Drone.
"""
import logging
import threading
import select
import socket
import multiprocessing

import libardrone


class ARDroneNetworkProcess(threading.Thread):
    """ARDrone Network Process.

    Thread that listens on the drone's navdata (UDP) and control (TCP)
    sockets and on ``com_pipe``. Decoded navdata is handed to
    ``drone.set_navdata``; any message received on ``com_pipe`` stops the
    loop.
    """

    def __init__(self, com_pipe, is_ar_drone_2, drone):
        """Store the stop pipe, the drone-version flag and the drone object."""
        threading.Thread.__init__(self)
        self._drone = drone
        self.com_pipe = com_pipe
        self.is_ar_drone_2 = is_ar_drone_2

    def run(self):
        """Poll the sockets until a message arrives on ``com_pipe``.

        A one-second ``select`` timeout with no readable input, or an empty
        packet on the control socket, triggers a disconnect/reconnect on the
        next iteration.
        """

        def _connect():
            # Open both sockets in non-blocking mode and return them.
            logging.warning('Connexion vers AR Drone en cours ...')

            nav_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
            nav_socket.setblocking(0)
            nav_socket.bind(('', libardrone.ARDRONE_NAVDATA_PORT))
            # Bytes literal: socket.sendto() rejects ``str`` on Python 3
            # (and b"..." is the same ``str`` object type on Python 2).
            nav_socket.sendto(b"\x01\x00\x00\x00", ('192.168.1.1', libardrone.ARDRONE_NAVDATA_PORT))

            control_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            control_socket.connect(('192.168.1.1', libardrone.ARDRONE_CONTROL_PORT))
            control_socket.setblocking(0)
            logging.warning('Connection etablie')
            return nav_socket, control_socket

        def _disconnect(nav_socket, control_socket):
            # Close both sockets.
            logging.warning('Deconnection AR Drone')
            nav_socket.close()
            control_socket.close()

        nav_socket, control_socket = _connect()
        stopping = False
        connection_lost = 1
        reconnection_needed = False

        while not stopping:
            if reconnection_needed:
                _disconnect(nav_socket, control_socket)
                nav_socket, control_socket = _connect()
                reconnection_needed = False

            inputready, outputready, exceptready = select.select(
                [nav_socket, self.com_pipe, control_socket], [], [], 1.
            )
            if len(inputready) == 0:
                # Nothing readable within the timeout: assume the link dropped.
                connection_lost += 1
                reconnection_needed = True

            for i in inputready:
                if i == nav_socket:
                    # Drain the socket and keep only the most recent packet.
                    # ``data`` is reset first so that a stale packet from an
                    # earlier iteration is never decoded again and the name
                    # is always bound when recv() fails immediately.
                    data = None
                    while 1:
                        try:
                            data = nav_socket.recv(500)
                        except IOError:
                            # we consumed every packet from the socket and
                            # continue with the last one
                            break
                    if data is None:
                        continue
                    navdata, has_information = libardrone.decode_navdata(data)
                    if (has_information):
                        self._drone.set_navdata(navdata)
                elif i == self.com_pipe:
                    _ = self.com_pipe.recv()
                    stopping = True
                    break
                elif i == control_socket:
                    reconnection_needed = False
                    while not reconnection_needed:
                        try:
                            data = control_socket.recv(65536)
                            if len(data) == 0:
                                logging.warning('Received an empty packet on control socket')
                                reconnection_needed = True
                            else:
                                logging.warning("Control Socket says : %s", data)
                        except IOError:
                            break
        # ... (the remainder of this snippet is truncated in the source listing)

# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
