Best Python code snippet using pandera_python
db_connection.py
Source:db_connection.py  
import os
import json
from collections import defaultdict, OrderedDict


class BaseDBConnection(object):
    """ Base KG connection for database

    Defines the interface shared by the concrete backends. Every operation
    except `__init__` and `__del__` raises `NotImplementedError` here.
    """

    def __init__(self, db_path, chunksize):
        """ Create an connection to database

        :param db_path: database path
        :type db_path: str
        :param chunksize: the chunksize to load/write database
        :type chunksize: int
        """
        self._conn = None
        self.chunksize = chunksize

    def close(self):
        """ Close the connection safely
        """
        raise NotImplementedError

    def __del__(self):
        self.close()

    def create_table(self, table_name, columns, column_types):
        """ Create a table with given columns and types

        :param table_name: the table name to create
        :type table_name: str
        :param columns: the columns to create
        :type columns: List[str]
        :param column_types: the corresponding column types
        :type column_types: List[str]
        """
        raise NotImplementedError

    def get_columns(self, table_name, columns):
        """ Get column information from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a list of retrieved rows
        :rtype: List[Dict[str, object]]
        """
        raise NotImplementedError

    def select_row(self, table_name, _id, columns):
        """ Select a row from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _id: the row id
        :type _id: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a retrieved row
        :rtype: Dict[str, object]
        """
        raise NotImplementedError

    def select_rows(self, table_name, _ids, columns):
        """ Select rows from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _ids: the row ids
        :type _ids: List[str]
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: retrieved rows
        :rtype: List[Dict[str, object]]
        """
        raise NotImplementedError

    def insert_row(self, table_name, row):
        """ Insert a row into a table

        :param table_name: the table name to insert
        :type table_name: str
        :param row: the row to insert
        :type row: Dict[str, object]
        """
        raise NotImplementedError

    def insert_rows(self, table_name, rows):
        """ Insert several rows into a table

        :param table_name: the table name to insert
        :type table_name: str
        :param rows: the rows to insert
        :type rows: List[Dict[str, object]]
        """
        raise NotImplementedError

    def get_update_op(self, update_columns, operator):
        """ Get an update operator based on columns and a operator

        :param update_columns: a list of columns to update
        :type update_columns: List[str]
        :param operator: an operator that applies to the columns, including "+", "-", "*", "/", "="
        :type operator: str
        :return: an operator that suits the backend database
        :rtype: object
        """
        raise NotImplementedError

    def update_row(self, table_name, row, update_op, update_columns):
        """ Update a row that exists in a table

        :param table_name: the table name to update
        :type table_name: str
        :param row: a new row
        :type row: Dict[str, object]
        :param update_op: an operator that returned by `get_update_op`
        :type update_op: object
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        raise NotImplementedError

    def update_rows(self, table_name, rows, update_ops, update_columns):
        """ Update rows that exist in a table

        :param table_name: the table name to update
        :type table_name: str
        :param rows: new rows
        :type rows: List[Dict[str, object]]
        :param update_ops: operator(s) that returned by `get_update_op`
        :type update_ops: Union[List[object], object]
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        raise NotImplementedError

    def get_rows_by_keys(self, table_name, bys, keys, columns, order_bys=None, reverse=False, top_n=None):
        """ Retrieve rows by specific keys in some order

        :param table_name: the table name to retrieve
        :type table_name: str
        :param bys: the given columns to match
        :type bys: List[str]
        :param keys: the given values to match
        :type keys: List[str]
        :param columns: the given columns to retrieve
        :type columns: List[str]
        :param order_bys: the columns whose value are used to sort rows
        :type order_bys: List[str]
        :param reverse: whether to sort in a reversed order
        :type reverse: bool
        :param top_n: how many rows to return, default `None` for all rows
        :type top_n: int
        :return: retrieved rows
        :rtype: List[Dict[str, object]]
        """
        raise NotImplementedError


