Best Python code snippet using playwright-python
Source:searcher.py
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
import os
import pickle  # nosec  # pylint: disable=import_pickle
import random
from hashlib import md5
from collections import defaultdict
import numpy as np
from .... import opcodes
from .... import tensor as mt
from ....config import options
from ....core import ENTITY_TYPE, OutputType, recursive_tile
from ....core.context import get_context
from ....core.operand import OperandStage
from ....lib.filesystem import get_fs, FileSystem
from ....serialization.serializables import (
    KeyField,
    StringField,
    Int32Field,
    Int64Field,
    DictField,
    AnyField,
    BytesField,
    BoolField,
)
from ....tensor.core import TensorOrder
from ....utils import has_unknown_shape, Timer, ceildiv
from ...operands import LearnOperand, LearnOperandMixin
from ..core import proxima, validate_tensor, get_proxima_type
logger = logging.getLogger(__name__)
class ProximaSearcher(LearnOperand, LearnOperandMixin):
    _op_type_ = opcodes.PROXIMA_SIMPLE_SEARCHER
    _tensor = KeyField("tensor")
    _distance_metric = StringField("distance_metric")
    _dimension = Int32Field("dimension")
    _row_number = Int64Field("row_number")
    _topk = Int32Field("topk")
    _threads = Int32Field("threads")
    _index = AnyField("index")
    _index_searcher = StringField("index_searcher")
    _index_searcher_params = DictField("index_searcher_params")
    _index_reformer = StringField("index_reformer")
    _index_reformer_params = DictField("index_reformer_params")
    _download_index = BoolField("download_index")
    _storage_options = BytesField(
        "storage_options", on_serialize=pickle.dumps, on_deserialize=pickle.loads
    )
    def __init__(
        self,
        tensor=None,
        distance_metric=None,
        dimension=None,
        row_number=None,
        topk=None,
        index=None,
        threads=None,
        index_searcher=None,
        index_searcher_params=None,
        index_reformer=None,
        index_reformer_params=None,
        download_index=None,
        storage_options=None,
        output_types=None,
        stage=None,
        **kw,
    ):
        super().__init__(
            _tensor=tensor,
            _distance_metric=distance_metric,
            _row_number=row_number,
            _dimension=dimension,
            _index=index,
            _threads=threads,
            _index_searcher=index_searcher,
            _index_searcher_params=index_searcher_params,
            _index_reformer=index_reformer,
            _index_reformer_params=index_reformer_params,
            _download_index=download_index,
            _output_types=output_types,
            _topk=topk,
            _storage_options=storage_options,
            **kw,
        )
        if self._output_types is None:
            self._output_types = [OutputType.tensor, OutputType.tensor]
    @property
    def tensor(self):
        return self._tensor
    @property
    def distance_metric(self):
        return self._distance_metric
    @property
    def dimension(self):
        return self._dimension
    @property
    def row_number(self):
        return self._row_number
    @property
    def topk(self):
        return self._topk
    @property
    def threads(self):
        return self._threads
    @property
    def index(self):
        return self._index
    @property
    def index_searcher(self):
        return self._index_searcher
    @property
    def index_searcher_params(self):
        return self._index_searcher_params
    @property
    def index_reformer(self):
        return self._index_reformer
    @property
    def index_reformer_params(self):
        return self._index_reformer_params
    @property
    def download_index(self):
        return self._download_index
    @property
    def storage_options(self):
        return self._storage_options
    @property
    def output_limit(self):
        return 1 if self._download_index else 2
    def _set_inputs(self, inputs):
        super()._set_inputs(inputs)
        if self.stage != OperandStage.agg and not self._download_index:
            self._tensor = self._inputs[0]
            if isinstance(self._index, ENTITY_TYPE):
                self._index = self._inputs[-1]
    def __call__(self, tensor, index):
        kws = [
            {
                "dtype": np.dtype(np.uint64),
                "shape": (tensor.shape[0], self._topk),
                "order": TensorOrder.C_ORDER,
            },
            {
                "dtype": np.dtype(np.float32),
                "shape": (tensor.shape[0], self._topk),
                "order": TensorOrder.C_ORDER,
            },
        ]
        inputs = [tensor]
        if hasattr(index, "op"):
            inputs.append(index)
        return mt.ExecutableTuple(self.new_tileables(inputs, kws=kws))
    @classmethod
    def _build_download_chunks(cls, op, indexes):
        ctx = get_context()
        workers = ctx.get_worker_addresses() or [None]
        if len(workers) < len(indexes):
            workers = [workers[i % len(workers)] for i in range(len(indexes))]
        indexes_iter = iter(itertools.cycle(indexes))
        download_chunks = defaultdict(list)
        for i, worker in enumerate(workers):
            download_op = op.copy().reset_key()
            download_op.stage = OperandStage.map
            download_op.expect_worker = worker
            download_op._download_index = True
            download_op._tensor = None
            download_op._index = next(indexes_iter)
            download_chunks[i % len(indexes)].append(
                download_op.new_chunk(
                    None, index=(i,), shape=(), dtype=op.inputs[0].dtype
                )
            )
        return download_chunks
    @classmethod
    def tile(cls, op: "ProximaSearcher"):
        tensor = op.tensor
        index = op.index
        topk = op.topk
        outs = op.outputs
        row_number = op.row_number
        ctx = get_context()
        # make sure all inputs have known chunk sizes
        if has_unknown_shape(*op.inputs):
            yield
        rechunk_size = dict()
        if tensor.chunk_shape[1] > 1:
            rechunk_size[1] = tensor.shape[1]
        if row_number is not None:
            rechunk_size[0] = tensor.shape[0] // row_number
        if len(rechunk_size) > 0:
            tensor = yield from recursive_tile(tensor.rechunk(rechunk_size))
        logger.warning(f"query chunks count: {len(tensor.chunks)} ")
        if hasattr(index, "op"):
            built_indexes = [index.chunks] * len(tensor.chunks)
        else:
            # index path
            fs: FileSystem = get_fs(index, op.storage_options)
            index_paths = [
                f for f in fs.ls(index) if f.rsplit("/", 1)[-1].startswith("proxima_")
            ]
            download_chunks = cls._build_download_chunks(op, index_paths)
            iters = [iter(itertools.cycle(i)) for i in download_chunks.values()]
            built_indexes = []
            for _ in range(len(tensor.chunks)):
                built_indexes.append([next(it) for it in iters])
        if hasattr(index, "op"):
            index_chunks_workers = [
                m["bands"][0][0]
                for m in ctx.get_chunks_meta(
                    [c.key for c in index.chunks], fields=["bands"]
                )
            ]
        else:
            index_chunks_workers = [None] * len(built_indexes[0])
        out_chunks = [], []
        for i, tensor_chunk in enumerate(tensor.chunks):
            pk_chunks, distance_chunks = [], []
            for j, chunk_index, worker in zip(
                itertools.count(), built_indexes[i], index_chunks_workers
            ):
                chunk_op = op.copy().reset_key()
                chunk_op.stage = OperandStage.map
                if hasattr(index, "op"):
                    chunk_op.expect_worker = worker
                else:
                    chunk_op.expect_worker = chunk_index.op.expect_worker
                chunk_op._index = chunk_index
                chunk_op._tensor = None
                chunk_kws = [
                    {
                        "index": (tensor_chunk.index[0], j),
                        "dtype": outs[0].dtype,
                        "shape": (tensor_chunk.shape[0], topk),
                        "order": TensorOrder.C_ORDER,
                    },
                    {
                        "index": (tensor_chunk.index[0], j),
                        "dtype": outs[1].dtype,
                        "shape": (tensor_chunk.shape[0], topk),
                        "order": TensorOrder.C_ORDER,
                    },
                ]
                chunk_inputs = [tensor_chunk, chunk_index]
                pk_chunk, distance_chunk = chunk_op.new_chunks(
                    chunk_inputs, kws=chunk_kws
                )
                pk_chunks.append(pk_chunk)
                distance_chunks.append(distance_chunk)
            if len(pk_chunks) == 1:
                out_chunks[0].append(pk_chunks[0])
                out_chunks[1].append(distance_chunks[0])
                continue
            # combine topk results
            combine_size = options.combine_size
            tensor_out_chunks = [pk_chunks, distance_chunks]
            while True:
                chunk_size = ceildiv(len(tensor_out_chunks[0]), combine_size)
                cur_out_chunks = [[], []]
                for k in range(chunk_size):
                    to_combine_pks = tensor_out_chunks[0][
                        k * combine_size : (k + 1) * combine_size
                    ]
                    to_combine_distances = tensor_out_chunks[1][
                        k * combine_size : (k + 1) * combine_size
                    ]
                    chunk_op = op.copy().reset_key()
                    chunk_op.stage = OperandStage.agg
                    chunk_op._tensor = None
                    chunk_op._index = None
                    agg_chunk_kws = [
                        {
                            "index": (i, 0),
                            "dtype": outs[0].dtype,
                            "shape": (tensor_chunk.shape[0], topk),
                            "order": outs[0].order,
                        },
                        {
                            "index": (i, 0),
                            "dtype": outs[1].dtype,
                            "shape": (tensor_chunk.shape[0], topk),
                            "order": outs[1].order,
                        },
                    ]
                    pk_result_chunk, distance_result_chunk = chunk_op.new_chunks(
                        to_combine_pks + to_combine_distances, kws=agg_chunk_kws
                    )
                    cur_out_chunks[0].append(pk_result_chunk)
                    cur_out_chunks[1].append(distance_result_chunk)
                tensor_out_chunks = cur_out_chunks
                if len(tensor_out_chunks[0]) == 1:
                    break
            out_chunks[0].append(tensor_out_chunks[0][0])
            out_chunks[1].append(tensor_out_chunks[1][0])
        kws = []
        pk_params = outs[0].params
        pk_params["chunks"] = out_chunks[0]
        pk_params["nsplits"] = (tensor.nsplits[0], (topk,))
        kws.append(pk_params)
        distance_params = outs[1].params
        distance_params["chunks"] = out_chunks[1]
        distance_params["nsplits"] = (tensor.nsplits[0], (topk,))
        kws.append(distance_params)
        new_op = op.copy()
        return new_op.new_tileables(op.inputs, kws=kws)
    @classmethod
    def _execute_download(cls, ctx, op: "ProximaSearcher"):
        index_path = op.index
        with Timer() as timer:
            fs = get_fs(index_path, op.storage_options)
            # TODO
            dirs = os.environ.get("MARS_SPILL_DIRS")
            if dirs:
                temp_dir = random.choice(dirs.split(":"))
            else:
                temp_dir = "/tmp/proxima-index/"
            local_path = os.path.join(
                temp_dir, md5(str(index_path).encode("utf-8")).hexdigest()
            )  # noqa: B303  # nosec
            exist_state = True
            if not os.path.exists(local_path):
                exist_state = False
                if not os.path.exists(local_path.rsplit("/", 1)[0]):
                    os.mkdir(local_path.rsplit("/", 1)[0])
                with open(local_path, "wb") as out_f:
                    with fs.open(index_path, "rb") as in_f:
                        # 32M
                        chunk_bytes = 32 * 1024**2
                        while True:
                            data = in_f.read(chunk_bytes)
                            if data:
                                out_f.write(data)
                            else:
                                break
        logger.warning(
            f"ReadingFromVolume({op.key}), index path: {index_path}, "
            f"local_path {local_path}"
            f"size {os.path.getsize(local_path)}, "
            f"already exist {exist_state}, "
            f"costs {timer.duration} seconds "
            f"speed {round(os.path.getsize(local_path) / (1024 ** 2) / timer.duration, 2)} MB/s"
        )
        ctx[op.outputs[0].key] = local_path
    @classmethod
    def _execute_map(cls, ctx, op: "ProximaSearcher"):
        if op.download_index:
            cls._execute_download(ctx, op)
            return
        inp = ctx[op.tensor.key]
        index_path = ctx[op.inputs[-1].key]
        with Timer() as timer:
            flow = proxima.IndexFlow(
                container_name="MMapFileContainer",
                container_params={},
                searcher_name=op.index_searcher,
                searcher_params=op.index_searcher_params,
                measure_name="",
                measure_params={},
                reformer_name=op.index_reformer,
                reformer_params=op.index_reformer_params,
            )
            flow.load(index_path)
            vecs = np.ascontiguousarray(inp)
        logger.warning(
            f"LoadIndex({op.key})  index path: {index_path}  costs {timer.duration} seconds"
        )
        logger.warning(f"threads count:{op.threads}  vecs count:{len(vecs)}")
        with Timer() as timer:
            batch = 10000
            s_idx = 0
            e_idx = min(s_idx + batch, len(vecs))
            result_pks, result_distances = None, None
            while s_idx < len(vecs):
                with Timer() as timer_s:
                    tp = get_proxima_type(vecs.dtype)
                    result_pks_b, result_distances_b = proxima.IndexUtility.ann_search(
                        searcher=flow,
                        type=tp,
                        query=vecs[s_idx:e_idx],
                        topk=op.topk,
                        threads=op.threads,
                    )
                    if result_pks is None:
                        result_pks = np.asarray(result_pks_b)
                        result_distances = np.asarray(result_distances_b)
                    else:
                        result_pks = np.concatenate(
                            (result_pks, np.asarray(result_pks_b))
                        )
                        result_distances = np.concatenate(
                            (result_distances, np.asarray(result_distances_b))
                        )
                    s_idx = e_idx
                    e_idx = min(s_idx + batch, len(vecs))
                logger.warning(
                    f"Search({op.key}) count {s_idx}/{len(vecs)}:{round(s_idx * 100 / len(vecs), 2)}%"
                    f" costs {round(timer_s.duration, 2)} seconds"
                )
        logger.warning(f"Search({op.key}) costs {timer.duration} seconds")
        ctx[op.outputs[0].key] = np.asarray(result_pks)
        ctx[op.outputs[1].key] = np.asarray(result_distances)
    @classmethod
    def _execute_agg(cls, ctx, op: "ProximaSearcher"):
        inputs_data = [ctx[inp.key] for inp in op.inputs]
        chunk_num = len(inputs_data) // 2
        pks = np.concatenate(inputs_data[:chunk_num], axis=1)
        distances = np.concatenate(inputs_data[chunk_num:], axis=1)
        n_doc = len(pks)
        topk = op.topk
        # calculate topk on rows
        if op.distance_metric == "InnerProduct":
            inds = np.argsort(distances, axis=1)[:, -1 : -topk - 1 : -1]
        else:
            inds = np.argsort(distances, axis=1)[:, :topk]
        result_pks = np.empty((n_doc, topk), dtype=pks.dtype)
        result_distances = np.empty((n_doc, topk), dtype=distances.dtype)
        rng = np.arange(n_doc)
        for i in range(topk):
            ind = inds[:, i]
            result_pks[:, i] = pks[rng, ind]
            result_distances[:, i] = distances[rng, ind]
        del rng
        ctx[op.outputs[0].key] = result_pks
        ctx[op.outputs[1].key] = result_distances
    @classmethod
    def execute(cls, ctx, op: "ProximaSearcher"):
        if op.stage != OperandStage.agg:
            return cls._execute_map(ctx, op)
        else:
            return cls._execute_agg(ctx, op)
def search_index(
    tensor,
    topk,
    index,
    threads=4,
    row_number=None,
    dimension=None,
    distance_metric=None,
    index_searcher=None,
    index_searcher_params=None,
    index_reformer=None,
    index_reformer_params=None,
    storage_options=None,
    run=True,
    session=None,
    run_kwargs=None,
):
    tensor = validate_tensor(tensor)
    if dimension is None:
        dimension = tensor.shape[1]
    if index_searcher is None:
        index_searcher = ""
    if index_searcher_params is None:
        index_searcher_params = {}
    if index_reformer is None:
        index_reformer = ""
    if index_reformer_params is None:
        index_reformer_params = {}
    if distance_metric is None:
        distance_metric = ""
    if hasattr(index, "op") and index.op.index_path is not None:
        storage_options = storage_options or index.op.storage_options
        index = index.op.index_path
    op = ProximaSearcher(
        tensor=tensor,
        distance_metric=distance_metric,
        dimension=dimension,
        row_number=row_number,
        topk=topk,
        index=index,
        threads=threads,
        index_searcher=index_searcher,
        index_searcher_params=index_searcher_params,
        index_reformer=index_reformer,
        index_reformer_params=index_reformer_params,
        storage_options=storage_options,
    )
    result = op(tensor, index)
    if run:
        return result.execute(session=session, **(run_kwargs or dict()))
    else:
...
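For orientation, here is a hedged usage sketch of the `search_index` helper defined above. Only the function signature and defaults come from the snippet; the import path, the index location, and the presence of a running Mars session with the proxima extension are assumptions.

import numpy as np
import mars.tensor as mt
# Hypothetical import path -- adjust to where this module lives in your Mars tree.
from mars.learn.proxima.simple_index import search_index

# 100 query vectors with the same dimension as the indexed documents.
query = mt.tensor(np.random.rand(100, 32).astype(np.float32))

# `index` may be a directory containing proxima_* shards written by build_index
# (see builder.py below) or the tileable returned by build_index itself.
# "/path/to/index_dir" is a placeholder.
pks, distances = search_index(query, topk=10, index="/path/to/index_dir", threads=4)
print(pks.shape, distances.shape)  # both (100, 10)

With `run=True` (the default) the operand executes immediately and returns the top-k primary keys and distances as two tensors of shape (n_queries, topk).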
Source:analyzer.py  
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from collections import deque, defaultdict
from typing import Dict, List, Tuple, Type, Union
from ....config import Config
from ....core import ChunkGraph, ChunkType, enter_mode
from ....core.operand import Fetch, VirtualOperand, LogicKeyGenerator
from ....resource import Resource
from ....typing import BandType
from ....utils import build_fetch, tokenize
from ...subtask import SubtaskGraph, Subtask
from ..core import Task, new_task_id
from .assigner import AbstractGraphAssigner, GraphAssigner
from .fusion import Coloring
logger = logging.getLogger(__name__)
class GraphAnalyzer:
    def __init__(
        self,
        chunk_graph: ChunkGraph,
        band_resource: Dict[BandType, Resource],
        task: Task,
        config: Config,
        graph_assigner_cls: Type[AbstractGraphAssigner] = None,
        stage_id: str = None,
    ):
        self._chunk_graph = chunk_graph
        self._band_resource = band_resource
        self._task = task
        self._stage_id = stage_id
        self._config = config
        self._fuse_enabled = task.fuse_enabled
        self._extra_config = task.extra_config
        if graph_assigner_cls is None:
            graph_assigner_cls = GraphAssigner
        self._graph_assigner_cls = graph_assigner_cls
        self._chunk_to_copied = dict()
        self._logic_key_generator = LogicKeyGenerator()
    @classmethod
    def _iter_start_ops(cls, chunk_graph: ChunkGraph):
        visited = set()
        op_keys = set()
        start_chunks = deque(chunk_graph.iter_indep())
        stack = deque([start_chunks.popleft()])
        while stack:
            chunk = stack.popleft()
            if chunk not in visited:
                inp_chunks = chunk_graph.predecessors(chunk)
                if not inp_chunks or all(
                    inp_chunk in visited for inp_chunk in inp_chunks
                ):
                    if len(inp_chunks) == 0:
                        op_key = chunk.op.key
                        if op_key not in op_keys:
                            op_keys.add(op_key)
                            yield chunk.op
                    visited.add(chunk)
                    stack.extend(c for c in chunk_graph[chunk] if c not in visited)
                else:
                    stack.appendleft(chunk)
                    stack.extendleft(
                        reversed(
                            [
                                c
                                for c in chunk_graph.predecessors(chunk)
                                if c not in visited
                            ]
                        )
                    )
            if not stack and start_chunks:
                stack.appendleft(start_chunks.popleft())
    @classmethod
    def _gen_input_chunks(
        cls,
        inp_chunks: List[ChunkType],
        chunk_to_fetch_chunk: Dict[ChunkType, ChunkType],
    ) -> List[ChunkType]:
        # gen fetch chunks for input chunks
        inp_fetch_chunks = []
        for inp_chunk in inp_chunks:
            if inp_chunk in chunk_to_fetch_chunk:
                inp_fetch_chunks.append(chunk_to_fetch_chunk[inp_chunk])
            elif isinstance(inp_chunk.op, Fetch):
                chunk_to_fetch_chunk[inp_chunk] = inp_chunk
                inp_fetch_chunks.append(inp_chunk)
            else:
                fetch_chunk = build_fetch(inp_chunk).data
                chunk_to_fetch_chunk[inp_chunk] = fetch_chunk
                inp_fetch_chunks.append(fetch_chunk)
        return inp_fetch_chunks
    @staticmethod
    def _to_band(band_or_worker: Union[BandType, str]) -> BandType:
        if isinstance(band_or_worker, tuple) and len(band_or_worker) == 2:
            # band already
            return band_or_worker
        else:
            return band_or_worker, "numa-0"
    def _gen_subtask_info(
        self,
        chunks: List[ChunkType],
        chunk_to_subtask: Dict[ChunkType, Subtask],
        chunk_to_bands: Dict[ChunkType, BandType],
        chunk_to_fetch_chunk: Dict[ChunkType, ChunkType],
    ) -> Tuple[Subtask, List[Subtask]]:
        # gen subtask and its input subtasks
        final_result_chunks_set = set(self._chunk_graph.result_chunks)
        chunks_set = set(chunks)
        result_chunks = []
        result_chunks_set = set()
        chunk_graph = ChunkGraph(result_chunks)
        out_of_scope_chunks = []
        chunk_to_copied = self._chunk_to_copied
        # subtask properties
        band = None
        is_virtual = None
        retryable = True
        chunk_priority = None
        expect_worker = None
        bands_specified = None
        for chunk in chunks:
            if expect_worker is None:
                expect_worker = chunk.op.expect_worker
                bands_specified = expect_worker is not None
            else:  # pragma: no cover
                assert (
                    chunk.op.expect_worker is None
                    or expect_worker == chunk.op.expect_worker
                ), (
                    f"expect_worker {chunk.op.expect_worker} conflicts with chunks that have same color: "
                    f"{expect_worker}"
                )
            # process band
            chunk_band = chunk_to_bands.get(chunk)
            if chunk_band is not None:
                assert (
                    band is None or band == chunk_band
                ), "band conflicts with chunks that have same color"
                band = chunk_band
            # process is_virtual
            if isinstance(chunk.op, VirtualOperand):
                assert is_virtual is None, "only 1 virtual operand can exist"
                is_virtual = True
            else:
                is_virtual = False
            # process retryable
            if not chunk.op.retryable:
                retryable = False
            # process priority
            if chunk.op.priority is not None:
                assert (
                    chunk_priority is None or chunk_priority == chunk.op.priority
                ), "priority conflicts with chunks that have same color"
                chunk_priority = chunk.op.priority
            # process input chunks
            inp_chunks = []
            build_fetch_index_to_chunks = dict()
            for i, inp_chunk in enumerate(chunk.inputs):
                if inp_chunk in chunks_set:
                    inp_chunks.append(chunk_to_copied[inp_chunk])
                else:
                    build_fetch_index_to_chunks[i] = inp_chunk
                    inp_chunks.append(None)
                    if not isinstance(inp_chunk.op, Fetch):
                        out_of_scope_chunks.append(inp_chunk)
            fetch_chunks = self._gen_input_chunks(
                list(build_fetch_index_to_chunks.values()), chunk_to_fetch_chunk
            )
            for i, fetch_chunk in zip(build_fetch_index_to_chunks, fetch_chunks):
                inp_chunks[i] = fetch_chunk
            copied_op = chunk.op.copy()
            copied_op._key = chunk.op.key
            out_chunks = [
                c.data
                for c in copied_op.new_chunks(
                    inp_chunks, kws=[c.params.copy() for c in chunk.op.outputs]
                )
            ]
            for src_chunk, out_chunk in zip(chunk.op.outputs, out_chunks):
                out_chunk._key = src_chunk.key
                chunk_graph.add_node(out_chunk)
                chunk_to_copied[src_chunk] = out_chunk
                if chunk in final_result_chunks_set:
                    result_chunks.append(out_chunk)
                    result_chunks_set.add(out_chunk)
                if not is_virtual:
                    # skip adding fetch chunk to chunk graph when op is virtual operand
                    for c in inp_chunks:
                        if c not in chunk_graph:
                            chunk_graph.add_node(c)
                        chunk_graph.add_edge(c, out_chunk)
        # add chunks with no successors into result chunks
        result_chunks.extend(
            c
            for c in chunk_graph.iter_indep(reverse=True)
            if c not in result_chunks_set
        )
        expect_bands = (
            [self._to_band(expect_worker)]
            if bands_specified
            else ([band] if band is not None else None)
        )
        # calculate priority
        if out_of_scope_chunks:
            inp_subtasks = []
            for out_of_scope_chunk in out_of_scope_chunks:
                copied_out_of_scope_chunk = chunk_to_copied[out_of_scope_chunk]
                inp_subtask = chunk_to_subtask[out_of_scope_chunk]
                if (
                    copied_out_of_scope_chunk
                    not in inp_subtask.chunk_graph.result_chunks
                ):
                    # make sure the chunk that out of scope
                    # is in the input subtask's results,
                    # or the meta may be lost
                    inp_subtask.chunk_graph.result_chunks.append(
                        copied_out_of_scope_chunk
                    )
                inp_subtasks.append(inp_subtask)
            depth = max(st.priority[0] for st in inp_subtasks) + 1
        else:
            inp_subtasks = []
            depth = 0
        priority = (depth, chunk_priority or 0)
        subtask = Subtask(
            subtask_id=new_task_id(),
            stage_id=self._stage_id,
            logic_key=self._gen_logic_key(chunks),
            session_id=self._task.session_id,
            task_id=self._task.task_id,
            chunk_graph=chunk_graph,
            expect_bands=expect_bands,
            bands_specified=bands_specified,
            virtual=is_virtual,
            priority=priority,
            retryable=retryable,
            extra_config=self._extra_config,
        )
        return subtask, inp_subtasks
    def _gen_logic_key(self, chunks: List[ChunkType]):
        return tokenize(
            *[self._logic_key_generator.get_logic_key(chunk.op) for chunk in chunks]
        )
    @enter_mode(build=True)
    def gen_subtask_graph(self) -> SubtaskGraph:
        """
        Analyze chunk graph and generate subtask graph.
        Returns
        -------
        subtask_graph: SubtaskGraph
            Subtask graph.
        """
        reassign_worker_ops = [
            chunk.op for chunk in self._chunk_graph if chunk.op.reassign_worker
        ]
        start_ops = (
            list(self._iter_start_ops(self._chunk_graph))
            if len(self._chunk_graph) > 0
            else []
        )
        # assign start chunks
        to_assign_ops = start_ops + reassign_worker_ops
        assigner = self._graph_assigner_cls(
            self._chunk_graph, to_assign_ops, self._band_resource
        )
        # assign expect workers
        cur_assigns = {
            op.key: self._to_band(op.expect_worker)
            for op in start_ops
            if op.expect_worker is not None
        }
        logger.debug(
            "Start to assign %s start chunks for task %s",
            len(start_ops),
            self._task.task_id,
        )
        chunk_to_bands = assigner.assign(cur_assigns=cur_assigns)
        logger.debug(
            "Assigned %s start chunks for task %s", len(start_ops), self._task.task_id
        )
        # assign expect workers for those specified with `expect_worker`
        # skip `start_ops`, which have been assigned before
        for chunk in self._chunk_graph:
            if chunk not in start_ops and chunk.op.expect_worker is not None:
                chunk_to_bands[chunk] = self._to_band(chunk.op.expect_worker)
        # color nodes
        if self._fuse_enabled:
            logger.debug("Start to fuse chunks for task %s", self._task.task_id)
            # sort start chunks in coloring as start_ops
            op_key_to_chunks = defaultdict(list)
            for chunk in self._chunk_graph:
                op_key_to_chunks[chunk.op.key].append(chunk)
            init_chunk_to_bands = dict()
            for start_op in start_ops:
                for start_chunk in op_key_to_chunks[start_op.key]:
                    init_chunk_to_bands[start_chunk] = chunk_to_bands[start_chunk]
            coloring = Coloring(
                self._chunk_graph,
                list(self._band_resource),
                init_chunk_to_bands,
                initial_same_color_num=getattr(
                    self._config, "initial_same_color_num", None
                ),
                as_broadcaster_successor_num=getattr(
                    self._config, "as_broadcaster_successor_num", None
                ),
            )
            chunk_to_colors = coloring.color()
        else:
            # if not fuse enabled, color all chunks with different colors
            chunk_to_colors = {
                c: i for i, c in enumerate(self._chunk_graph.topological_iter())
            }
        color_to_chunks = defaultdict(list)
        for chunk, color in chunk_to_colors.items():
            color_to_chunks[color].append(chunk)
        # gen subtask graph
        subtask_graph = SubtaskGraph()
        chunk_to_fetch_chunk = dict()
        chunk_to_subtask = dict()
        # states
        visited = set()
        logic_key_to_subtasks = defaultdict(list)
        for chunk in self._chunk_graph.topological_iter():
            if chunk in visited:
                continue
            color = chunk_to_colors[chunk]
            same_color_chunks = color_to_chunks[color]
            if all(isinstance(c.op, Fetch) for c in same_color_chunks):
                # all fetch ops, no need to gen subtask
                continue
            subtask, inp_subtasks = self._gen_subtask_info(
                same_color_chunks,
                chunk_to_subtask,
                chunk_to_bands,
                chunk_to_fetch_chunk,
            )
            subtask_graph.add_node(subtask)
            logic_key_to_subtasks[subtask.logic_key].append(subtask)
            for inp_subtask in inp_subtasks:
                subtask_graph.add_edge(inp_subtask, subtask)
            for c in same_color_chunks:
                chunk_to_subtask[c] = subtask
            visited.update(same_color_chunks)
        for subtasks in logic_key_to_subtasks.values():
            for logic_index, subtask in enumerate(subtasks):
                subtask.logic_index = logic_index
                subtask.logic_parallelism = len(subtasks)
...
Source:builder.py  
# Copyright 1999-2021 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
import os
import pickle  # nosec  # pylint: disable=import_pickle
import tempfile
import uuid
import numpy as np
from .... import opcodes
from .... import tensor as mt
from ....lib.filesystem import get_fs
from ....core import OutputType
from ....core.context import get_context
from ....core.operand import OperandStage
from ....serialization.serializables import (
    StringField,
    Int32Field,
    Int64Field,
    DictField,
    BytesField,
    TupleField,
    DataTypeField,
)
from ....utils import has_unknown_shape, Timer
from ...operands import LearnOperand, LearnOperandMixin
from ..core import (
    proxima,
    get_proxima_type,
    validate_tensor,
    available_numpy_dtypes,
    rechunk_tensor,
    build_mmap_chunks,
)
logger = logging.getLogger(__name__)
DEFAULT_INDEX_SIZE = 5 * 10**6
class ProximaBuilder(LearnOperand, LearnOperandMixin):
    _op_type_ = opcodes.PROXIMA_SIMPLE_BUILDER
    _distance_metric = StringField("distance_metric")
    _dimension = Int32Field("dimension")
    _column_number = Int64Field("column_number")
    _index_path = StringField("index_path")
    _index_builder = StringField("index_builder")
    _index_builder_params = DictField("index_builder_params")
    _index_converter = StringField("index_converter")
    _index_converter_params = DictField("index_converter_params")
    _topk = Int32Field("topk")
    _storage_options = BytesField(
        "storage_options", on_serialize=pickle.dumps, on_deserialize=pickle.loads
    )
    # only for chunk
    _array_shape = TupleField("array_shape")
    _array_dtype = DataTypeField("array_dtype")
    _offset = Int64Field("offset")
    def __init__(
        self,
        distance_metric=None,
        index_path=None,
        dimension=None,
        column_number=None,
        index_builder=None,
        index_builder_params=None,
        index_converter=None,
        index_converter_params=None,
        array_shape=None,
        array_dtype=None,
        offset=None,
        topk=None,
        storage_options=None,
        output_types=None,
        **kw,
    ):
        super().__init__(
            _distance_metric=distance_metric,
            _index_path=index_path,
            _dimension=dimension,
            _column_number=column_number,
            _index_builder=index_builder,
            _index_builder_params=index_builder_params,
            _array_shape=array_shape,
            _array_dtype=array_dtype,
            _offset=offset,
            _index_converter=index_converter,
            _index_converter_params=index_converter_params,
            _topk=topk,
            _storage_options=storage_options,
            _output_types=output_types,
            **kw,
        )
        if self._output_types is None:
            self._output_types = [OutputType.object]
    @property
    def distance_metric(self):
        return self._distance_metric
    @property
    def column_number(self):
        return self._column_number
    @property
    def index_path(self):
        return self._index_path
    @property
    def dimension(self):
        return self._dimension
    @property
    def index_builder(self):
        return self._index_builder
    @property
    def index_builder_params(self):
        return self._index_builder_params
    @property
    def index_converter(self):
        return self._index_converter
    @property
    def index_converter_params(self):
        return self._index_converter_params
    @property
    def topk(self):
        return self._topk
    @property
    def storage_options(self):
        return self._storage_options
    @property
    def array_shape(self):
        return self._array_shape
    @property
    def array_dtype(self):
        return self._array_dtype
    @property
    def offset(self):
        return self._offset
    def __call__(self, tensor):
        return self.new_tileable([tensor])
    @classmethod
    def _get_atleast_topk_nsplit(cls, nsplit, topk):
        new_nsplit = []
        i = 0
        while i < len(nsplit):
            cur = nsplit[i]
            i += 1
            if cur >= topk:
                new_nsplit.append(cur)
            else:
                while i < len(nsplit):
                    cur += nsplit[i]
                    i += 1
                    if cur >= topk:
                        break
                if cur < topk and len(new_nsplit) > 0:
                    new_nsplit[-1] += cur
                elif cur >= topk:
                    new_nsplit.append(cur)
        new_nsplit = tuple(new_nsplit)
        assert sum(new_nsplit) == sum(nsplit), (
            f"sum of nsplit not equal, " f"old: {nsplit}, new: {new_nsplit}"
        )
        return new_nsplit
    @classmethod
    def tile(cls, op):
        tensor = op.inputs[0]
        out = op.outputs[0]
        index_path = op.index_path
        ctx = get_context()
        fs = None
        if index_path is not None:
            fs = get_fs(index_path, op.storage_options)
        if index_path is not None:
            # check if the index path is empty
            try:
                files = [f for f in fs.ls(index_path) if "proxima_" in f]
                if files:
                    raise ValueError(
                        f"Directory {index_path} contains built proxima index, "
                        f"clean them to perform new index building"
                    )
            except FileNotFoundError:
                # if not exist, create directory
                fs.mkdir(index_path)
        # make sure all inputs have known chunk sizes
        if has_unknown_shape(*op.inputs):
            yield
        if op.column_number:
            index_chunk_size = op.inputs[0].shape[0] // op.column_number
        else:
            worker_num = len(ctx.get_worker_addresses() or [])
            if worker_num > 0:
                index_chunk_size = max(
                    op.inputs[0].shape[0] // worker_num, DEFAULT_INDEX_SIZE
                )
            else:
                index_chunk_size = DEFAULT_INDEX_SIZE
        if op.topk is not None:
            index_chunk_size = cls._get_atleast_topk_nsplit(index_chunk_size, op.topk)
        # build chunks for writing tensors to mmap files.
        worker_iter = iter(itertools.cycle(ctx.get_worker_addresses() or [None]))
        chunk_groups = rechunk_tensor(tensor, index_chunk_size)
        out_chunks = []
        offsets = []
        offset = 0
        for chunk_group in chunk_groups:
            offsets.append(offset)
            file_prefix = f"proxima-build-{str(uuid.uuid4())}"
            out_chunks.append(
                build_mmap_chunks(
                    chunk_group, next(worker_iter), file_prefix=file_prefix
                )
            )
            offset += sum(c.shape[0] for c in chunk_group)
        final_out_chunks = []
        for j, chunks in enumerate(out_chunks):
            chunk_op = op.copy().reset_key()
            chunk_op.stage = OperandStage.map
            chunk_op.expect_worker = chunks[0].op.expect_worker
            chunk_op._array_shape = chunks[0].op.total_shape
            chunk_op._array_dtype = chunks[0].dtype
            chunk_op._offset = offsets[j]
            out_chunk = chunk_op.new_chunk(chunks, index=(j,))
            final_out_chunks.append(out_chunk)
        logger.warning(f"index chunks count: {len(final_out_chunks)} ")
        params = out.params
        params["chunks"] = final_out_chunks
        params["nsplits"] = ((1,) * len(final_out_chunks),)
        new_op = op.copy()
        return new_op.new_tileables(op.inputs, kws=[params])
    @classmethod
    def _execute_map(cls, ctx, op: "ProximaBuilder"):
        mmap_path = ctx[op.inputs[0].key]
        out = op.outputs[0]
        data = np.memmap(
            mmap_path, dtype=op.array_dtype, mode="r", shape=op.array_shape
        )
        proxima_type = get_proxima_type(op.array_dtype)
        offset = op.offset
        # holder
        with Timer() as timer:
            holder = proxima.IndexHolder(
                type=proxima_type, dimension=op.dimension, shallow=True
            )
            holder.mount(data, key_base=offset)
        logger.warning(f"Holder({op.key}) costs {timer.duration} seconds")
        # converter
        meta = proxima.IndexMeta(
            proxima_type, dimension=op.dimension, measure_name=op.distance_metric
        )
        if op.index_converter is not None:
            with Timer() as timer:
                converter = proxima.IndexConverter(
                    name=op.index_converter, meta=meta, params=op.index_converter_params
                )
                converter.train_and_transform(holder)
                holder = converter.result()
                meta = converter.meta()
            logger.warning(f"Converter({op.key}) costs {timer.duration} seconds")
        # builder
        with Timer() as timer:
            builder = proxima.IndexBuilder(
                name=op.index_builder, meta=meta, params=op.index_builder_params
            )
            builder = builder.train_and_build(holder)
        logger.warning(f"Builder({op.key}) costs {timer.duration} seconds")
        # remove mmap file
        os.remove(mmap_path)
        # dumper
        with Timer() as timer:
            path = tempfile.mkstemp(prefix="proxima-", suffix=".index")[1]
            dumper = proxima.IndexDumper(name="FileDumper", path=path)
            builder.dump(dumper)
            dumper.close()
        logger.warning(f"Dumper({op.key}) costs {timer.duration} seconds")
        if op.index_path is None:
            ctx[out.key] = path
        else:
            # write to external file
            with Timer() as timer:
                fs = get_fs(op.index_path, op.storage_options)
                filename = f"proxima_{out.index[0]}_index"
                out_path = f'{op.index_path.rstrip("/")}/{filename}'
                def write_index():
                    with fs.open(out_path, "wb") as out_f:
                        with open(path, "rb") as in_f:
                            # 128M
                            chunk_bytes = 128 * 1024**2
                            while True:
                                data = in_f.read(chunk_bytes)
                                if data:
                                    out_f.write(data)
                                else:
                                    break
                # retry 3 times
                for _ in range(3):
                    try:
                        write_index()
                        break
                    except:  # noqa: E722  # nosec  # pylint: disable=bare-except
                        fs.delete(out_path)
                        continue
            logger.warning(
                f"WritingToVolume({op.key}), out path: {out_path}, "
                f"size {os.path.getsize(path)}, "
                f"costs {timer.duration} seconds "
                f"speed {round(os.path.getsize(path) / (1024 ** 2) / timer.duration, 2)} MB/s"
            )
            ctx[out.key] = filename
    @classmethod
    def _execute_agg(cls, ctx, op: "ProximaBuilder"):
        paths = [ctx[inp.key] for inp in op.inputs]
        ctx[op.outputs[0].key] = paths
    @classmethod
    def execute(cls, ctx, op: "ProximaBuilder"):
        if op.stage != OperandStage.agg:
            return cls._execute_map(ctx, op)
        else:
            return cls._execute_agg(ctx, op)
    @classmethod
    def concat_tileable_chunks(cls, tileable):
        assert not tileable.is_coarse()
        op = cls(stage=OperandStage.agg)
        chunk = cls(stage=OperandStage.agg).new_chunk(tileable.chunks)
        return op.new_tileable([tileable], chunks=[chunk], nsplits=((1,),))
def build_index(
    tensor,
    dimension=None,
    index_path=None,
    column_number=None,
    need_shuffle=False,
    distance_metric="SquaredEuclidean",
    index_builder="SsgBuilder",
    index_builder_params=None,
    index_converter=None,
    index_converter_params=None,
    topk=None,
    storage_options=None,
    run=True,
    session=None,
    run_kwargs=None,
):
    tensor = validate_tensor(tensor)
    if tensor.dtype not in available_numpy_dtypes:
        raise ValueError(
            f"Dtype to build index should be one of {available_numpy_dtypes}, "
            f"got {tensor.dtype}"
        )
    if dimension is None:
        dimension = tensor.shape[1]
    if index_builder_params is None:
        index_builder_params = {}
    if index_converter_params is None:
        index_converter_params = {}
    if need_shuffle:
        tensor = mt.random.permutation(tensor)
    op = ProximaBuilder(
        distance_metric=distance_metric,
        index_path=index_path,
        dimension=dimension,
        column_number=column_number,
        index_builder=index_builder,
        index_builder_params=index_builder_params,
        index_converter=index_converter,
        index_converter_params=index_converter_params,
        topk=topk,
        storage_options=storage_options,
    )
    result = op(tensor)
    if run:
        return result.execute(session=session, **(run_kwargs or dict()))
    else:
...
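A corresponding hedged sketch for the `build_index` helper above. The parameter names and defaults (`distance_metric="SquaredEuclidean"`, `index_builder="SsgBuilder"`) come from the snippet; the import path and the index directory are assumptions.

import numpy as np
import mars.tensor as mt
# Hypothetical import path -- adjust to your Mars installation.
from mars.learn.proxima.simple_index import build_index

doc = mt.tensor(np.random.rand(100_000, 32).astype(np.float32))

# With index_path set, each chunk writes its shard as proxima_<i>_index into
# that directory (any location get_fs understands); with index_path=None the
# local temporary index paths are returned instead. "/path/to/index_dir" is a placeholder.
index = build_index(
    doc,
    index_path="/path/to/index_dir",
    distance_metric="SquaredEuclidean",
    index_builder="SsgBuilder",
)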
Source:test_worker.py  
...
from asyncio.futures import Future
import pytest
from playwright.async_api import Error, Page, Worker
async def test_workers_page_workers(page, server):
    async with page.expect_worker() as worker_info:
        await page.goto(server.PREFIX + "/worker/worker.html")
    worker = await worker_info.value
    assert "worker.js" in worker.url
    assert (
        await worker.evaluate('() => self["workerFunction"]()')
        == "worker function result"
    )
    await page.goto(server.EMPTY_PAGE)
    assert len(page.workers) == 0
async def test_workers_should_emit_created_and_destroyed_events(page: Page):
    worker_obj = None
    async with page.expect_event("worker") as event_info:
        worker_obj = await page.evaluate_handle(
            "() => new Worker(URL.createObjectURL(new Blob(['1'], {type: 'application/javascript'})))"
        )
    worker = await event_info.value
    worker_this_obj = await worker.evaluate_handle("() => this")
    worker_destroyed_promise: Future[Worker] = asyncio.Future()
    worker.once("close", lambda w: worker_destroyed_promise.set_result(w))
    await page.evaluate("workerObj => workerObj.terminate()", worker_obj)
    assert await worker_destroyed_promise == worker
    with pytest.raises(Error) as exc:
        await worker_this_obj.get_property("self")
    assert "Most likely the worker has been closed." in exc.value.message
async def test_workers_should_report_console_logs(page):
    async with page.expect_console_message() as message_info:
        await page.evaluate(
            '() => new Worker(URL.createObjectURL(new Blob(["console.log(1)"], {type: "application/javascript"})))'
        )
    message = await message_info.value
    assert message.text == "1"
@pytest.mark.skip_browser("firefox")  # TODO: investigate further @pavelfeldman
async def test_workers_should_have_JSHandles_for_console_logs(page):
    log_promise = asyncio.Future()
    page.on("console", lambda m: log_promise.set_result(m))
    await page.evaluate(
        "() => new Worker(URL.createObjectURL(new Blob(['console.log(1,2,3,this)'], {type: 'application/javascript'})))"
    )
    log = await log_promise
    assert log.text == "1 2 3 JSHandle@object"
    assert len(log.args) == 4
    assert await (await log.args[3].get_property("origin")).json_value() == "null"
async def test_workers_should_evaluate(page):
    async with page.expect_event("worker") as event_info:
        await page.evaluate(
            "() => new Worker(URL.createObjectURL(new Blob(['console.log(1)'], {type: 'application/javascript'})))"
        )
    worker = await event_info.value
    assert await worker.evaluate("1+1") == 2
async def test_workers_should_report_errors(page):
    error_promise = asyncio.Future()
    page.on("pageerror", lambda e: error_promise.set_result(e))
    await page.evaluate(
        """() => new Worker(URL.createObjectURL(new Blob([`
      setTimeout(() => {
        // Do a console.log just to check that we do not confuse it with an error.
        console.log('hey');
        throw new Error('this is my error');
      })
    `], {type: 'application/javascript'})))"""
    )
    error_log = await error_promise
    assert "this is my error" in error_log.message
@pytest.mark.skip_browser("firefox")  # TODO: fails upstream
async def test_workers_should_clear_upon_navigation(server, page):
    await page.goto(server.EMPTY_PAGE)
    async with page.expect_event("worker") as event_info:
        await page.evaluate(
            '() => new Worker(URL.createObjectURL(new Blob(["console.log(1)"], {type: "application/javascript"})))'
        )
    worker = await event_info.value
    assert len(page.workers) == 1
    destroyed = []
    worker.once("close", lambda _: destroyed.append(True))
    await page.goto(server.PREFIX + "/one-style.html")
    assert destroyed == [True]
    assert len(page.workers) == 0
@pytest.mark.skip_browser("firefox")  # TODO: fails upstream
async def test_workers_should_clear_upon_cross_process_navigation(server, page):
    await page.goto(server.EMPTY_PAGE)
    async with page.expect_event("worker") as event_info:
        await page.evaluate(
            "() => new Worker(URL.createObjectURL(new Blob(['console.log(1)'], {type: 'application/javascript'})))"
        )
    worker = await event_info.value
    assert len(page.workers) == 1
    destroyed = []
    worker.once("close", lambda _: destroyed.append(True))
    await page.goto(server.CROSS_PROCESS_PREFIX + "/empty.html")
    assert destroyed == [True]
    assert len(page.workers) == 0
async def test_workers_should_report_network_activity(page, server):
    async with page.expect_worker() as worker_info:
        await page.goto(server.PREFIX + "/worker/worker.html"),
    worker = await worker_info.value
    url = server.PREFIX + "/one-style.css"
    async with page.expect_request(url) as request_info, page.expect_response(
        url
    ) as response_info:
        await worker.evaluate(
            "url => fetch(url).then(response => response.text()).then(console.log)", url
        )
    request = await request_info.value
    response = await response_info.value
    assert request.url == url
    assert response.request == request
    assert response.ok
async def test_workers_should_report_network_activity_on_worker_creation(page, server):
    # Chromium needs waitForDebugger enabled for this one.
    await page.goto(server.EMPTY_PAGE)
    url = server.PREFIX + "/one-style.css"
    async with page.expect_request(url) as request_info, page.expect_response(
        url
    ) as response_info:
        await page.evaluate(
            """url => new Worker(URL.createObjectURL(new Blob([`
        fetch("${url}").then(response => response.text()).then(console.log);
        `], {type: 'application/javascript'})))""",
            url,
        )
    request = await request_info.value
    response = await response_info.value
    assert request.url == url
    assert response.request == request
    assert response.ok
async def test_workers_should_format_number_using_context_locale(browser, server):
    context = await browser.new_context(locale="ru-RU")
    page = await context.new_page()
    await page.goto(server.EMPTY_PAGE)
    async with page.expect_worker() as worker_info:
        await page.evaluate(
            "() => new Worker(URL.createObjectURL(new Blob(['console.log(1)'], {type: 'application/javascript'})))"
        )
    worker = await worker_info.value
    assert await worker.evaluate("() => (10000.20).toLocaleString()") == "10\u00A0000,2"
...

LambdaTest's Playwright tutorial gives a broader overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
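The tests in test_worker.py above revolve around `page.expect_worker()` (or the `worker` event) and `worker.evaluate()`. Below is a minimal, self-contained sketch of that pattern outside the pytest fixtures, assuming Playwright and a Chromium browser are installed; the inline Blob worker is only an illustration.

import asyncio
from playwright.async_api import async_playwright

async def main() -> None:
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        # expect_worker() starts waiting before the action that spawns the worker,
        # mirroring the pattern used in the tests above.
        async with page.expect_worker() as worker_info:
            await page.evaluate(
                "() => new Worker(URL.createObjectURL("
                "new Blob(['console.log(1)'], {type: 'application/javascript'})))"
            )
        worker = await worker_info.value
        print(worker.url, await worker.evaluate("1 + 1"))  # -> blob:... 2
        await browser.close()

asyncio.run(main())

As in the tests, the action that creates the worker must run inside the `expect_worker()` context manager, otherwise the event can fire before the listener is registered.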
