Best Python code snippet using avocado_python
test_offload.py
Source:test_offload.py  
...67        return68    print("FAIL: " + msg)69    log("FAIL: " + msg, "", level=1)70    os.sys.exit(1)71def start_test(msg):72    log(msg, "", level=1)73    log_level_inc()74    print(msg)75def cmd(cmd, shell=True, include_stderr=False, background=False, fail=True):76    """77    Run a command in subprocess and return tuple of (retval, stdout);78    optionally return stderr as well as third value.79    """80    proc = subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE,81                            stderr=subprocess.PIPE)82    if background:83        msg = "%s START: %s" % (log_get_sec(1),84                                datetime.now().strftime("%H:%M:%S.%f"))85        log("BKG " + proc.args, msg)86        return proc87    return cmd_result(proc, include_stderr=include_stderr, fail=fail)88def cmd_result(proc, include_stderr=False, fail=False):89    stdout, stderr = proc.communicate()90    stdout = stdout.decode("utf-8")91    stderr = stderr.decode("utf-8")92    proc.stdout.close()93    proc.stderr.close()94    stderr = "\n" + stderr95    if stderr[-1] == "\n":96        stderr = stderr[:-1]97    sec = log_get_sec(1)98    log("CMD " + proc.args,99        "RETCODE: %d\n%s STDOUT:\n%s%s STDERR:%s\n%s END: %s" %100        (proc.returncode, sec, stdout, sec, stderr,101         sec, datetime.now().strftime("%H:%M:%S.%f")))102    if proc.returncode != 0 and fail:103        if len(stderr) > 0 and stderr[-1] == "\n":104            stderr = stderr[:-1]105        raise Exception("Command failed: %s\n%s" % (proc.args, stderr))106    if include_stderr:107        return proc.returncode, stdout, stderr108    else:109        return proc.returncode, stdout110def rm(f):111    cmd("rm -f %s" % (f))112    if f in files:113        files.remove(f)114def tool(name, args, flags, JSON=True, ns="", fail=True, include_stderr=False):115    params = ""116    if JSON:117        params += "%s " % (flags["json"])118    if ns != "":119        ns = "ip netns exec %s " % (ns)120    if 
include_stderr:121        ret, stdout, stderr = cmd(ns + name + " " + params + args,122                                  fail=fail, include_stderr=True)123    else:124        ret, stdout = cmd(ns + name + " " + params + args,125                          fail=fail, include_stderr=False)126    if JSON and len(stdout.strip()) != 0:127        out = json.loads(stdout)128    else:129        out = stdout130    if include_stderr:131        return ret, out, stderr132    else:133        return ret, out134def bpftool(args, JSON=True, ns="", fail=True, include_stderr=False):135    return tool("bpftool", args, {"json":"-p"}, JSON=JSON, ns=ns,136                fail=fail, include_stderr=include_stderr)137def bpftool_prog_list(expected=None, ns=""):138    _, progs = bpftool("prog show", JSON=True, ns=ns, fail=True)139    # Remove the base progs140    for p in base_progs:141        if p in progs:142            progs.remove(p)143    if expected is not None:144        if len(progs) != expected:145            fail(True, "%d BPF programs loaded, expected %d" %146                 (len(progs), expected))147    return progs148def bpftool_map_list(expected=None, ns=""):149    _, maps = bpftool("map show", JSON=True, ns=ns, fail=True)150    # Remove the base maps151    for m in base_maps:152        if m in maps:153            maps.remove(m)154    if expected is not None:155        if len(maps) != expected:156            fail(True, "%d BPF maps loaded, expected %d" %157                 (len(maps), expected))158    return maps159def bpftool_prog_list_wait(expected=0, n_retry=20):160    for i in range(n_retry):161        nprogs = len(bpftool_prog_list())162        if nprogs == expected:163            return164        time.sleep(0.05)165    raise Exception("Time out waiting for program counts to stabilize want %d, have %d" % (expected, nprogs))166def bpftool_map_list_wait(expected=0, n_retry=20):167    for i in range(n_retry):168        nmaps = len(bpftool_map_list())169        if nmaps == 
expected:170            return171        time.sleep(0.05)172    raise Exception("Time out waiting for map counts to stabilize want %d, have %d" % (expected, nmaps))173def bpftool_prog_load(sample, file_name, maps=[], prog_type="xdp", dev=None,174                      fail=True, include_stderr=False):175    args = "prog load %s %s" % (os.path.join(bpf_test_dir, sample), file_name)176    if prog_type is not None:177        args += " type " + prog_type178    if dev is not None:179        args += " dev " + dev180    if len(maps):181        args += " map " + " map ".join(maps)182    res = bpftool(args, fail=fail, include_stderr=include_stderr)183    if res[0] == 0:184        files.append(file_name)185    return res186def ip(args, force=False, JSON=True, ns="", fail=True, include_stderr=False):187    if force:188        args = "-force " + args189    return tool("ip", args, {"json":"-j"}, JSON=JSON, ns=ns,190                fail=fail, include_stderr=include_stderr)191def tc(args, JSON=True, ns="", fail=True, include_stderr=False):192    return tool("tc", args, {"json":"-p"}, JSON=JSON, ns=ns,193                fail=fail, include_stderr=include_stderr)194def ethtool(dev, opt, args, fail=True):195    return cmd("ethtool %s %s %s" % (opt, dev["ifname"], args), fail=fail)196def bpf_obj(name, sec=".text", path=bpf_test_dir,):197    return "obj %s sec %s" % (os.path.join(path, name), sec)198def bpf_pinned(name):199    return "pinned %s" % (name)200def bpf_bytecode(bytecode):201    return "bytecode \"%s\"" % (bytecode)202def mknetns(n_retry=10):203    for i in range(n_retry):204        name = ''.join([random.choice(string.ascii_letters) for i in range(8)])205        ret, _ = ip("netns add %s" % (name), fail=False)206        if ret == 0:207            netns.append(name)208            return name209    return None210def int2str(fmt, val):211    ret = []212    for b in struct.pack(fmt, val):213        ret.append(int(b))214    return " ".join(map(lambda x: str(x), ret))215def 
str2int(strtab):216    inttab = []217    for i in strtab:218        inttab.append(int(i, 16))219    ba = bytearray(inttab)220    if len(strtab) == 4:221        fmt = "I"222    elif len(strtab) == 8:223        fmt = "Q"224    else:225        raise Exception("String array of len %d can't be unpacked to an int" %226                        (len(strtab)))227    return struct.unpack(fmt, ba)[0]228class DebugfsDir:229    """230    Class for accessing DebugFS directories as a dictionary.231    """232    def __init__(self, path):233        self.path = path234        self._dict = self._debugfs_dir_read(path)235    def __len__(self):236        return len(self._dict.keys())237    def __getitem__(self, key):238        if type(key) is int:239            key = list(self._dict.keys())[key]240        return self._dict[key]241    def __setitem__(self, key, value):242        log("DebugFS set %s = %s" % (key, value), "")243        log_level_inc()244        cmd("echo '%s' > %s/%s" % (value, self.path, key))245        log_level_dec()246        _, out = cmd('cat %s/%s' % (self.path, key))247        self._dict[key] = out.strip()248    def _debugfs_dir_read(self, path):249        dfs = {}250        log("DebugFS state for %s" % (path), "")251        log_level_inc(add=2)252        _, out = cmd('ls ' + path)253        for f in out.split():254            p = os.path.join(path, f)255            if os.path.isfile(p):256                _, out = cmd('cat %s/%s' % (path, f))257                dfs[f] = out.strip()258            elif os.path.isdir(p):259                dfs[f] = DebugfsDir(p)260            else:261                raise Exception("%s is neither file nor directory" % (p))262        log_level_dec()263        log("DebugFS state", dfs)264        log_level_dec()265        return dfs266class NetdevSim:267    """268    Class for netdevsim netdevice and its attributes.269    """270    def __init__(self, link=None):271        self.link = link272        self.dev = self._netdevsim_create()273     
   devs.append(self)274        self.ns = ""275        self.dfs_dir = '/sys/kernel/debug/netdevsim/%s' % (self.dev['ifname'])276        self.sdev_dir = self.dfs_dir + '/sdev/'277        self.dfs_refresh()278    def __getitem__(self, key):279        return self.dev[key]280    def _netdevsim_create(self):281        link = "" if self.link is None else "link " + self.link.dev['ifname']282        _, old  = ip("link show")283        ip("link add sim%d {link} type netdevsim".format(link=link))284        _, new  = ip("link show")285        for dev in new:286            f = filter(lambda x: x["ifname"] == dev["ifname"], old)287            if len(list(f)) == 0:288                return dev289        raise Exception("failed to create netdevsim device")290    def remove(self):291        devs.remove(self)292        ip("link del dev %s" % (self.dev["ifname"]), ns=self.ns)293    def dfs_refresh(self):294        self.dfs = DebugfsDir(self.dfs_dir)295        return self.dfs296    def dfs_read(self, f):297        path = os.path.join(self.dfs_dir, f)298        _, data = cmd('cat %s' % (path))299        return data.strip()300    def dfs_num_bound_progs(self):301        path = os.path.join(self.sdev_dir, "bpf_bound_progs")302        _, progs = cmd('ls %s' % (path))303        return len(progs.split())304    def dfs_get_bound_progs(self, expected):305        progs = DebugfsDir(os.path.join(self.sdev_dir, "bpf_bound_progs"))306        if expected is not None:307            if len(progs) != expected:308                fail(True, "%d BPF programs bound, expected %d" %309                     (len(progs), expected))310        return progs311    def wait_for_flush(self, bound=0, total=0, n_retry=20):312        for i in range(n_retry):313            nbound = self.dfs_num_bound_progs()314            nprogs = len(bpftool_prog_list())315            if nbound == bound and nprogs == total:316                return317            time.sleep(0.05)318        raise Exception("Time out waiting for program 
counts to stabilize want %d/%d, have %d bound, %d loaded" % (bound, total, nbound, nprogs))319    def set_ns(self, ns):320        name = "1" if ns == "" else ns321        ip("link set dev %s netns %s" % (self.dev["ifname"], name), ns=self.ns)322        self.ns = ns323    def set_mtu(self, mtu, fail=True):324        return ip("link set dev %s mtu %d" % (self.dev["ifname"], mtu),325                  fail=fail)326    def set_xdp(self, bpf, mode, force=False, JSON=True, verbose=False,327                fail=True, include_stderr=False):328        if verbose:329            bpf += " verbose"330        return ip("link set dev %s xdp%s %s" % (self.dev["ifname"], mode, bpf),331                  force=force, JSON=JSON,332                  fail=fail, include_stderr=include_stderr)333    def unset_xdp(self, mode, force=False, JSON=True,334                  fail=True, include_stderr=False):335        return ip("link set dev %s xdp%s off" % (self.dev["ifname"], mode),336                  force=force, JSON=JSON,337                  fail=fail, include_stderr=include_stderr)338    def ip_link_show(self, xdp):339        _, link = ip("link show dev %s" % (self['ifname']))340        if len(link) > 1:341            raise Exception("Multiple objects on ip link show")342        if len(link) < 1:343            return {}344        fail(xdp != "xdp" in link,345             "XDP program not reporting in iplink (reported %s, expected %s)" %346             ("xdp" in link, xdp))347        return link[0]348    def tc_add_ingress(self):349        tc("qdisc add dev %s ingress" % (self['ifname']))350    def tc_del_ingress(self):351        tc("qdisc del dev %s ingress" % (self['ifname']))352    def tc_flush_filters(self, bound=0, total=0):353        self.tc_del_ingress()354        self.tc_add_ingress()355        self.wait_for_flush(bound=bound, total=total)356    def tc_show_ingress(self, expected=None):357        # No JSON support, oh well...358        flags = ["skip_sw", "skip_hw", "in_hw"]359      
  named = ["protocol", "pref", "chain", "handle", "id", "tag"]360        args = "-s filter show dev %s ingress" % (self['ifname'])361        _, out = tc(args, JSON=False)362        filters = []363        lines = out.split('\n')364        for line in lines:365            words = line.split()366            if "handle" not in words:367                continue368            fltr = {}369            for flag in flags:370                fltr[flag] = flag in words371            for name in named:372                try:373                    idx = words.index(name)374                    fltr[name] = words[idx + 1]375                except ValueError:376                    pass377            filters.append(fltr)378        if expected is not None:379            fail(len(filters) != expected,380                 "%d ingress filters loaded, expected %d" %381                 (len(filters), expected))382        return filters383    def cls_filter_op(self, op, qdisc="ingress", prio=None, handle=None,384                      chain=None, cls="", params="",385                      fail=True, include_stderr=False):386        spec = ""387        if prio is not None:388            spec += " prio %d" % (prio)389        if handle:390            spec += " handle %s" % (handle)391        if chain is not None:392            spec += " chain %d" % (chain)393        return tc("filter {op} dev {dev} {qdisc} {spec} {cls} {params}"\394                  .format(op=op, dev=self['ifname'], qdisc=qdisc, spec=spec,395                          cls=cls, params=params),396                  fail=fail, include_stderr=include_stderr)397    def cls_bpf_add_filter(self, bpf, op="add", prio=None, handle=None,398                           chain=None, da=False, verbose=False,399                           skip_sw=False, skip_hw=False,400                           fail=True, include_stderr=False):401        cls = "bpf " + bpf402        params = ""403        if da:404            params += " da"405        if 
verbose:406            params += " verbose"407        if skip_sw:408            params += " skip_sw"409        if skip_hw:410            params += " skip_hw"411        return self.cls_filter_op(op=op, prio=prio, handle=handle, cls=cls,412                                  chain=chain, params=params,413                                  fail=fail, include_stderr=include_stderr)414    def set_ethtool_tc_offloads(self, enable, fail=True):415        args = "hw-tc-offload %s" % ("on" if enable else "off")416        return ethtool(self, "-K", args, fail=fail)417################################################################################418def clean_up():419    global files, netns, devs420    for dev in devs:421        dev.remove()422    for f in files:423        cmd("rm -f %s" % (f))424    for ns in netns:425        cmd("ip netns delete %s" % (ns))426    files = []427    netns = []428def pin_prog(file_name, idx=0):429    progs = bpftool_prog_list(expected=(idx + 1))430    prog = progs[idx]431    bpftool("prog pin id %d %s" % (prog["id"], file_name))432    files.append(file_name)433    return file_name, bpf_pinned(file_name)434def pin_map(file_name, idx=0, expected=1):435    maps = bpftool_map_list(expected=expected)436    m = maps[idx]437    bpftool("map pin id %d %s" % (m["id"], file_name))438    files.append(file_name)439    return file_name, bpf_pinned(file_name)440def check_dev_info_removed(prog_file=None, map_file=None):441    bpftool_prog_list(expected=0)442    ret, err = bpftool("prog show pin %s" % (prog_file), fail=False)443    fail(ret == 0, "Showing prog with removed device did not fail")444    fail(err["error"].find("No such device") == -1,445         "Showing prog with removed device expected ENODEV, error is %s" %446         (err["error"]))447    bpftool_map_list(expected=0)448    ret, err = bpftool("map show pin %s" % (map_file), fail=False)449    fail(ret == 0, "Showing map with removed device did not fail")450    fail(err["error"].find("No such 
device") == -1,451         "Showing map with removed device expected ENODEV, error is %s" %452         (err["error"]))453def check_dev_info(other_ns, ns, prog_file=None, map_file=None, removed=False):454    progs = bpftool_prog_list(expected=1, ns=ns)455    prog = progs[0]456    fail("dev" not in prog.keys(), "Device parameters not reported")457    dev = prog["dev"]458    fail("ifindex" not in dev.keys(), "Device parameters not reported")459    fail("ns_dev" not in dev.keys(), "Device parameters not reported")460    fail("ns_inode" not in dev.keys(), "Device parameters not reported")461    if not other_ns:462        fail("ifname" not in dev.keys(), "Ifname not reported")463        fail(dev["ifname"] != sim["ifname"],464             "Ifname incorrect %s vs %s" % (dev["ifname"], sim["ifname"]))465    else:466        fail("ifname" in dev.keys(), "Ifname is reported for other ns")467    maps = bpftool_map_list(expected=2, ns=ns)468    for m in maps:469        fail("dev" not in m.keys(), "Device parameters not reported")470        fail(dev != m["dev"], "Map's device different than program's")471def check_extack(output, reference, args):472    if skip_extack:473        return474    lines = output.split("\n")475    comp = len(lines) >= 2 and lines[1] == 'Error: ' + reference476    fail(not comp, "Missing or incorrect netlink extack message")477def check_extack_nsim(output, reference, args):478    check_extack(output, "netdevsim: " + reference, args)479def check_no_extack(res, needle):480    fail((res[1] + res[2]).count(needle) or (res[1] + res[2]).count("Warning:"),481         "Found '%s' in command output, leaky extack?" 
% (needle))482def check_verifier_log(output, reference):483    lines = output.split("\n")484    for l in reversed(lines):485        if l == reference:486            return487    fail(True, "Missing or incorrect message from netdevsim in verifier log")488def test_spurios_extack(sim, obj, skip_hw, needle):489    res = sim.cls_bpf_add_filter(obj, prio=1, handle=1, skip_hw=skip_hw,490                                 include_stderr=True)491    check_no_extack(res, needle)492    res = sim.cls_bpf_add_filter(obj, op="replace", prio=1, handle=1,493                                 skip_hw=skip_hw, include_stderr=True)494    check_no_extack(res, needle)495    res = sim.cls_filter_op(op="delete", prio=1, handle=1, cls="bpf",496                            include_stderr=True)497    check_no_extack(res, needle)498# Parse command line499parser = argparse.ArgumentParser()500parser.add_argument("--log", help="output verbose log to given file")501args = parser.parse_args()502if args.log:503    logfile = open(args.log, 'w+')504    logfile.write("# -*-Org-*-")505log("Prepare...", "", level=1)506log_level_inc()507# Check permissions508skip(os.getuid() != 0, "test must be run as root")509# Check tools510ret, progs = bpftool("prog", fail=False)511skip(ret != 0, "bpftool not installed")512base_progs = progs513_, base_maps = bpftool("map")514# Check netdevsim515ret, out = cmd("modprobe netdevsim", fail=False)516skip(ret != 0, "netdevsim module could not be loaded")517# Check debugfs518_, out = cmd("mount")519if out.find("/sys/kernel/debug type debugfs") == -1:520    cmd("mount -t debugfs none /sys/kernel/debug")521# Check samples are compiled522samples = ["sample_ret0.o", "sample_map_ret0.o"]523for s in samples:524    ret, out = cmd("ls %s/%s" % (bpf_test_dir, s), fail=False)525    skip(ret != 0, "sample %s/%s not found, please compile it" %526         (bpf_test_dir, s))527# Check if iproute2 is built with libmnl (needed by extack support)528_, _, err = cmd("tc qdisc delete dev lo handle 
0",529                fail=False, include_stderr=True)530if err.find("Error: Failed to find qdisc with specified handle.") == -1:531    print("Warning: no extack message in iproute2 output, libmnl missing?")532    log("Warning: no extack message in iproute2 output, libmnl missing?", "")533    skip_extack = True534# Check if net namespaces seem to work535ns = mknetns()536skip(ns is None, "Could not create a net namespace")537cmd("ip netns delete %s" % (ns))538netns = []539try:540    obj = bpf_obj("sample_ret0.o")541    bytecode = bpf_bytecode("1,6 0 0 4294967295,")542    start_test("Test destruction of generic XDP...")543    sim = NetdevSim()544    sim.set_xdp(obj, "generic")545    sim.remove()546    bpftool_prog_list_wait(expected=0)547    sim = NetdevSim()548    sim.tc_add_ingress()549    start_test("Test TC non-offloaded...")550    ret, _ = sim.cls_bpf_add_filter(obj, skip_hw=True, fail=False)551    fail(ret != 0, "Software TC filter did not load")552    start_test("Test TC non-offloaded isn't getting bound...")553    ret, _ = sim.cls_bpf_add_filter(obj, fail=False)554    fail(ret != 0, "Software TC filter did not load")555    sim.dfs_get_bound_progs(expected=0)556    sim.tc_flush_filters()557    start_test("Test TC offloads are off by default...")558    ret, _, err = sim.cls_bpf_add_filter(obj, skip_sw=True,559                                         fail=False, include_stderr=True)560    fail(ret == 0, "TC filter loaded without enabling TC offloads")561    check_extack(err, "TC offload is disabled on net device.", args)562    sim.wait_for_flush()563    sim.set_ethtool_tc_offloads(True)564    sim.dfs["bpf_tc_non_bound_accept"] = "Y"565    start_test("Test TC offload by default...")566    ret, _ = sim.cls_bpf_add_filter(obj, fail=False)567    fail(ret != 0, "Software TC filter did not load")568    sim.dfs_get_bound_progs(expected=0)569    ingress = sim.tc_show_ingress(expected=1)570    fltr = ingress[0]571    fail(not fltr["in_hw"], "Filter not offloaded by 
default")572    sim.tc_flush_filters()573    start_test("Test TC cBPF bytcode tries offload by default...")574    ret, _ = sim.cls_bpf_add_filter(bytecode, fail=False)575    fail(ret != 0, "Software TC filter did not load")576    sim.dfs_get_bound_progs(expected=0)577    ingress = sim.tc_show_ingress(expected=1)578    fltr = ingress[0]579    fail(not fltr["in_hw"], "Bytecode not offloaded by default")580    sim.tc_flush_filters()581    sim.dfs["bpf_tc_non_bound_accept"] = "N"582    start_test("Test TC cBPF unbound bytecode doesn't offload...")583    ret, _, err = sim.cls_bpf_add_filter(bytecode, skip_sw=True,584                                         fail=False, include_stderr=True)585    fail(ret == 0, "TC bytecode loaded for offload")586    check_extack_nsim(err, "netdevsim configured to reject unbound programs.",587                      args)588    sim.wait_for_flush()589    start_test("Test non-0 chain offload...")590    ret, _, err = sim.cls_bpf_add_filter(obj, chain=1, prio=1, handle=1,591                                         skip_sw=True,592                                         fail=False, include_stderr=True)593    fail(ret == 0, "Offloaded a filter to chain other than 0")594    check_extack(err, "Driver supports only offload of chain 0.", args)595    sim.tc_flush_filters()596    start_test("Test TC replace...")597    sim.cls_bpf_add_filter(obj, prio=1, handle=1)598    sim.cls_bpf_add_filter(obj, op="replace", prio=1, handle=1)599    sim.cls_filter_op(op="delete", prio=1, handle=1, cls="bpf")600    sim.cls_bpf_add_filter(obj, prio=1, handle=1, skip_sw=True)601    sim.cls_bpf_add_filter(obj, op="replace", prio=1, handle=1, skip_sw=True)602    sim.cls_filter_op(op="delete", prio=1, handle=1, cls="bpf")603    sim.cls_bpf_add_filter(obj, prio=1, handle=1, skip_hw=True)604    sim.cls_bpf_add_filter(obj, op="replace", prio=1, handle=1, skip_hw=True)605    sim.cls_filter_op(op="delete", prio=1, handle=1, cls="bpf")606    start_test("Test TC replace bad 
flags...")607    for i in range(3):608        for j in range(3):609            ret, _ = sim.cls_bpf_add_filter(obj, op="replace", prio=1, handle=1,610                                            skip_sw=(j == 1), skip_hw=(j == 2),611                                            fail=False)612            fail(bool(ret) != bool(j),613                 "Software TC incorrect load in replace test, iteration %d" %614                 (j))615        sim.cls_filter_op(op="delete", prio=1, handle=1, cls="bpf")616    start_test("Test spurious extack from the driver...")617    test_spurios_extack(sim, obj, False, "netdevsim")618    test_spurios_extack(sim, obj, True, "netdevsim")619    sim.set_ethtool_tc_offloads(False)620    test_spurios_extack(sim, obj, False, "TC offload is disabled")621    test_spurios_extack(sim, obj, True, "TC offload is disabled")622    sim.set_ethtool_tc_offloads(True)623    sim.tc_flush_filters()624    start_test("Test TC offloads work...")625    ret, _, err = sim.cls_bpf_add_filter(obj, verbose=True, skip_sw=True,626                                         fail=False, include_stderr=True)627    fail(ret != 0, "TC filter did not load with TC offloads enabled")628    check_verifier_log(err, "[netdevsim] Hello from netdevsim!")629    start_test("Test TC offload basics...")630    dfs = sim.dfs_get_bound_progs(expected=1)631    progs = bpftool_prog_list(expected=1)632    ingress = sim.tc_show_ingress(expected=1)633    dprog = dfs[0]634    prog = progs[0]635    fltr = ingress[0]636    fail(fltr["skip_hw"], "TC does reports 'skip_hw' on offloaded filter")637    fail(not fltr["in_hw"], "TC does not report 'in_hw' for offloaded filter")638    fail(not fltr["skip_sw"], "TC does not report 'skip_sw' back")639    start_test("Test TC offload is device-bound...")640    fail(str(prog["id"]) != fltr["id"], "Program IDs don't match")641    fail(prog["tag"] != fltr["tag"], "Program tags don't match")642    fail(fltr["id"] != dprog["id"], "Program IDs don't match")643    
fail(dprog["state"] != "xlated", "Offloaded program state not translated")644    fail(dprog["loaded"] != "Y", "Offloaded program is not loaded")645    start_test("Test disabling TC offloads is rejected while filters installed...")646    ret, _ = sim.set_ethtool_tc_offloads(False, fail=False)647    fail(ret == 0, "Driver should refuse to disable TC offloads with filters installed...")648    start_test("Test qdisc removal frees things...")649    sim.tc_flush_filters()650    sim.tc_show_ingress(expected=0)651    start_test("Test disabling TC offloads is OK without filters...")652    ret, _ = sim.set_ethtool_tc_offloads(False, fail=False)653    fail(ret != 0,654         "Driver refused to disable TC offloads without filters installed...")655    sim.set_ethtool_tc_offloads(True)656    start_test("Test destroying device gets rid of TC filters...")657    sim.cls_bpf_add_filter(obj, skip_sw=True)658    sim.remove()659    bpftool_prog_list_wait(expected=0)660    sim = NetdevSim()661    sim.set_ethtool_tc_offloads(True)662    start_test("Test destroying device gets rid of XDP...")663    sim.set_xdp(obj, "offload")664    sim.remove()665    bpftool_prog_list_wait(expected=0)666    sim = NetdevSim()667    sim.set_ethtool_tc_offloads(True)668    start_test("Test XDP prog reporting...")669    sim.set_xdp(obj, "drv")670    ipl = sim.ip_link_show(xdp=True)671    progs = bpftool_prog_list(expected=1)672    fail(ipl["xdp"]["prog"]["id"] != progs[0]["id"],673         "Loaded program has wrong ID")674    start_test("Test XDP prog replace without force...")675    ret, _ = sim.set_xdp(obj, "drv", fail=False)676    fail(ret == 0, "Replaced XDP program without -force")677    sim.wait_for_flush(total=1)678    start_test("Test XDP prog replace with force...")679    ret, _ = sim.set_xdp(obj, "drv", force=True, fail=False)680    fail(ret != 0, "Could not replace XDP program with -force")681    bpftool_prog_list_wait(expected=1)682    ipl = sim.ip_link_show(xdp=True)683    progs = 
bpftool_prog_list(expected=1)684    fail(ipl["xdp"]["prog"]["id"] != progs[0]["id"],685         "Loaded program has wrong ID")686    fail("dev" in progs[0].keys(),687         "Device parameters reported for non-offloaded program")688    start_test("Test XDP prog replace with bad flags...")689    ret, _, err = sim.set_xdp(obj, "generic", force=True,690                              fail=False, include_stderr=True)691    fail(ret == 0, "Replaced XDP program with a program in different mode")692    fail(err.count("File exists") != 1, "Replaced driver XDP with generic")693    ret, _, err = sim.set_xdp(obj, "", force=True,694                              fail=False, include_stderr=True)695    fail(ret == 0, "Replaced XDP program with a program in different mode")696    check_extack(err, "program loaded with different flags.", args)697    start_test("Test XDP prog remove with bad flags...")698    ret, _, err = sim.unset_xdp("", force=True,699                                fail=False, include_stderr=True)700    fail(ret == 0, "Removed program with a bad mode")701    check_extack(err, "program loaded with different flags.", args)702    start_test("Test MTU restrictions...")703    ret, _ = sim.set_mtu(9000, fail=False)704    fail(ret == 0,705         "Driver should refuse to increase MTU to 9000 with XDP loaded...")706    sim.unset_xdp("drv")707    bpftool_prog_list_wait(expected=0)708    sim.set_mtu(9000)709    ret, _, err = sim.set_xdp(obj, "drv", fail=False, include_stderr=True)710    fail(ret == 0, "Driver should refuse to load program with MTU of 9000...")711    check_extack_nsim(err, "MTU too large w/ XDP enabled.", args)712    sim.set_mtu(1500)713    sim.wait_for_flush()714    start_test("Test non-offload XDP attaching to HW...")715    bpftool_prog_load("sample_ret0.o", "/sys/fs/bpf/nooffload")716    nooffload = bpf_pinned("/sys/fs/bpf/nooffload")717    ret, _, err = sim.set_xdp(nooffload, "offload",718                              fail=False, include_stderr=True)719 
   fail(ret == 0, "attached non-offloaded XDP program to HW")720    check_extack_nsim(err, "xdpoffload of non-bound program.", args)721    rm("/sys/fs/bpf/nooffload")722    start_test("Test offload XDP attaching to drv...")723    bpftool_prog_load("sample_ret0.o", "/sys/fs/bpf/offload",724                      dev=sim['ifname'])725    offload = bpf_pinned("/sys/fs/bpf/offload")726    ret, _, err = sim.set_xdp(offload, "drv", fail=False, include_stderr=True)727    fail(ret == 0, "attached offloaded XDP program to drv")728    check_extack(err, "using device-bound program without HW_MODE flag is not supported.", args)729    rm("/sys/fs/bpf/offload")730    sim.wait_for_flush()731    start_test("Test XDP offload...")732    _, _, err = sim.set_xdp(obj, "offload", verbose=True, include_stderr=True)733    ipl = sim.ip_link_show(xdp=True)734    link_xdp = ipl["xdp"]["prog"]735    progs = bpftool_prog_list(expected=1)736    prog = progs[0]737    fail(link_xdp["id"] != prog["id"], "Loaded program has wrong ID")738    check_verifier_log(err, "[netdevsim] Hello from netdevsim!")739    start_test("Test XDP offload is device bound...")740    dfs = sim.dfs_get_bound_progs(expected=1)741    dprog = dfs[0]742    fail(prog["id"] != link_xdp["id"], "Program IDs don't match")743    fail(prog["tag"] != link_xdp["tag"], "Program tags don't match")744    fail(str(link_xdp["id"]) != dprog["id"], "Program IDs don't match")745    fail(dprog["state"] != "xlated", "Offloaded program state not translated")746    fail(dprog["loaded"] != "Y", "Offloaded program is not loaded")747    start_test("Test removing XDP program many times...")748    sim.unset_xdp("offload")749    sim.unset_xdp("offload")750    sim.unset_xdp("drv")751    sim.unset_xdp("drv")752    sim.unset_xdp("")753    sim.unset_xdp("")754    bpftool_prog_list_wait(expected=0)755    start_test("Test attempt to use a program for a wrong device...")756    sim2 = NetdevSim()757    sim2.set_xdp(obj, "offload")758    pin_file, pinned = 