class SqliteDBConnection(BaseDBConnection):
    """ KG connection for SQLite database

    Table and column names are interpolated into the SQL text as given, so
    they must come from trusted code. Row values are bound as parameters or
    escaped as SQL literals.
    """

    def __init__(self, db_path, chunksize):
        """ Create an connection to SQLite database

        :param db_path: database path, e.g., /home/xliucr/ASER/KG.db
        :type db_path: str
        :param chunksize: the chunksize to load/write database
        :type chunksize: int
        """
        import sqlite3
        super(SqliteDBConnection, self).__init__(db_path, chunksize)
        self._conn = sqlite3.connect(db_path)

    def close(self):
        """ Close the connection safely
        """
        # `__del__` calls this even when `__init__` did not finish, so the
        # attribute may be missing or still `None`.
        conn = getattr(self, "_conn", None)
        if conn:
            conn.close()

    @staticmethod
    def _quote_ids(_ids):
        """ Render ids as a comma-separated list of SQL string literals

        Embedded single quotes are doubled so an id cannot terminate its
        literal. Literals are used instead of bound parameters because a
        chunk may hold more ids than SQLite allows bound variables.

        :param _ids: the row ids
        :type _ids: List[str]
        :return: text suitable for the inside of an `IN (...)` clause
        :rtype: str
        """
        return ",".join(["'%s'" % _id.replace("'", "''") for _id in _ids])

    def create_table(self, table_name, columns, column_types):
        """ Create a table with given columns and types

        :param table_name: the table name to create
        :type table_name: str
        :param columns: the columns to create
        :type columns: List[str]
        :param column_types: the corresponding column types, please refer to https://www.sqlite.org/datatype3.html
        :type column_types: List[str]
        """
        create_table = "CREATE TABLE %s (%s);" % (
            table_name, ",".join([' '.join(x) for x in zip(columns, column_types)])
        )
        self._conn.execute(create_table)
        self._conn.commit()

    def get_columns(self, table_name, columns):
        """ Get column information from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a list of retrieved rows
        :rtype: List[Dict[str, object]]
        """
        select_table = "SELECT %s FROM %s;" % (",".join(columns), table_name)
        return [OrderedDict(zip(columns, x)) for x in self._conn.execute(select_table)]

    def select_row(self, table_name, _id, columns):
        """ Select a row from a table
        (suggestion: consider to use `select_rows` if you want to retrieve multiple rows)

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _id: the row id
        :type _id: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a retrieved row, or `None` if no row has this id
        :rtype: Dict[str, object]
        """
        select_table = "SELECT %s FROM %s WHERE _id=?;" % (",".join(columns), table_name)
        result = list(self._conn.execute(select_table, [_id]))
        if len(result) == 0:
            return None
        return OrderedDict(zip(columns, result[0]))

    def select_rows(self, table_name, _ids, columns):
        """ Select rows from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _ids: the row ids
        :type _ids: List[str]
        :param columns: the columns to retrieve; must contain "_id" because rows are matched back to `_ids` by it
        :type columns: List[str]
        :return: retrieved rows in the order of `_ids`, with `None` for ids that are not found
        :rtype: List[Dict[str, object]]
        """
        if len(_ids) == 0:
            return []
        row_cache = dict()
        for idx in range(0, len(_ids), self.chunksize):
            select_table = "SELECT %s FROM %s WHERE _id IN (%s);" % (
                ",".join(columns), table_name, self._quote_ids(_ids[idx:idx + self.chunksize])
            )
            for x in self._conn.execute(select_table):
                exact_match_row = OrderedDict(zip(columns, x))
                row_cache[exact_match_row["_id"]] = exact_match_row
        return [row_cache.get(_id, None) for _id in _ids]

    def insert_row(self, table_name, row):
        """ Insert a row into a table
        (suggestion: consider to use `insert_rows` if you want to insert multiple rows)

        :param table_name: the table name to insert
        :type table_name: str
        :param row: the row to insert; values are inserted positionally in the dict's iteration order
        :type row: Dict[str, object]
        """
        insert_table = "INSERT INTO %s VALUES (%s)" % (table_name, ",".join(['?'] * (len(row))))
        self._conn.execute(insert_table, list(row.values()))
        self._conn.commit()

    def insert_rows(self, table_name, rows):
        """ Insert several rows into a table

        :param table_name: the table name to insert
        :type table_name: str
        :param rows: the rows to insert; the first row determines the number of placeholders
        :type rows: List[Dict[str, object]]
        """
        if len(rows) > 0:
            insert_table = "INSERT INTO %s VALUES (%s)" % (table_name, ",".join(['?'] * (len(next(iter(rows))))))
            self._conn.executemany(insert_table, [list(row.values()) for row in rows])
            self._conn.commit()

    def get_update_op(self, update_columns, operator):
        """ Get an update operator based on columns and a operator

        :param update_columns: a list of columns to update
        :type update_columns: List[str]
        :param operator: an operator that applies to the columns, including "+", "-", "*", "/", "="
        :type operator: str
        :return: an operator that suits the backend database
        :rtype: str
        :raises NotImplementedError: if `operator` is not one of the five supported operators
        """
        # Compare against a tuple: `operator in "+-*/"` is a substring test
        # and would also accept "" or "+-".
        if operator in ("+", "-", "*", "/"):
            return ",".join([c + "=" + c + operator + "?" for c in update_columns])
        elif operator == "=":
            return ",".join([c + "=?" for c in update_columns])
        else:
            raise NotImplementedError

    def _update_update_op(self, row, update_op, update_columns):
        """ Inline a row's values into the placeholders of an update operator

        :param row: a new row
        :type row: Dict[str, object]
        :param update_op: an operator that returned by `get_update_op`
        :type update_op: str
        :param update_columns: the columns to update, one per "?" in `update_op`
        :type update_columns: List[str]
        :return: the SET clause with every "?" replaced by a SQL literal
        :rtype: str
        """
        update_op_sp = update_op.split('?')
        # Drop the empty tail left by the final "?"; test for emptiness first
        # so an empty operator yields an empty list instead of an IndexError.
        while update_op_sp and update_op_sp[-1] == '':
            update_op_sp.pop()
        assert len(update_op_sp) == len(update_columns)
        new_update_op = []
        for prefix, update_column in zip(update_op_sp, update_columns):
            value = row[update_column]
            new_update_op.append(prefix)
            if value is None:
                # str(None) would produce the identifier `None`, not SQL NULL
                new_update_op.append("NULL")
            elif isinstance(value, str):
                new_update_op.append("'" + value.replace("'", "''") + "'")
            else:
                new_update_op.append(str(value))
        return ''.join(new_update_op)

    def update_row(self, table_name, row, update_op, update_columns):
        """ Update a row that exists in a table
        (suggestion: consider to use `update_rows` if you want to update multiple rows)

        :param table_name: the table name to update
        :type table_name: str
        :param row: a new row; must contain "_id" and every column in `update_columns`
        :type row: Dict[str, object]
        :param update_op: an operator that returned by `get_update_op`
        :type update_op: str
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        update_table = "UPDATE %s SET %s WHERE _id=?" % (table_name, update_op)
        self._conn.execute(update_table, [row[k] for k in update_columns] + [row["_id"]])
        self._conn.commit()

    def update_rows(self, table_name, rows, update_ops, update_columns):
        """ Update rows that exist in a table

        Rows whose inlined SET clause is identical are updated together with
        one statement per chunk of ids.

        :param table_name: the table name to update
        :type table_name: str
        :param rows: new rows; each must contain "_id" and every column in `update_columns`
        :type rows: List[Dict[str, object]]
        :param update_ops: operator(s) that returned by `get_update_op`; a list/tuple gives one operator per row
        :type update_ops: Union[List[str], str]
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        if len(rows) == 0:
            return
        if isinstance(update_ops, (tuple, list)):
            assert len(rows) == len(update_ops)
            row_ops = update_ops
        else:
            row_ops = [update_ops] * len(rows)

        # key: SET clause with values inlined, value: ids sharing that clause
        update_op_collections = defaultdict(list)
        for row, update_op in zip(rows, row_ops):
            new_update_op = self._update_update_op(row, update_op, update_columns)
            update_op_collections[new_update_op].append(row["_id"])

        for new_update_op, _ids in update_op_collections.items():
            for idx in range(0, len(_ids), self.chunksize):
                update_table = "UPDATE %s SET %s WHERE _id IN (%s);" % (
                    table_name, new_update_op, self._quote_ids(_ids[idx:idx + self.chunksize])
                )
                self._conn.execute(update_table)
        self._conn.commit()

    def get_rows_by_keys(self, table_name, bys, keys, columns, order_bys=None, reverse=False, top_n=None):
        """ Retrieve rows by specific keys in some order

        :param table_name: the table name to retrieve
        :type table_name: str
        :param bys: the given columns to match
        :type bys: List[str]
        :param keys: the given values to match
        :type keys: List[str]
        :param columns: the given columns to retrieve
        :type columns: List[str]
        :param order_bys: the columns whose value are used to sort rows
        :type order_bys: List[str]
        :param reverse: whether to sort in a reversed order
        :type reverse: bool
        :param top_n: how many rows to return, default `None` for all rows
        :type top_n: int
        :return: retrieved rows
        :rtype: List[Dict[str, object]]
        """
        select_table = "SELECT %s FROM %s WHERE %s" % (
            ",".join(columns), table_name, " AND ".join(["%s=?" % (by) for by in bys])
        )
        if order_bys:
            select_table += " ORDER BY %s %s" % (",".join(order_bys), "DESC" if reverse else "ASC")
        if top_n:
            select_table += " LIMIT %d" % (top_n)
        select_table += ";"
        return [OrderedDict(zip(columns, x)) for x in self._conn.execute(select_table, keys)]