pin_prog("/sys/fs/bpf/tmp")759    ret, _, err = sim.set_xdp(pinned, "offload",760                              fail=False, include_stderr=True)761    fail(ret == 0, "Pinned program loaded for a different device accepted")762    check_extack_nsim(err, "program bound to different dev.", args)763    sim2.remove()764    ret, _, err = sim.set_xdp(pinned, "offload",765                              fail=False, include_stderr=True)766    fail(ret == 0, "Pinned program loaded for a removed device accepted")767    check_extack_nsim(err, "xdpoffload of non-bound program.", args)768    rm(pin_file)769    bpftool_prog_list_wait(expected=0)770    start_test("Test multi-attachment XDP - attach...")771    sim.set_xdp(obj, "offload")772    xdp = sim.ip_link_show(xdp=True)["xdp"]773    offloaded = sim.dfs_read("bpf_offloaded_id")774    fail("prog" not in xdp, "Base program not reported in single program mode")775    fail(len(ipl["xdp"]["attached"]) != 1,776         "Wrong attached program count with one program")777    sim.set_xdp(obj, "")778    two_xdps = sim.ip_link_show(xdp=True)["xdp"]779    offloaded2 = sim.dfs_read("bpf_offloaded_id")780    fail(two_xdps["mode"] != 4, "Bad mode reported with multiple programs")781    fail("prog" in two_xdps, "Base program reported in multi program mode")782    fail(xdp["attached"][0] not in two_xdps["attached"],783         "Offload program not reported after driver activated")784    fail(len(two_xdps["attached"]) != 2,785         "Wrong attached program count with two programs")786    fail(two_xdps["attached"][0]["prog"]["id"] ==787         two_xdps["attached"][1]["prog"]["id"],788         "offloaded and drv programs have the same id")789    fail(offloaded != offloaded2,790         "offload ID changed after loading driver program")791    start_test("Test multi-attachment XDP - replace...")792    ret, _, err = sim.set_xdp(obj, "offload", fail=False, include_stderr=True)793    fail(err.count("busy") != 1, "Replaced one of programs without 
-force")794    start_test("Test multi-attachment XDP - detach...")795    ret, _, err = sim.unset_xdp("drv", force=True,796                                fail=False, include_stderr=True)797    fail(ret == 0, "Removed program with a bad mode")798    check_extack(err, "program loaded with different flags.", args)799    sim.unset_xdp("offload")800    xdp = sim.ip_link_show(xdp=True)["xdp"]801    offloaded = sim.dfs_read("bpf_offloaded_id")802    fail(xdp["mode"] != 1, "Bad mode reported after multiple programs")803    fail("prog" not in xdp,804         "Base program not reported after multi program mode")805    fail(xdp["attached"][0] not in two_xdps["attached"],806         "Offload program not reported after driver activated")807    fail(len(ipl["xdp"]["attached"]) != 1,808         "Wrong attached program count with remaining programs")809    fail(offloaded != "0", "offload ID reported with only driver program left")810    start_test("Test multi-attachment XDP - device remove...")811    sim.set_xdp(obj, "offload")812    sim.remove()813    sim = NetdevSim()814    sim.set_ethtool_tc_offloads(True)815    start_test("Test mixing of TC and XDP...")816    sim.tc_add_ingress()817    sim.set_xdp(obj, "offload")818    ret, _, err = sim.cls_bpf_add_filter(obj, skip_sw=True,819                                         fail=False, include_stderr=True)820    fail(ret == 0, "Loading TC when XDP active should fail")821    check_extack_nsim(err, "driver and netdev offload states mismatch.", args)822    sim.unset_xdp("offload")823    sim.wait_for_flush()824    sim.cls_bpf_add_filter(obj, skip_sw=True)825    ret, _, err = sim.set_xdp(obj, "offload", fail=False, include_stderr=True)826    fail(ret == 0, "Loading XDP when TC active should fail")827    check_extack_nsim(err, "TC program is already loaded.", args)828    start_test("Test binding TC from pinned...")829    pin_file, pinned = pin_prog("/sys/fs/bpf/tmp")830    sim.tc_flush_filters(bound=1, total=1)831    
sim.cls_bpf_add_filter(pinned, da=True, skip_sw=True)832    sim.tc_flush_filters(bound=1, total=1)833    start_test("Test binding XDP from pinned...")834    sim.set_xdp(obj, "offload")835    pin_file, pinned = pin_prog("/sys/fs/bpf/tmp2", idx=1)836    sim.set_xdp(pinned, "offload", force=True)837    sim.unset_xdp("offload")838    sim.set_xdp(pinned, "offload", force=True)839    sim.unset_xdp("offload")840    start_test("Test offload of wrong type fails...")841    ret, _ = sim.cls_bpf_add_filter(pinned, da=True, skip_sw=True, fail=False)842    fail(ret == 0, "Managed to attach XDP program to TC")843    start_test("Test asking for TC offload of two filters...")844    sim.cls_bpf_add_filter(obj, da=True, skip_sw=True)845    ret, _, err = sim.cls_bpf_add_filter(obj, da=True, skip_sw=True,846                                         fail=False, include_stderr=True)847    fail(ret == 0, "Managed to offload two TC filters at the same time")848    check_extack_nsim(err, "driver and netdev offload states mismatch.", args)849    sim.tc_flush_filters(bound=2, total=2)850    start_test("Test if netdev removal waits for translation...")851    delay_msec = 500852    sim.dfs["bpf_bind_verifier_delay"] = delay_msec853    start = time.time()854    cmd_line = "tc filter add dev %s ingress bpf %s da skip_sw" % \855               (sim['ifname'], obj)856    tc_proc = cmd(cmd_line, background=True, fail=False)857    # Wait for the verifier to start858    while sim.dfs_num_bound_progs() <= 2:859        pass860    sim.remove()861    end = time.time()862    ret, _ = cmd_result(tc_proc, fail=False)863    time_diff = end - start864    log("Time", "start:\t%s\nend:\t%s\ndiff:\t%s" % (start, end, time_diff))865    fail(ret == 0, "Managed to load TC filter on a unregistering device")866    delay_sec = delay_msec * 0.001867    fail(time_diff < delay_sec, "Removal process took %s, expected %s" %868         (time_diff, delay_sec))869    # Remove all pinned files and reinstantiate the netdev870    
clean_up()871    bpftool_prog_list_wait(expected=0)872    sim = NetdevSim()873    map_obj = bpf_obj("sample_map_ret0.o")874    start_test("Test loading program with maps...")875    sim.set_xdp(map_obj, "offload", JSON=False) # map fixup msg breaks JSON876    start_test("Test bpftool bound info reporting (own ns)...")877    check_dev_info(False, "")878    start_test("Test bpftool bound info reporting (other ns)...")879    ns = mknetns()880    sim.set_ns(ns)881    check_dev_info(True, "")882    start_test("Test bpftool bound info reporting (remote ns)...")883    check_dev_info(False, ns)884    start_test("Test bpftool bound info reporting (back to own ns)...")885    sim.set_ns("")886    check_dev_info(False, "")887    prog_file, _ = pin_prog("/sys/fs/bpf/tmp_prog")888    map_file, _ = pin_map("/sys/fs/bpf/tmp_map", idx=1, expected=2)889    sim.remove()890    start_test("Test bpftool bound info reporting (removed dev)...")891    check_dev_info_removed(prog_file=prog_file, map_file=map_file)892    # Remove all pinned files and reinstantiate the netdev893    clean_up()894    bpftool_prog_list_wait(expected=0)895    sim = NetdevSim()896    start_test("Test map update (no flags)...")897    sim.set_xdp(map_obj, "offload", JSON=False) # map fixup msg breaks JSON898    maps = bpftool_map_list(expected=2)899    array = maps[0] if maps[0]["type"] == "array" else maps[1]900    htab = maps[0] if maps[0]["type"] == "hash" else maps[1]901    for m in maps:902        for i in range(2):903            bpftool("map update id %d key %s value %s" %904                    (m["id"], int2str("I", i), int2str("Q", i * 3)))905    for m in maps:906        ret, _ = bpftool("map update id %d key %s value %s" %907                         (m["id"], int2str("I", 3), int2str("Q", 3 * 3)),908                         fail=False)909        fail(ret == 0, "added too many entries")910    start_test("Test map update (exists)...")911    for m in maps:912        for i in range(2):913            bpftool("map 
update id %d key %s value %s exist" %914                    (m["id"], int2str("I", i), int2str("Q", i * 3)))915    for m in maps:916        ret, err = bpftool("map update id %d key %s value %s exist" %917                           (m["id"], int2str("I", 3), int2str("Q", 3 * 3)),918                           fail=False)919        fail(ret == 0, "updated non-existing key")920        fail(err["error"].find("No such file or directory") == -1,921             "expected ENOENT, error is '%s'" % (err["error"]))922    start_test("Test map update (noexist)...")923    for m in maps:924        for i in range(2):925            ret, err = bpftool("map update id %d key %s value %s noexist" %926                               (m["id"], int2str("I", i), int2str("Q", i * 3)),927                               fail=False)928        fail(ret == 0, "updated existing key")929        fail(err["error"].find("File exists") == -1,930             "expected EEXIST, error is '%s'" % (err["error"]))931    start_test("Test map dump...")932    for m in maps:933        _, entries = bpftool("map dump id %d" % (m["id"]))934        for i in range(2):935            key = str2int(entries[i]["key"])936            fail(key != i, "expected key %d, got %d" % (key, i))937            val = str2int(entries[i]["value"])938            fail(val != i * 3, "expected value %d, got %d" % (val, i * 3))939    start_test("Test map getnext...")940    for m in maps:941        _, entry = bpftool("map getnext id %d" % (m["id"]))942        key = str2int(entry["next_key"])943        fail(key != 0, "next key %d, expected %d" % (key, 0))944        _, entry = bpftool("map getnext id %d key %s" %945                           (m["id"], int2str("I", 0)))946        key = str2int(entry["next_key"])947        fail(key != 1, "next key %d, expected %d" % (key, 1))948        ret, err = bpftool("map getnext id %d key %s" %949                           (m["id"], int2str("I", 1)), fail=False)950        fail(ret == 0, "got next key past the 
end of map")951        fail(err["error"].find("No such file or directory") == -1,952             "expected ENOENT, error is '%s'" % (err["error"]))953    start_test("Test map delete (htab)...")954    for i in range(2):955        bpftool("map delete id %d key %s" % (htab["id"], int2str("I", i)))956    start_test("Test map delete (array)...")957    for i in range(2):958        ret, err = bpftool("map delete id %d key %s" %959                           (htab["id"], int2str("I", i)), fail=False)960        fail(ret == 0, "removed entry from an array")961        fail(err["error"].find("No such file or directory") == -1,962             "expected ENOENT, error is '%s'" % (err["error"]))963    start_test("Test map remove...")964    sim.unset_xdp("offload")965    bpftool_map_list_wait(expected=0)966    sim.remove()967    sim = NetdevSim()968    sim.set_xdp(map_obj, "offload", JSON=False) # map fixup msg breaks JSON969    sim.remove()970    bpftool_map_list_wait(expected=0)971    start_test("Test map creation fail path...")972    sim = NetdevSim()973    sim.dfs["bpf_map_accept"] = "N"974    ret, _ = sim.set_xdp(map_obj, "offload", JSON=False, fail=False)975    fail(ret == 0,976         "netdevsim didn't refuse to create a map with offload disabled")977    sim.remove()978    start_test("Test multi-dev ASIC program reuse...")979    simA = NetdevSim()980    simB1 = NetdevSim()981    simB2 = NetdevSim(link=simB1)982    simB3 = NetdevSim(link=simB1)983    sims = (simA, simB1, simB2, simB3)984    simB = (simB1, simB2, simB3)985    bpftool_prog_load("sample_map_ret0.o", "/sys/fs/bpf/nsimA",986                      dev=simA['ifname'])987    progA = bpf_pinned("/sys/fs/bpf/nsimA")988    bpftool_prog_load("sample_map_ret0.o", "/sys/fs/bpf/nsimB",989                      dev=simB1['ifname'])990    progB = bpf_pinned("/sys/fs/bpf/nsimB")991    simA.set_xdp(progA, "offload", JSON=False)992    for d in simB:993        d.set_xdp(progB, "offload", JSON=False)994    start_test("Test multi-dev 
ASIC cross-dev replace...")995    ret, _ = simA.set_xdp(progB, "offload", force=True, JSON=False, fail=False)996    fail(ret == 0, "cross-ASIC program allowed")997    for d in simB:998        ret, _ = d.set_xdp(progA, "offload", force=True, JSON=False, fail=False)999        fail(ret == 0, "cross-ASIC program allowed")1000    start_test("Test multi-dev ASIC cross-dev install...")1001    for d in sims:1002        d.unset_xdp("offload")1003    ret, _, err = simA.set_xdp(progB, "offload", force=True, JSON=False,1004                               fail=False, include_stderr=True)1005    fail(ret == 0, "cross-ASIC program allowed")1006    check_extack_nsim(err, "program bound to different dev.", args)1007    for d in simB:1008        ret, _, err = d.set_xdp(progA, "offload", force=True, JSON=False,1009                                fail=False, include_stderr=True)1010        fail(ret == 0, "cross-ASIC program allowed")1011        check_extack_nsim(err, "program bound to different dev.", args)1012    start_test("Test multi-dev ASIC cross-dev map reuse...")1013    mapA = bpftool("prog show %s" % (progA))[1]["map_ids"][0]1014    mapB = bpftool("prog show %s" % (progB))[1]["map_ids"][0]1015    ret, _ = bpftool_prog_load("sample_map_ret0.o", "/sys/fs/bpf/nsimB_",1016                               dev=simB3['ifname'],1017                               maps=["idx 0 id %d" % (mapB)],1018                               fail=False)1019    fail(ret != 0, "couldn't reuse a map on the same ASIC")1020    rm("/sys/fs/bpf/nsimB_")1021    ret, _, err = bpftool_prog_load("sample_map_ret0.o", "/sys/fs/bpf/nsimA_",1022                                    dev=simA['ifname'],1023                                    maps=["idx 0 id %d" % (mapB)],1024                                    fail=False, include_stderr=True)1025    fail(ret == 0, "could reuse a map on a different ASIC")1026    fail(err.count("offload device mismatch between prog and map") == 0,1027         "error message missing for 
cross-ASIC map")1028    ret, _, err = bpftool_prog_load("sample_map_ret0.o", "/sys/fs/bpf/nsimB_",1029                                    dev=simB1['ifname'],1030                                    maps=["idx 0 id %d" % (mapA)],1031                                    fail=False, include_stderr=True)1032    fail(ret == 0, "could reuse a map on a different ASIC")1033    fail(err.count("offload device mismatch between prog and map") == 0,1034         "error message missing for cross-ASIC map")1035    start_test("Test multi-dev ASIC cross-dev destruction...")1036    bpftool_prog_list_wait(expected=2)1037    simA.remove()1038    bpftool_prog_list_wait(expected=1)1039    ifnameB = bpftool("prog show %s" % (progB))[1]["dev"]["ifname"]1040    fail(ifnameB != simB1['ifname'], "program not bound to originial device")1041    simB1.remove()1042    bpftool_prog_list_wait(expected=1)1043    start_test("Test multi-dev ASIC cross-dev destruction - move...")1044    ifnameB = bpftool("prog show %s" % (progB))[1]["dev"]["ifname"]1045    fail(ifnameB not in (simB2['ifname'], simB3['ifname']),1046         "program not bound to remaining devices")1047    simB2.remove()1048    ifnameB = bpftool("prog show %s" % (progB))[1]["dev"]["ifname"]1049    fail(ifnameB != simB3['ifname'], "program not bound to remaining device")1050    simB3.remove()1051    bpftool_prog_list_wait(expected=0)1052    start_test("Test multi-dev ASIC cross-dev destruction - orphaned...")1053    ret, out = bpftool("prog show %s" % (progB), fail=False)1054    fail(ret == 0, "got information about orphaned program")1055    fail("error" not in out, "no error reported for get info on orphaned")1056    fail(out["error"] != "can't get prog info: No such device",1057         "wrong error for get info on orphaned")1058    print("%s: OK" % (os.path.basename(__file__)))1059finally:1060    log("Clean up...", "", level=1)1061    log_level_inc()...validation.py
Source:validation.py  
# ---- STAGE slide_is_possible -----
def tests_slide_is_possible(test_slide):
    """Run every slide_is_possible case through the vt harness.

    `test_slide` is stored in `vt.STAGE_SLIDE_IS_OVER` before the cases run.
    This script calls the function with False first, and again with True
    after the slide stage checks.
    """
    vt.STAGE_SLIDE_IS_OVER = test_slide
    # (label, line, expected) in execution order.
    cases = (
        ('slide impossible with empty line', [0, 0, 0, 0], False),
        ('slide impossible with full line', [1, 2, 1, 2], False),
        ('slide impossible with almost full line', [3, 2, 3, 0], False),
        ('slide impossible with two tiles', [1, 3, 0, 0], False),
        ('slide impossible with one tile', [2, 0, 0, 0], False),
        ('slide possible with one tile (pos 1)', [0, 1, 0, 0], True),
        ('slide with one tile (pos 2)', [0, 0, 2, 0], True),
        ('slide with one tile (pos 3)', [0, 0, 0, 3], True),
        ('slide with two different tiles v1', [0, 2, 0, 3], True),
        ('slide with two different tiles v2', [3, 0, 2, 0], True),
        ('slide with two different tiles v3', [1, 0, 0, 3], True),
        ('slide with three different tiles v1', [1, 2, 0, 1], True),
        ('slide with three different tiles v2', [2, 0, 1, 2], True),
        ('slide with two identical consecutive tiles v1', [1, 1, 0, 0], True),
        ('slide with two identical consecutive tiles v2', [2, 1, 1, 0], True),
        ('slide with two identical consecutive tiles v3', [1, 2, 3, 3], True),
    )
    for label, line, expected in cases:
        vt.start_test(label)
        vt.test_slide_is_possible(line, expected)


tests_slide_is_possible(False)
vt.valid_current_stage("rules.slide_is_possible")


# ---- STAGE move_dir_possible + game_over -----
def test_move_dir_possible():
    """Check vt.test_move_dir_possible on several boards and directions.

    Boards used, in order: rules.EMPTYBOARD, a full board with one empty
    corner cell, that board after vt.perm_dir(rules.UP, ...),
    rules.XFULLBOARD, and rules.XFULLBOARD after vt.perm_dir(rules.DOWN, ...).
    """
    def check(directions, board, tag, expected):
        # The label starts with 'possible' or 'impossible' to match `expected`.
        prefix = 'possible' if expected else 'impossible'
        for d in directions:
            vt.start_test(prefix + ' move dir on ' + rules.DIR_NAME[d] + tag)
            vt.test_move_dir_possible(d, board, expected)

    check(rules.DIRECTIONS, rules.EMPTYBOARD, ' (empty)', False)

    empty1_board = [
        [1, 2, 3, 4],
        [5, 6, 7, 8],
        [9, 10, 11, 12],
        [0, 14, 15, 16],
    ]
    check((rules.RIGHT, rules.UP), empty1_board, ' (empty1)', False)
    check((rules.LEFT, rules.DOWN), empty1_board, ' (empty1)', True)

    # Same board after vt.perm_dir: the expected outcomes swap.
    empty1_board = vt.perm_dir(rules.UP, empty1_board)
    check((rules.RIGHT, rules.UP), empty1_board, ' (empty1)', True)
    check((rules.LEFT, rules.DOWN), empty1_board, ' (empty1)', False)

    check((rules.UP, rules.DOWN), rules.XFULLBOARD, ' (XFULLBOARD)', False)
    check((rules.LEFT, rules.RIGHT), rules.XFULLBOARD, ' (XFULLBOARD)', True)

    myboard = vt.perm_dir(rules.DOWN, rules.XFULLBOARD)
    check((rules.UP, rules.DOWN), myboard, ' (XFULLBOARD)', True)
    check((rules.LEFT, rules.RIGHT), myboard, ' (XFULLBOARD)', False)


test_move_dir_possible()

vt.start_test('game.over')
assert rules.game_over(rules.FULLBOARD)
vt.valid_current_stage("move_dir_possible + game_over")

# ---- STAGE slide -----
# (label, line before the slide, expected line after the slide)
for _label, _line, _expected in (
    ('slide with zero tile', [0, 0, 0, 0], [0, 0, 0, 0]),
    ('slide with one tile (pos 0)', [4, 0, 0, 0], [4, 0, 0, 0]),
    ('slide with one tile (pos 1)', [0, 1, 0, 0], [1, 0, 0, 0]),
    ('slide with one tile (pos 2)', [0, 0, 2, 0], [2, 0, 0, 0]),
    ('slide with one tile (pos 3)', [0, 0, 0, 3], [3, 0, 0, 0]),
    ('slide with two different tiles at bottom', [1, 2, 0, 0], [1, 2, 0, 0]),
    ('slide with two different tiles v1', [0, 2, 0, 3], [2, 3, 0, 0]),
    ('slide with two different tiles v2', [3, 0, 0, 2], [3, 2, 0, 0]),
    ('slide with two identical tiles', [0, 1, 0, 1], [2, 0, 0, 0]),
    ('slide with two identical consecutive tiles', [0, 3, 3, 0], [4, 0, 0, 0]),
    ('slide with two identical consecutive tiles at bottom',
     [2, 2, 0, 0], [3, 0, 0, 0]),
    ('slide with 3 mixed tiles', [2, 1, 0, 2], [2, 1, 2, 0]),
    ('slide with 3 identical consecutive tiles', [4, 4, 4, 0], [5, 4, 0, 0]),
    ('slide with 3 identical non-consecutive tiles',
     [4, 4, 0, 4], [5, 4, 0, 0]),
    ('slide with 3 identical tiles ', [4, 0, 4, 4], [5, 4, 0, 0]),
    ('slide with 2 identical tiles and 1 greater', [4, 4, 5, 0], [5, 5, 0, 0]),
    ('slide with 1 greater and 2 identical tiles', [5, 4, 0, 4], [5, 5, 0, 0]),
    ('slide with 4 identical tiles', [6, 6, 6, 6], [7, 7, 0, 0]),
):
    vt.start_test(_label)
    vt.test_slide(_line, _expected)

tests_slide_is_possible(True)
vt.valid_current_stage("rules.slide")

# ---- STAGE MOVE_DIR -----
simple_board = [
    [0, 0, 0, 0],
    [0, 2, 0, 0],
    [0, 0, 3, 0],
    [0, 0, 0, 0],
]
# (label, direction attribute on `rules`, expected board after the move)
for _label, _dir_name, _expected in (
    ('Simple rules.move_dir left', 'LEFT', [
        [0, 0, 0, 0],
        [2, 0, 0, 0],
        [3, 0, 0, 0],
        [0, 0, 0, 0],
    ]),
    ('Simple rules.move_dir right', 'RIGHT', [
        [0, 0, 0, 0],
        [0, 0, 0, 2],
        [0, 0, 0, 3],
        [0, 0, 0, 0],
    ]),
    ('Simple rules.move_dir up', 'UP', [
        [0, 2, 3, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ]),
    ('Simple rules.move_dir down', 'DOWN', [
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 2, 3, 0],
    ]),
):
    vt.start_test(_label)
    vt.test_move_dir(getattr(rules, _dir_name), simple_board, _expected)

complex_board = [
    [2, 4, 2, 3],
    [0, 0, 0, 0],
    [0, 0, 0, 0],
    [0, 0, 2, 0],
]
for _label, _dir_name, _expected in (
    ('Complex rules.move_dir left', 'LEFT', [
        [2, 4, 2, 3],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [2, 0, 0, 0],
    ]),
    ('Complex rules.move_dir right', 'RIGHT', [
        [2, 4, 2, 3],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 2],
    ]),
    ('Complex rules.move_dir down', 'DOWN', [
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [2, 4, 3, 3],
    ]),
    ('Complex rules.move_dir up', 'UP', [
        [2, 4, 3, 3],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ]),
):
    vt.start_test(_label)
    vt.test_move_dir(getattr(rules, _dir_name), complex_board, _expected)