class MongoDBConnection(BaseDBConnection):
    """ KG connection for MongoDB
    """

    def __init__(self, db_path, chunksize):
        """ Create an connection to MongoDB

        :param db_path: database path, e.g., mongodb://localhost:27017/ASER
        :type db_path: str
        :param chunksize: the chunksize to load/write database
        :type chunksize: int
        """
        import pymongo
        super(MongoDBConnection, self).__init__(db_path, chunksize)
        # The last path component is the database name, the rest is the host URI.
        host_port, db_name = os.path.split(db_path)
        self._client = pymongo.MongoClient(host_port, document_class=OrderedDict)
        self._conn = self._client[db_name]

    def close(self):
        """ Close the connection safely
        """
        # `__del__` calls this even when `__init__` failed before `_client`
        # was assigned.
        client = getattr(self, "_client", None)
        if client is not None:
            client.close()

    def create_table(self, table_name, columns=None, column_types=None):
        """ Create a table without the necessary to provide column information

        :param table_name: the table name to create
        :type table_name: str
        :param columns: ignored; accepted so the call matches `BaseDBConnection.create_table`
        :type columns: List[str]
        :param column_types: ignored; accepted so the call matches `BaseDBConnection.create_table`
        :type column_types: List[str]
        """
        self._conn[table_name]

    def __get_projection(self, columns):
        """ Build a projection that returns exactly `columns`

        "_id" is excluded unless it is listed in `columns`.
        """
        projection = {"_id": 0}
        for k in columns:
            projection[k] = 1
        return projection

    def get_columns(self, table_name, columns):
        """ Get column information from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a list of retrieved rows
        :rtype: List[Dict[str, object]]
        """
        projection = self.__get_projection(columns)
        return list(self._conn[table_name].find({}, projection))

    def select_row(self, table_name, _id, columns):
        """ Select a row from a table
        (suggestion: consider to use `select_rows` if you want to retrieve multiple rows)

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _id: the row id
        :type _id: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a retrieved row
        :rtype: Dict[str, object]
        """
        projection = self.__get_projection(columns)
        return self._conn[table_name].find_one({"_id": _id}, projection)

    def select_rows(self, table_name, _ids, columns):
        """ Select rows from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _ids: the row ids
        :type _ids: List[str]
        :param columns: the columns to retrieve; must contain "_id" because rows are matched back to `_ids` by it
        :type columns: List[str]
        :return: retrieved rows in the order of `_ids`, with `None` for ids that are not found
        :rtype: List[Dict[str, object]]
        """
        table = self._conn[table_name]
        projection = self.__get_projection(columns)
        row_cache = dict()
        for idx in range(0, len(_ids), self.chunksize):
            query = {"_id": {'$in': _ids[idx:idx + self.chunksize]}}
            for x in table.find(query, projection):
                row_cache[x["_id"]] = x
        return [row_cache.get(_id, None) for _id in _ids]

    def insert_row(self, table_name, row):
        """ Insert a row into a table
        (suggestion: consider to use `insert_rows` if you want to insert multiple rows)

        :param table_name: the table name to insert
        :type table_name: str
        :param row: the row to insert
        :type row: Dict[str, object]
        """
        self._conn[table_name].insert_one(row)

    def insert_rows(self, table_name, rows):
        """ Insert several rows into a table

        :param table_name: the table name to insert
        :type table_name: str
        :param rows: the rows to insert; an empty list is a no-op
        :type rows: List[Dict[str, object]]
        """
        # Skip the call for empty input, matching the SQLite backend.
        if len(rows) > 0:
            self._conn[table_name].insert_many(rows)

    def get_update_op(self, update_columns, operator):
        """ Get an update operator based on columns and a operator

        The returned values are placeholders that encode the operator
        (1 / -1 for "+" / "-", 2 / 0.5 for "*" / "/"); `_update_update_op`
        replaces them with the actual row values.

        :param update_columns: a list of columns to update
        :type update_columns: List[str]
        :param operator: an operator that applies to the columns, including "+", "-", "*", "/", "="
        :type operator: str
        :return: an operator that suits the backend database
        :rtype: Dict[str, Dict[str, float]]
        :raises NotImplementedError: if `operator` is not one of the five supported operators
        """
        if operator == "+":
            mongo_op, placeholder = "$inc", 1
        elif operator == "-":
            mongo_op, placeholder = "$inc", -1
        elif operator == "*":
            mongo_op, placeholder = "$mul", 2
        elif operator == "/":
            mongo_op, placeholder = "$mul", 0.5
        elif operator == "=":
            mongo_op, placeholder = "$set", 1
        else:
            raise NotImplementedError
        update_ops = {}
        for update_column in update_columns:
            update_ops[update_column] = placeholder
        return {mongo_op: update_ops}

    def _update_update_op(self, row, update_op, update_columns):
        """ Update the operator for a single row

        `update_op` is left untouched: the placeholders must survive so the
        same operator can be applied to further rows.

        :param row: a new row
        :type row: Dict[str, object]
        :param update_op: an operator that returned by `get_update_op`
        :type update_op: Dict[str, Dict[str, float]]
        :param update_columns: the columns to update
        :type update_columns: List[str]
        :return: a new operator carrying the values of `row`
        :rtype: Dict[str, Dict[str, float]]
        """
        new_update_op = {}
        for k, v in update_op.items():
            if k not in ("$inc", "$mul", "$set"):
                new_update_op[k] = v
                continue
            # Copy the inner dict as well; a shallow copy of `update_op`
            # alone would still share (and overwrite) the placeholders.
            new_v = dict(v)
            for update_column in update_columns:
                if k == "$inc":
                    # placeholder 1 means "+", anything else means "-"
                    if v[update_column] == 1:
                        new_v[update_column] = row[update_column]
                    else:
                        new_v[update_column] = -row[update_column]
                elif k == "$mul":
                    # placeholder 2 means "*", anything else means "/"
                    if v[update_column] == 2:
                        new_v[update_column] = row[update_column]
                    else:
                        new_v[update_column] = 1.0 / row[update_column]
                else:
                    new_v[update_column] = row[update_column]
            new_update_op[k] = new_v
        return new_update_op

    def update_row(self, table_name, row, update_op, update_columns):
        """ Update a row that exists in a table
        (suggestion: consider to use `update_rows` if you want to update multiple rows)

        :param table_name: the table name to update
        :type table_name: str
        :param row: a new row
        :type row: Dict[str, object]
        :param update_op: an operator that returned by `get_update_op`
        :type update_op: Dict[str, Dict[str, float]]
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        self._conn[table_name].update_one({"_id": row["_id"]}, self._update_update_op(row, update_op, update_columns))

    def _update_many_in_chunks(self, table_name, _ids, new_update_op):
        """ Apply one operator to the given ids, `chunksize` ids per request
        """
        for idx in range(0, len(_ids), self.chunksize):
            query = {"_id": {'$in': _ids[idx:idx + self.chunksize]}}
            self._conn[table_name].update_many(query, new_update_op)

    def update_rows(self, table_name, rows, update_ops, update_columns):
        """ Update rows that exist in a table

        :param table_name: the table name to update
        :type table_name: str
        :param rows: new rows
        :type rows: List[Dict[str, object]]
        :param update_ops: operator(s) that returned by `get_update_op`; a list/tuple gives one operator per row
        :type update_ops: Union[List[Dict[str, Dict[str, float]]], Dict[str, Dict[str, float]]]
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        if len(rows) == 0:
            return
        if isinstance(update_ops, (tuple, list)):  # +-*/
            assert len(rows) == len(update_ops)
            # group rows by their resolved operator (serialized to be hashable)
            update_op_collections = defaultdict(list)
            for row, update_op in zip(rows, update_ops):
                new_update_op = self._update_update_op(row, update_op, update_columns)
                update_op_collections[json.dumps(new_update_op)].append(row["_id"])
            for new_update_op, _ids in update_op_collections.items():
                self._update_many_in_chunks(table_name, _ids, json.loads(new_update_op))
        else:  # =
            update_op = update_ops
            # group rows by their new values; one representative row per group
            # is enough to resolve the operator
            value_collections = defaultdict(list)
            for row in rows:
                value_collections[json.dumps([row[k] for k in update_columns])].append(row)
            for op_rows in value_collections.values():
                new_update_op = self._update_update_op(op_rows[0], update_op, update_columns)
                self._update_many_in_chunks(table_name, [row["_id"] for row in op_rows], new_update_op)

    def get_rows_by_keys(self, table_name, bys, keys, columns, order_bys=None, reverse=False, top_n=None):
        """ Retrieve rows by specific keys in some order

        :param table_name: the table name to retrieve
        :type table_name: str
        :param bys: the given columns to match
        :type bys: List[str]
        :param keys: the given values to match
        :type keys: List[str]
        :param columns: the given columns to retrieve
        :type columns: List[str]
        :param order_bys: the columns whose value are used to sort rows
        :type order_bys: List[str]
        :param reverse: whether to sort in a reversed order
        :type reverse: bool
        :param top_n: how many rows to return, default `None` for all rows
        :type top_n: int
        :return: retrieved rows
        :rtype: List[Dict[str, object]]
        """
        query = OrderedDict(zip(bys, keys))
        projection = self.__get_projection(columns)
        cursor = self._conn[table_name].find(query, projection)
        if order_bys:
            direction = -1 if reverse else 1
            cursor = cursor.sort([(k, direction) for k in order_bys])
        if not top_n:
            # NOTE(review): the available listing is cut off after `else:`;
            # returning every row follows the documented `top_n=None`
            # contract - confirm against the full file.
            return list(cursor)
        result = []
        for x in cursor:
            result.append(x)
            if len(result) >= top_n:
                break
        return result
Source:base.py  
import os

try:
    import ujson as json
except ImportError:
    import json
from collections import defaultdict, OrderedDict


class BaseConnection(object):
    """ Base class of the database connections

    Subclasses keep their backend handle in ``self._conn`` and implement the
    table operations; every operation raises ``NotImplementedError`` here.
    """

    def __init__(self, db_path, chunksize):
        """ Remember the chunk size; no connection is opened by the base class

        :param db_path: database path (unused by the base class itself)
        :type db_path: str
        :param chunksize: number of ids handled per batched statement
        :type chunksize: int
        """
        self._conn = None
        self.chunksize = chunksize

    def close(self):
        """ Close the underlying connection if there is one
        """
        # __del__ may run on an object whose __init__ never finished, so look
        # the handle up defensively and compare against None instead of
        # relying on the handle's truth value.
        conn = getattr(self, "_conn", None)
        if conn is not None:
            conn.close()

    def __del__(self):
        self.close()

    def create_table(self, table_name, columns, columns_types=None):
        """ Create a table; ``columns_types`` matches the subclass signatures
        """
        raise NotImplementedError

    def get_columns(self, table_name, columns):
        """ Return the given columns of every row of a table
        """
        raise NotImplementedError

    def select_row(self, table_name, _id, columns):
        """ Return the row whose ``_id`` matches
        """
        raise NotImplementedError

    def select_rows(self, table_name, _ids, columns):
        """ Return one entry per requested ``_id``
        """
        raise NotImplementedError

    def insert_row(self, table_name, row):
        """ Insert a single row
        """
        raise NotImplementedError

    def insert_rows(self, table_name, rows):
        """ Insert several rows
        """
        raise NotImplementedError

    def update_row(self, table_name, row, update_op, update_columns):
        """ Apply ``update_op`` to the row identified by ``row["_id"]``
        """
        raise NotImplementedError

    def update_rows(self, table_name, rows, update_ops, update_columns):
        """ Apply one shared op, or one op per row, to several rows
        """
        raise NotImplementedError

    def get_update_op(self, update_columns, operator):
        """ Build the backend specific update op for an operator
        """
        raise NotImplementedError

    def get_rows_by_keys(self, table_name, bys, keys, columns, order_bys=None, reverse=False, top_n=None):
        """ Return the rows whose ``bys`` columns equal ``keys``
        """
        raise NotImplementedError