complex_board = [
    [2, 2, 2, 1],
    [2, 2, 2, 0],
    [1, 2, 1, 1],
    [1, 0, 0, 0],
]
for _label, _dir_name, _expected in (
    ('Complex rules.move_dir left (2)', 'LEFT', [
        [3, 2, 1, 0],
        [3, 2, 0, 0],
        [1, 2, 2, 0],
        [1, 0, 0, 0],
    ]),
    ('Complex rules.move_dir right (2)', 'RIGHT', [
        [0, 2, 3, 1],
        [0, 0, 2, 3],
        [0, 1, 2, 2],
        [0, 0, 0, 1],
    ]),
    ('Complex rules.move_dir down (2)', 'DOWN', [
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [3, 2, 3, 0],
        [2, 3, 1, 2],
    ]),
    ('Complex rules.move_dir up (2)', 'UP', [
        [3, 3, 3, 2],
        [2, 2, 1, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ]),
):
    vt.start_test(_label)
    vt.test_move_dir(getattr(rules, _dir_name), complex_board, _expected)

test_move_dir_possible()
vt.valid_current_stage("rules.move_dir")

# ---- STAGE MEAN-SCORE -----
# The board is looked up on `rules` by name only after the test is announced.
for _board_name, _score in (
    ('FULLBOARD', (65536, 0)),
    ('XFULLBOARD', (64, 4)),
    ('STEP0', (512, 334)),
):
    vt.start_test('mean_score.game_direction_first on ' + _board_name)
    vt.test_game_direction_first(getattr(rules, _board_name), _score)

vt.start_test('mean_score.game_direction_first torture DIR')
vt.torture_game_direction_first(
    vt.play_DOWN, players.first_tile, rules.XFULLBOARD)
vt.start_test('mean_score.game_direction_first torture TILE')
vt.torture_game_direction_first(
    players.first_direction, vt.play_FIRST, rules.XFULLBOARD)
vt.start_test('mean_score.game_direction_first torture None')
vt.torture_game_direction_first(
    vt.play_None, players.first_tile, rules.STEP0)

for _board_name, _score in (
    ('FULLBOARD', (65536, 0)),
    ('XFULLBOARD', (32, 0)),
    ('STEP0', (512, 334)),
    ('EMPTYBOARD', (512, 337)),
):
    vt.start_test('mean_score.game_tile_first on ' + _board_name)
    vt.test_game_tile_first(getattr(rules, _board_name), _score)

myboard = [
    [1, 2, 3, 4],
    [2, 3, 4, 5],
    [3, 4, 3, 2],
    [4, 0, 5, 5],
]
vt.start_test('mean_score.game_tile_first torture DIR')
vt.torture_game_tile_first(vt.play_DOWN, players.first_tile, myboard)
vt.start_test('mean_score.game_tile_first torture TILE')
vt.torture_game_tile_first(players.first_direction, vt.play_FIRST, myboard)
vt.valid_current_stage("mean_score.games")

# ---- STAGE LEVEL -----
vt.start_test('rules.level EMPTYBOARD')
vt.test_invperm(rules.level, rules.EMPTYBOARD, 0)

# (label suffix, board, expected value passed to vt.test_invperm)
for _suffix, _board, _expected in (
    ('basic corner 0', [
        [1, 0, 0, 0],
        [0, 0, 1, 0],
        [0, 1, 0, 0],
        [0, 0, 0, 0],
    ], 3),
    ('basic corner 1', [
        [0, 1, 1, 0],
        [0, 1, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ], 10),
    ('basic corner 2', [
        [0, 0, 0, 0],
        [0, 1, 1, 0],
        [0, 1, 0, 0],
        [0, 0, 0, 0],
    ], 10),
    ('basic corner 3', [
        [0, 0, 1, 1],
        [0, 0, 0, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ], 10),
    ('basic corner 4', [
        [1, 1, 0, 0],
        [0, 0, 0, 0],
        [1, 0, 0, 0],
        [0, 0, 0, 0],
    ], 10),
    ('line 0', [
        [0, 0, 0, 1],
        [0, 0, 1, 0],
        [0, 1, 0, 0],
        [1, 0, 0, 0],
    ], 4),
    ('line 1', [
        [0, 1, 1, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ], 10),
    ('line 2', [
        [0, 1, 1, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [1, 0, 0, 0],
    ], 11),
    ('line 3', [
        [0, 1, 1, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 1],
    ], 11),
    ('line 4', [
        [1, 1, 1, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 1],
    ], 19),
    ('line 5', [
        [1, 1, 1, 1],
        [1, 0, 0, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
    ], 20),
    ('line 6', [
        [1, 1, 1, 1],
        [0, 0, 0, 0],
        [0, 0, 0, 0],
        [0, 0, 0, 2],
    ], 99),
    ('line 7', [
        [2, 0, 0, 0],
        [0, 1, 1, 1],
        [0, 0, 0, 0],
        [1, 0, 0, 0],
    ], 92),
    ('line 8', [
        [0, 0, 0, 0],
        [0, 1, 1, 0],
        [0, 1, 2, 1],
        [0, 0, 0, 0],
    ], 92),
    ('full 1', [
        [1, 1, 1, 1],
        [1, 1, 1, 1],
        [1, 1, 1, 1],
        [1, 1, 1, 1],
    ], 72),
    ('semi-full 1', [
        [1, 0, 1, 0],
        [1, 0, 1, 0],
        [0, 0, 1, 1],
        [0, 0, 1, 1],
    ], 36),
    ('semi-full 2', [
        [1, 0, 1, 0],
        [0, 1, 0, 1],
        [1, 0, 1, 0],
        [0, 1, 0, 1],
    ], 8),
    ('semi-full 3', [
        [2, 0, 1, 0],
        [2, 0, 1, 0],
        [0, 2, 2, 0],
        [0, 1, 1, 0],
    ], 902),
):
    vt.start_test('rules.level ' + _suffix)
    vt.test_invperm(rules.level, _board, _expected)

# Boards that live on `rules`, looked up by name after the test is announced.
for _board_name, _expected in (
    ('STEP0', 6642),
    ('XFULLBOARD', 432619463),
    ('FINAL1', 3476604868046168890712585943456),
):
    vt.start_test('rules.level ' + _board_name)
    vt.test_invperm(rules.level, getattr(rules, _board_name), _expected)

vt.valid_current_stage("rules.level")
# -------------------------...sarimax_script.py
Source:sarimax_script.py  
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import itertools
import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller
from datetime import datetime, timedelta
from dateutil.relativedelta import *
import statsmodels.tsa.api as smt
import seaborn as sns
from sklearn.metrics import mean_squared_error
import pickle
from script import *


#function to create training and testing set
def create_train_test(data, start_train, end_train, start_test, end_test, test_length=24):
    """Split `data` into a training frame (padded with empty rows) and a test frame.

    Args:
        data: DataFrame sliced by label with `.loc[start:end, :]`.
        start_train, end_train: bounds of the training slice.
        start_test, end_test: bounds of the test slice. `start_test` must be
            a string in '%Y-%m-%d %H:%M:%S' format because it is parsed with
            `datetime.strptime`.
        test_length: number of hourly rows appended after the training data.

    Returns:
        (df_train, df_test). `df_train` is the training slice followed by
        `test_length` rows indexed hourly from `start_test`; those rows are
        created without data, so every column is missing there.
    """
    df_train = data.loc[start_train:end_train, :]
    df_test = data.loc[start_test:end_test, :]
    start = datetime.strptime(start_test, '%Y-%m-%d %H:%M:%S')
    # One placeholder row per forecast hour (24 unless test_length is overridden).
    date_list = [start + relativedelta(hours=x) for x in range(0, test_length)]
    future = pd.DataFrame(index=date_list, columns=df_train.columns)
    df_train = pd.concat([df_train, future])
    return df_train, df_test


#function to add all exogenous variables
def add_exog(data, weather, start_time, end_time):
    """Return `data` joined with weather and calendar regressors.

    Args:
        data: DataFrame whose index exposes `.dayofweek`.
        weather: DataFrame with `precip_type`, `temperature`, `humidity` and
            `precip_intensity` columns.
        start_time, end_time: label bounds of the weather rows copied into
            the Temperature, Humidity and Precip_Intensity columns.

    Returns:
        A new DataFrame with precipitation dummy columns, Day_of_Week,
        Weekend, Temperature, Humidity and Precip_Intensity added.
    """
    #add dummy variables for precipitation
    precip = pd.get_dummies(weather.precip_type)
    data = data.join(precip)
    data['Day_of_Week'] = data.index.dayofweek
    # NOTE(review): is_weekend comes from `script` (star import); presumably it
    # derives the flag from each row, confirm against that module.
    data['Weekend'] = data.apply(is_weekend, axis=1)
    data['Temperature'] = weather.loc[start_time:end_time, 'temperature']
    data['Humidity'] = weather.loc[start_time:end_time, 'humidity']
    data['Precip_Intensity'] = weather.loc[start_time:end_time, 'precip_intensity']
    data.rename(columns={'rain':'Rain', 'sleet':'Sleet', 'snow':'Snow'}, inplace=True)
    #fill missing values with mean
    for column in ('Temperature', 'Humidity', 'Precip_Intensity'):
        data[column] = data[column].fillna(np.mean(data[column]))
    return data


#function to start/end dates for train and test
def find_dates(building_id, length=1, total_length=30, final_date=None):
    """Compute the train/test window boundaries for a building, as strings.

    Args:
        building_id: forwarded to `find_egauge_dates`.
        length: number of days between the end of training and the end of
            the test window.
        total_length: forwarded to `find_egauge_dates`.
        final_date: accepted for compatibility; not used by this function.

    Returns:
        (start_train, end_train, start_test, end_test), each converted with
        `str()`. The test window starts one hour after the training window ends.
    """
    # NOTE(review): find_egauge_dates comes from `script`; it must return
    # values that support timedelta arithmetic, confirm against that module.
    start_train, end_test = find_egauge_dates(building_id, total_length)
    end_train = end_test - timedelta(days=length)
    start_test = end_train + timedelta(hours=1)
    return str(start_train), str(end_train), str(start_test), str(end_test)


def fit_exog_arima(data, weather, building_id, length=1, total_length=30, test_length=24):
    """Fit a SARIMAX model of Hourly_Usage with exogenous regressors.

    The regressors are the Weekend, Temperature, Humidity and car1 columns.
    The orders are fixed at order=(1, 0, 1) and seasonal_order=(0, 1, 1, 24).

    Returns:
        (df_exog, results): the frame produced by `add_exog` and the fitted
        model results.
    """
    start_train, end_train, start_test, end_test = find_dates(
        building_id, length=length, total_length=total_length)
    df_train, df_test = create_train_test(
        data, start_train, end_train, start_test, end_test, test_length)
    df_exog = add_exog(df_train, weather, start_train, end_test)
    exogenous = df_exog.loc[start_train:, ['Weekend', 'Temperature', 'Humidity', 'car1']].astype(float)
    endogenous = df_exog.loc[:, 'Hourly_Usage'].astype(float)
    # To tune the orders instead of hard-coding them, run
    #     low_aic = gridsearch_arima(endogenous, exogenous)
    # and pass order=low_aic[0], seasonal_order=low_aic[1] below.
    arima_model = sm.tsa.statespace.SARIMAX(endog=endogenous,
                                            exog=exogenous,
                                            trend=None,
                                            order=(1, 0, 1),
                                            seasonal_order=(0, 1, 1, 24),
                                            enforce_stationarity=False,
                                            enforce_invertibility=False)
    results = arima_model.fit()
    return df_exog, results


def plot_exog_arima(data, data_exog, model, building_id, length=1, total_length=30, test_length=24):
    """Forecast with `model`, plot the result and return the error metrics.

    Returns:
        (mse, rmse, df_exog_train), where `df_exog_train` is the padded
        training frame built from `data_exog` and passed to `add_forecast`.
    """
    start_train, end_train, start_test, end_test = find_dates(
        building_id, length=length, total_length=total_length)
    df_train, df_test = create_train_test(
        data, start_train, end_train, start_test, end_test, test_length=test_length)
    df_exog_train, df_exog_test = create_train_test(
        data_exog, start_train, end_train, start_test, end_test, test_length=test_length)
    mse, rmse = add_forecast(model, df_test, df_exog_train, start_test, end_test)
    plot_forecast(df_exog_train, 500)
    return mse, rmse, df_exog_train


#function to find optimal parameters and resulting AIC score
def gridsearch_arima(y, exog=None):
    """Grid-search SARIMAX orders and return the combination with the lowest AIC.

    Every (p, d, q) in {0, 1}^3 is combined with every seasonal
    (P, D, Q, 24) in {0, 1}^3.

    Args:
        y: endogenous series passed to SARIMAX.
        exog: optional exogenous regressors passed to SARIMAX.

    Returns:
        [order, seasonal_order, aic] for the best model, or
        [0, 0, float('inf')] if no candidate could be fitted.
    """
    pdq = list(itertools.product(range(0, 2), repeat=3))
    seasonal_pdq = [(p, d, q, 24) for p, d, q in pdq]
    # Start from +inf so a search whose AICs are all large still reports its
    # best candidate instead of a placeholder.
    low_aic = [0, 0, float('inf')]
    for param in pdq:
        for param_seasonal in seasonal_pdq:
            try:
                model = sm.tsa.statespace.SARIMAX(y,
                                                  exog=exog,
                                                  order=param,
                                                  seasonal_order=param_seasonal,
                                                  enforce_stationarity=False,
                                                  enforce_invertibility=False)
                results = model.fit()
                if results.aic < low_aic[2]:
                    low_aic = [param, param_seasonal, results.aic]
            except Exception:
                # Best effort: skip combinations that cannot be fitted, but let
                # KeyboardInterrupt/SystemExit stop the search.
                continue
    return low_aic


#function to forecast with fitted model, returns MSE and RMSE
def add_forecast(model, test, train, start_time, end_time):
    train['forecast'] = model.predict(start=start_time, end=end_time)
    y_true = test.loc[start_time:end_time, 'Hourly_Usage']
    y_pred = train.loc[start_time:end_time, 'forecast']
train.loc[start_time:end_time, 'Hourly_Usage'] = test.loc[start_time:end_time, 'Hourly_Usage']109    mse = mean_squared_error(y_true, y_pred)110    rmse = np.sqrt(mse)111    return mse, rmse112def plot_forecast(data, datapoints):113    fig = plt.figure(figsize=(16,8))114    plt.plot(data['Hourly_Usage'][datapoints:])115    plt.plot(data['forecast'])116    plt.legend()117#function to find mean car charge118def mean_car_charge(data, start, end):119    car_charge = {}120    for index in data.Time_Index.unique():121        car_charge[index] = np.mean(data[data.Time_Index==index].car1)122    return car_charge123#function to add all exogenous variables124def create_exog_endo(data, weather, building_id, length=1, total_length=30):125    start_train, end_train, start_test, end_test = find_dates(building_id, length, total_length)126    df_train, df_test = create_train_test(data, start_train, end_train, start_test, end_test, 24*length)127    car_charge = mean_car_charge(data, start_train,end_train)128    df_train['Time_Index'] = df_train.index.weekday_name+ df_train.index.hour.astype(str)129    df_train['Temperature'] = weather.loc[start_train:end_test, 'temperature']130    df_train['Humidity'] = weather.loc[start_train:end_test, 'humidity']131    for time in df_train.loc[start_test:end_test,:].index:132        df_train.loc[time,'car1'] = car_charge[df_train.loc[time,'Time_Index']]133    #fill missing values with mean134    df_train['Temperature'] = df_train.Temperature.fillna(np.mean(df_train['Temperature']))135    df_train['Humidity'] = df_train.Humidity.fillna(np.mean(df_train['Humidity']))136    exogenous = df_train.loc[start_train:,['Temperature','Humidity','car1']].astype(float)137    endogenous = df_train.loc[:,'Hourly_Usage'].astype(float)138    return df_train, exogenous, endogenous139#function to fit SARIMAX model with create_exog_endo140def fit_exog_arima_new(exogenous, endogenous):141    low_aic = gridsearch_arima(endogenous,exogenous)142    arima_model = 
sm.tsa.statespace.SARIMAX(endog=endogenous,143                                  exog = exogenous,144                                  trend=None,145                                  order=low_aic[0],146                                  seasonal_order=low_aic[1],147                                  enforce_stationarity=False,148                                  enforce_invertibility=False)149    arima_exog_results = arima_model.fit()...test_api.py
Source:test_api.py  
...36                    )37(options, args) = parser.parse_args()38server_url = options.server39data_dir = options.datadir40def start_test(x): print x41def testAddImage():42    print server_url43    print data_dir44    server = xmlrpclib.ServerProxy(server_url);45    assert server.createDb(1) == True46    start_test('add imgs')47    assert server.addImg(1, 7,data_dir+"DSC00007.JPG") == 148    assert server.addImg(1, 6,data_dir+"DSC00006.JPG") == 149    assert server.addImg(1, 14,data_dir+"DSC00014.JPG") == 150    assert server.addImg(1, 17,data_dir+"DSC00017.JPG") == 151    start_test('img count')52    assert server.getDbImgCount(1) == 453    start_test('image is on db')54    assert server.isImgOnDb(1,7) == True55    start_test('save db')56    assert server.saveAllDbs() > 057    start_test('reset db')58    assert server.resetDb(1) == 159    assert server.getDbImgCount(1) == 060    start_test('load db')61    assert server.loadAllDbs() > 062    assert server.getDbImgCount(1) == 463    assert server.isImgOnDb(1,7) == 164    assert server.isImgOnDb(1,733) == 065    start_test('remove img')66    assert server.removeImg(1,7) == 167    assert server.removeImg(1,73232) == 068    assert server.getDbImgCount(1) == 369    assert server.isImgOnDb(1,7) == 070    assert server.getDbImgIdList(1) == [6,14,17]71    start_test('list database spaces')72    assert 1 in server.getDbList()73    start_test('add more random images')74    fnames = [data_dir+"DSC00007.JPG",75              data_dir+"DSC00006.JPG",76              data_dir+"DSC00014.JPG",77              data_dir+"DSC00017.JPG"78              ]79    import random80    for i in range(20,60):81        assert server.addImg(1, i, random.choice(fnames)) == 182    start_test('add keywords')83    assert server.addKeywordImg(1,142,3) == False84    assert server.addKeywordImg(1,14,1) == True85    assert server.addKeywordImg(1,14,2) == True86    assert server.addKeywordImg(1,14,3) == True87    assert server.addKeywordImg(1,14,4) == 
True88    assert server.addKeywordImg(1,17,3) == True89    assert server.addKeywordImg(1,21,3) == True90    assert server.addKeywordImg(1,22,5) == True91    start_test('get keywords')92    assert server.getKeywordsImg(1,14) == [1,2,3,4]93    assert server.getKeywordsImg(1,17) == [3]94    assert server.getKeywordsImg(1,21) == [3]95    assert server.getKeywordsImg(1,20) == []96    start_test('remove keywords')97    assert server.removeAllKeywordImg(1,17) == True98    assert server.getKeywordsImg(1,17) == []99    start_test('save db')100    assert server.saveAllDbs() > 0101    start_test('reset db')102    assert server.resetDb(1) == 1103    assert server.getDbImgCount(1) == 0104    start_test('load db')105    assert server.loadAllDbs() > 0106    assert server.getDbImgCount(1) == 43107    start_test('get keywords')108    assert server.getKeywordsImg(1,14) == [1,2,3,4]109    start_test('query by a keyword')110    # 3: 14, 17, 21111    # 4: 14112    # 5: 22113    res = server.getAllImgsByKeywords(1, 30, 1, '3')114    assert 14 in res115    assert 17 in res116    assert 21 in res117    res = server.getAllImgsByKeywords(1, 30, 0, '3,4')118    assert 14 in res119    assert 17 in res120    assert 21 in res121    res = server.getAllImgsByKeywords(1, 30, 0, '3,4,5')122    assert 14 in res123    assert 17 in res124    assert 21 in res125    assert 22 in res126    res = server.getAllImgsByKeywords(1, 30, 1, '5')127    assert 22 in res128    res = server.getAllImgsByKeywords(1, 30, 1, '3,4')129    assert 14 in res130    start_test('query similarity')131    assert len(server.queryImgID(1,6, 3)) == 4132    start_test('query similarity by a keyword')133    #def queryImgIDKeywords(dbId, imgId, numres, kwJoinType, keywords):134    res = server.queryImgIDKeywords(1,6, 3,0,'3,4')135    resids = [r[0] for r in res]136    assert 17 in resids137   #  start_test('mostPopularKeywords')138   #139   #  assert server.addKeywordImg(1,50,1) == True140   #  assert server.addKeywordImg(1,50,2) == 
True141   #  assert server.addKeywordImg(1,50,3) == True142   #  assert server.addKeywordImg(1,51,1) == True143   #  assert server.addKeywordImg(1,51,2) == True144   #  assert server.addKeywordImg(1,51,3) == True145   #  assert server.addKeywordImg(1,52,3) == True146   #147   # # dbId, imgs, excludedKwds, count, mode148   #  res = server.mostPopularKeywords(1, '50,51,52', '1', 3, 0)149   #  resmap = {}150   #  for i in range(len(res)/2):151   #      resmap[res[i*2]] = res[i*2+1]...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing for FREE!