class SqliteConnection(BaseConnection):
    """ Connection that stores its tables in a SQLite database
    """

    def __init__(self, db_path, chunksize):
        """ Open the SQLite database at ``db_path``

        :param db_path: path handed to ``sqlite3.connect``
        :type db_path: str
        :param chunksize: number of ids per ``IN (...)`` statement
        :type chunksize: int
        """
        import sqlite3
        super(SqliteConnection, self).__init__(db_path, chunksize)
        self._conn = sqlite3.connect(db_path)

    @staticmethod
    def _quote_ids(_ids):
        """ Render string ids as a comma separated list of SQL string literals

        Single quotes are doubled, the same escaping ``_update_update_op``
        applies to string values, so an id containing ``'`` cannot break out
        of its literal.
        """
        return ",".join("'" + _id.replace("'", "''") + "'" for _id in _ids)

    def create_table(self, table_name, columns, columns_types):
        """ Create a table from parallel lists of column names and types

        :param table_name: the table name to create
        :type table_name: str
        :param columns: the column names
        :type columns: List[str]
        :param columns_types: the SQL type of each column, in the same order
        :type columns_types: List[str]
        """
        column_defs = ",".join(
            "%s %s" % (column, column_type)
            for column, column_type in zip(columns, columns_types))
        self._conn.execute("CREATE TABLE %s (%s);" % (table_name, column_defs))
        self._conn.commit()

    def get_columns(self, table_name, columns):
        """ Return the given columns of every row

        :return: one ``OrderedDict`` (column -> value) per row
        :rtype: List[OrderedDict]
        """
        select_table = "SELECT %s FROM %s;" % (",".join(columns), table_name)
        return [OrderedDict(zip(columns, x)) for x in self._conn.execute(select_table)]

    def select_row(self, table_name, _id, columns):
        """ Return the row with the given ``_id``

        :return: the first matching row, or None when nothing matches
        :rtype: OrderedDict or None
        """
        select_table = "SELECT %s FROM %s WHERE _id=?;" % (",".join(columns), table_name)
        result = list(self._conn.execute(select_table, [_id]))
        if not result:
            return None
        return OrderedDict(zip(columns, result[0]))

    def select_rows(self, table_name, _ids, columns):
        """ Return the rows for ``_ids``, aligned with the requested order

        ``columns`` has to contain ``"_id"`` because the fetched rows are
        matched back to the request through that column.

        :return: one entry per requested id; None where no row was found
        :rtype: List[OrderedDict or None]
        """
        if len(_ids) == 0:
            return []
        row_cache = dict()
        # Query chunksize ids at a time to keep each statement bounded.
        for idx in range(0, len(_ids), self.chunksize):
            select_table = "SELECT %s FROM %s WHERE _id IN (%s);" % (
                ",".join(columns), table_name, self._quote_ids(_ids[idx:idx+self.chunksize]))
            for x in self._conn.execute(select_table):
                exact_match_row = OrderedDict(zip(columns, x))
                row_cache[exact_match_row["_id"]] = exact_match_row
        return [row_cache.get(_id, None) for _id in _ids]

    def insert_row(self, table_name, row):
        """ Insert one row; its values are bound in ``row.values()`` order
        """
        insert_table = "INSERT INTO %s VALUES (%s)" % (table_name, ",".join(['?'] * len(row)))
        self._conn.execute(insert_table, list(row.values()))
        self._conn.commit()

    def insert_rows(self, table_name, rows):
        """ Insert several rows in one ``executemany``; an empty batch is a no-op

        The number of placeholders is taken from the first row.
        """
        if len(rows) > 0:
            insert_table = "INSERT INTO %s VALUES (%s)" % (
                table_name, ",".join(['?'] * len(next(iter(rows)))))
            self._conn.executemany(insert_table, [list(row.values()) for row in rows])
            self._conn.commit()

    def _update_update_op(self, row, update_op, update_columns):
        """ Substitute the ``?`` placeholders of ``update_op`` with row values

        :param row: the row that provides the values
        :type row: Dict[str, object]
        :param update_op: an op built by ``get_update_op``, e.g. ``"a=a+?"``
        :type update_op: str
        :param update_columns: the column whose value fills each placeholder
        :type update_columns: List[str]
        :return: the op with literal values inlined
        :rtype: str
        """
        update_op_sp = update_op.split('?')
        # Drop the empty pieces that follow the last placeholder; the list
        # may become empty, so test it before indexing.
        while update_op_sp and update_op_sp[-1] == '':
            update_op_sp.pop()
        assert len(update_op_sp) == len(update_columns)
        new_update_op = []
        for fragment, update_column in zip(update_op_sp, update_columns):
            value = row[update_column]
            new_update_op.append(fragment)
            if value is None:
                # update_row binds None as a parameter; render the SQL
                # equivalent here instead of the Python text "None".
                new_update_op.append("NULL")
            elif isinstance(value, str):
                new_update_op.append("'" + value.replace("'", "''") + "'")
            else:
                new_update_op.append(str(value))
        return ''.join(new_update_op)

    def update_row(self, table_name, row, update_op, update_columns):
        """ Apply ``update_op`` to the row identified by ``row["_id"]``
        """
        update_table = "UPDATE %s SET %s WHERE _id=?" % (table_name, update_op)
        self._conn.execute(update_table, [row[k] for k in update_columns] + [row["_id"]])
        self._conn.commit()

    def _execute_update(self, table_name, new_update_op, op_rows):
        """ Run one inlined update op against the ids of ``op_rows`` in chunks
        """
        _ids = [row["_id"] for row in op_rows]
        for idx in range(0, len(_ids), self.chunksize):
            update_table = "UPDATE %s SET %s WHERE _id IN (%s);" % (
                table_name, new_update_op, self._quote_ids(_ids[idx:idx+self.chunksize]))
            self._conn.execute(update_table)

    def update_rows(self, table_name, rows, update_ops, update_columns):
        """ Update several rows, grouping rows that share the same statement

        :param rows: the rows to update; each needs ``"_id"`` and the
            ``update_columns`` values
        :type rows: List[Dict[str, object]]
        :param update_ops: one op per row (list or tuple), or a single op
            applied to every row
        :type update_ops: Union[List[str], Tuple[str], str]
        :param update_columns: the columns the ops refer to
        :type update_columns: List[str]
        """
        if len(rows) == 0:
            return
        update_op_collections = defaultdict(list)  # inlined op -> rows
        if isinstance(update_ops, (tuple, list)):  # +-*/
            assert len(rows) == len(update_ops)
            # Rows whose inlined statements are identical share one UPDATE.
            for row, update_op in zip(rows, update_ops):
                new_update_op = self._update_update_op(row, update_op, update_columns)
                update_op_collections[new_update_op].append(row)
        else:  # =
            # Rows carrying the same new values share one UPDATE.
            value_collections = defaultdict(list)
            for row in rows:
                value_collections[json.dumps([row[k] for k in update_columns])].append(row)
            for op_rows in value_collections.values():
                new_update_op = self._update_update_op(op_rows[0], update_ops, update_columns)
                update_op_collections[new_update_op].extend(op_rows)
        for new_update_op, op_rows in update_op_collections.items():
            self._execute_update(table_name, new_update_op, op_rows)
        self._conn.commit()

    def get_update_op(self, update_columns, operator):
        """ Build a ``SET`` clause with one ``?`` placeholder per column

        :param operator: one of ``+``, ``-``, ``*``, ``/`` or ``=``
        :type operator: str
        :return: e.g. ``"a=a+?,b=b+?"`` for ``+`` or ``"a=?,b=?"`` for ``=``
        :rtype: str
        :raises NotImplementedError: for any other operator
        """
        # Compare against the individual operators: a substring test on
        # "+-*/" would also accept "" and "+-".
        if operator in ("+", "-", "*", "/"):
            return ",".join(
                update_column + "=" + update_column + operator + "?"
                for update_column in update_columns)
        if operator == "=":
            return ",".join(update_column + "=?" for update_column in update_columns)
        raise NotImplementedError

    def get_rows_by_keys(self, table_name, bys, keys, columns, order_bys=None, reverse=False, top_n=None):
        """ Return the rows whose ``bys`` columns equal the matching ``keys``

        :param bys: the column names to filter on
        :type bys: List[str]
        :param keys: the values the ``bys`` columns must equal, same order
        :type keys: List[object]
        :param columns: the columns to retrieve
        :type columns: List[str]
        :param order_bys: optional columns to sort by
        :type order_bys: List[str]
        :param reverse: sort descending instead of ascending
        :type reverse: bool
        :param top_n: optional maximum number of rows
        :type top_n: int
        :return: the matching rows
        :rtype: List[OrderedDict]
        """
        select_table = "SELECT %s FROM %s WHERE %s" % (
            ",".join(columns), table_name, " AND ".join(["%s=?" % (by) for by in bys]))
        if order_bys:
            select_table += " ORDER BY %s %s" % (",".join(order_bys), "DESC" if reverse else "ASC")
        if top_n:
            select_table += " LIMIT %d" % (top_n)
        select_table += ";"
        return [OrderedDict(zip(columns, x)) for x in self._conn.execute(select_table, keys)]


class MongoDBConnection(BaseConnection):
    """ Connection that stores its tables as MongoDB collections
    """

    # operator -> (MongoDB update operator, placeholder value); the
    # placeholder lets _update_update_op tell "+" from "-" and "*" from "/".
    _OPERATOR_PLACEHOLDERS = {
        "+": ("$inc", 1),
        "-": ("$inc", -1),
        "*": ("$mul", 2),
        "/": ("$mul", 0.5),
        "=": ("$set", 1),
    }

    def __init__(self, db_path, chunksize):
        """ Connect to the local MongoDB server

        The database name is the base name of ``db_path`` without extension.
        """
        import pymongo
        super(MongoDBConnection, self).__init__(db_path, chunksize)
        self._client = pymongo.MongoClient("mongodb://localhost:27017/", document_class=OrderedDict)
        self._conn = self._client[os.path.splitext(os.path.basename(db_path))[0]]

    def close(self):
        """ Close the client if one was created
        """
        # __del__ calls close() even when __init__ failed before _client
        # was assigned.
        client = getattr(self, "_client", None)
        if client is not None:
            client.close()

    def create_table(self, table_name, columns, columns_types):
        """ Look up the collection; ``columns`` and ``columns_types`` are unused
        """
        self._conn[table_name]

    def __get_projection(self, columns):
        """ Build a projection that keeps exactly ``columns``

        ``_id`` is excluded unless it is listed in ``columns``.
        """
        projection = {"_id": 0}
        for k in columns:
            projection[k] = 1
        return projection

    def get_columns(self, table_name, columns):
        """ Return the given columns of every document
        """
        projection = self.__get_projection(columns)
        return list(self._conn[table_name].find({}, projection))

    def select_row(self, table_name, _id, columns):
        """ Return the result of ``find_one`` for the given ``_id``
        """
        projection = self.__get_projection(columns)
        return self._conn[table_name].find_one({"_id": _id}, projection)

    def select_rows(self, table_name, _ids, columns):
        """ Return the documents for ``_ids``, aligned with the requested order

        ``columns`` has to contain ``"_id"`` because the fetched documents
        are matched back to the request through that field.

        :return: one entry per requested id; None where nothing was found
        :rtype: List[Dict[str, object] or None]
        """
        table = self._conn[table_name]
        projection = self.__get_projection(columns)
        row_cache = {}
        for idx in range(0, len(_ids), self.chunksize):
            query = {"_id": {'$in': _ids[idx:idx+self.chunksize]}}
            for x in table.find(query, projection):
                row_cache[x["_id"]] = x
        return [row_cache.get(_id, None) for _id in _ids]

    def insert_row(self, table_name, row):
        """ Insert a single document
        """
        self._conn[table_name].insert_one(row)

    def insert_rows(self, table_name, rows):
        """ Insert several documents; an empty batch is a no-op

        Skipping empty batches mirrors ``SqliteConnection.insert_rows``.
        """
        if isinstance(rows, (list, tuple)) and len(rows) == 0:
            return
        self._conn[table_name].insert_many(rows)

    def _update_update_op(self, row, update_op, update_columns):
        """ Replace the placeholders of ``update_op`` with the row's values

        A new op is returned and ``update_op`` is left untouched, so the same
        op object can be reused for many rows.

        :param update_op: an op built by ``get_update_op``
        :type update_op: Dict[str, Dict[str, object]]
        :return: the op carrying real values
        :rtype: Dict[str, Dict[str, object]]
        """
        new_update_op = {}
        for k, v in update_op.items():
            if k == "$inc":
                # placeholder 1 means "+", anything else means "-"
                v = dict(
                    (c, row[c] if v[c] == 1 else -row[c]) for c in update_columns)
            elif k == "$mul":
                # placeholder 2 means "*", anything else means "/"
                v = dict(
                    (c, row[c] if v[c] == 2 else 1.0 / row[c]) for c in update_columns)
            elif k == "$set":
                v = dict(v)
                for c in update_columns:
                    v[c] = row[c]
            new_update_op[k] = v
        return new_update_op

    def update_row(self, table_name, row, update_op, update_columns):
        """ Apply ``update_op`` to the document identified by ``row["_id"]``
        """
        self._conn[table_name].update_one(
            {"_id": row["_id"]}, self._update_update_op(row, update_op, update_columns))

    def _update_many_in_chunks(self, table_name, new_update_op, op_rows):
        """ Run one update op against the ids of ``op_rows`` in chunks
        """
        _ids = [row["_id"] for row in op_rows]
        for idx in range(0, len(_ids), self.chunksize):
            query = {"_id": {'$in': _ids[idx:idx+self.chunksize]}}
            self._conn[table_name].update_many(query, new_update_op)

    def update_rows(self, table_name, rows, update_ops, update_columns):
        """ Update several documents, grouping rows that share the same op

        :param update_ops: one op per row (list or tuple), or a single op
            applied to every row
        :param update_columns: the columns the ops refer to
        :type update_columns: List[str]
        """
        if len(rows) == 0:
            return
        if isinstance(update_ops, (tuple, list)):  # +-*/
            assert len(rows) == len(update_ops)
            # Dicts are not hashable, so the JSON text of the op is the key.
            update_op_collections = defaultdict(list)
            for row, update_op in zip(rows, update_ops):
                new_update_op = self._update_update_op(row, update_op, update_columns)
                update_op_collections[json.dumps(new_update_op)].append(row)
            for new_update_op, op_rows in update_op_collections.items():
                self._update_many_in_chunks(table_name, json.loads(new_update_op), op_rows)
        else:  # =
            value_collections = defaultdict(list)
            for row in rows:
                value_collections[json.dumps([row[k] for k in update_columns])].append(row)
            for op_rows in value_collections.values():
                new_update_op = self._update_update_op(op_rows[0], update_ops, update_columns)
                self._update_many_in_chunks(table_name, new_update_op, op_rows)

    def get_update_op(self, update_columns, operator):
        """ Build an update document with a placeholder for every column

        :param operator: one of ``+``, ``-``, ``*``, ``/`` or ``=``
        :type operator: str
        :return: e.g. ``{"$inc": {"a": 1}}`` for ``+``
        :rtype: Dict[str, Dict[str, object]]
        :raises NotImplementedError: for any other operator
        """
        try:
            mongo_operator, placeholder = self._OPERATOR_PLACEHOLDERS[operator]
        except (KeyError, TypeError):
            raise NotImplementedError
        update_ops = {}
        for update_column in update_columns:
            update_ops[update_column] = placeholder
        return {mongo_operator: update_ops}

    def get_rows_by_keys(self, table_name, bys, keys, columns, order_bys=None, reverse=False, top_n=None):
        """ Return the documents whose ``bys`` fields equal the matching ``keys``

        :param order_bys: optional fields to sort by
        :type order_bys: List[str]
        :param reverse: sort descending instead of ascending
        :type reverse: bool
        :param top_n: optional maximum number of documents
        :type top_n: int
        :return: the matching documents
        :rtype: List[Dict[str, object]]
        """
        query = OrderedDict(zip(bys, keys))
        projection = self.__get_projection(columns)
        cursor = self._conn[table_name].find(query, projection)
        if order_bys:
            direction = -1 if reverse else 1
            cursor = cursor.sort([(k, direction) for k in order_bys])
        if top_n:
            result = []
            for x in cursor:
                result.append(x)
                if len(result) >= top_n:
                    break
            return result
        # NOTE(review): the source listing is elided at this branch
        # ("else:..."); returning every match mirrors
        # SqliteConnection.get_rows_by_keys - confirm against the full file.
        return list(cursor)

# ... (the listing is cut off at this point on the source page)
# Next listing on the page: dnn_ncrt_liew_combined_latest.py
Source:dnn_ncrt_liew_combined_latest.py  
...39    predicted = list(np.round(predicted))40    predicted = list(map(lambda row: list(map(int, row)), predicted))41    return predicted, classifier42def get_fingerprints(mol):43    morgan = update_columns(morgan_fingerprints(mol), 'morgan')44    maccs = update_columns(maccs_fingerprints(mol), 'maccs')45    charge_f, e = charge_features(mol)46    autocorrelation_f, e = autocorrelation_features(mol)47    harmonic_topology = pd.DataFrame(harmonic_topology_index_feature(mol))48    constitutional_f = constitutional_features(mol)49    estate_f = estate_features(mol)50    moe_f = moe_features(mol)51    bcut_f, e = bcut_features(mol)52    molproperty_f, e = molproperty_features(mol)53    cats2d_f = cats2d_features(mol)54    kappa = kappa_descriptors(mol)55    return {56        "maccs": maccs,57        "morgan": morgan,58        'charge': update_columns(charge_f, 'charge'),59        'harmonic_topology': update_columns(harmonic_topology, 'harmonic'),60        'autocorrelation': update_columns(autocorrelation_f, 'autocorrelation'),61        'constitutional': update_columns(constitutional_f, 'constitutional'),62        'estate': update_columns(estate_f, 'estate'),63        'moe': update_columns(moe_f, 'moe'),64        'bcut': update_columns(bcut_f, 'bcut'),65        'molproperty': update_columns(molproperty_f, 'molproperty'),66        'cats2d': update_columns(cats2d_f, 'cats2d'),67        'kappa': update_columns(kappa, 'kappa'),68    }69if __name__ == "__main__":70    ncrt_train = pd.read_csv('./data/raw/ncrt_liew_train.csv', index_col=0).fillna(0).reset_index(drop=True)71    ncrt_test = pd.read_csv('./data/raw/ncrt_liew_test.csv', index_col=0).fillna(0).reset_index(drop=True)72    common_comb = ['maccs', 'morgan', 'charge', 'harmonic_topology', 'autocorrelation', 'constitutional', 'estate',73                   'moe', 'bcut', 'molproperty', 'cats2d', 'kappa']74    fingerprints = get_fingerprints(convert_to_mol(ncrt_train['smiles']))75    fingerprints_test = 
get_fingerprints(convert_to_mol(ncrt_test['smiles']))76    all_inputs = pd.concat(list(map(lambda x: fingerprints[x], common_comb)), axis=1).fillna(0)77    all_inputs_test = pd.concat(list(map(lambda x: fingerprints_test[x], common_comb)), axis=1).fillna(0)78    X = pd.concat([all_inputs, all_inputs_test]).reset_index(drop=True)79    y = pd.concat([ncrt_train['label'], ncrt_test['label']]).reset_index(drop=True)80    # Instantiate the cross validator81    skf = StratifiedKFold(n_splits=10, random_state=1)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!
