Source: fetcher.py
#!/usr/bin/env python3
"""
Network query interfaces:
1. Individual stock query
    - QA_fetch_get_individual_financial: fetch one stock's financial statements for a given
      report-date range, statement type and report type
2. Cross-section query
    - QA_fetch_get_crosssection_financial: fetch all stocks' data for a given report date,
      statement type and report type

Local (database) query interfaces:
1. Cross-section query
    - QA_fetch_crosssection_financial
2. Advanced query
    - QA_fetch_financial_adv
"""
import datetime
import time
from typing import List, Tuple, Union

import pandas as pd
import pymongo
import tushare as ts

from QUANTAXIS.QAFactor.utils import QA_fmt_code, QA_fmt_code_list
from QUANTAXIS.QAFetch.QAQuery_Advance import QA_fetch_stock_list
from QUANTAXIS.QAFetch.QATushare import get_pro
from QUANTAXIS.QAUtil import (DATABASE, QASETTING, QA_util_date_int2str,
                              QA_util_date_stamp, QA_util_get_pre_trade_date,
                              QA_util_get_real_date, QA_util_log_info,
                              QA_util_to_json_from_pandas)

REPORT_DATE_TAILS = ["0331", "0630", "0930", "1231"]
SHEET_TYPE = ["income", "balancesheet", "cashflow"]
REPORT_TYPE = ['1', '2', '3', '4', '5', '11']


def QA_fetch_get_individual_financial(
        code: str,
        start: Union[str, datetime.datetime, pd.Timestamp] = None,
        end: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_date: Union[str, datetime.datetime] = None,
        sheet_type: str = "income",
        report_type: Union[int, str] = 1,
        fields: Union[str, Tuple, List] = None,
        wait_seconds: int = 61,
        max_trial: int = 3) -> pd.DataFrame:
    """Fetch an individual stock's financial statements from the network (Tushare).
    Note: ``start`` and ``end`` are range filters applied to ``report_date``.

    Args:
        code (str): stock code
        start (Union[str, datetime.datetime, pd.Timestamp], optional): start of the
            report-date range. Defaults to None.
        end (Union[str, datetime.datetime, pd.Timestamp], optional): end of the
            report-date range. Defaults to None.
        report_date (Union[str, datetime.datetime], optional): report date. Defaults to
            None; if report_date is given, start and end are ignored.
        sheet_type (str, optional): statement type. Defaults to "income".
            (income statement "income" |
             balance sheet "balancesheet" |
             cash-flow statement "cashflow" |
             earnings forecast "forecast" |
             earnings express report "express")
        report_type (Union[int, str], optional): report type. Defaults to 1.
            (1  consolidated statement: latest published (default) |
             2  single-quarter consolidated statement |
             3  adjusted single-quarter consolidated statement, if any |
             4  adjusted consolidated statement: prior-year figures republished this year,
                report date is the prior year |
             5  pre-adjustment consolidated statement: original figures kept after a restatement |
             6  parent-company statement |
             7  parent-company single-quarter statement |
             8  parent-company adjusted single-quarter statement |
             9  parent-company adjusted statement: prior-year figures republished this year |
             10 parent-company pre-adjustment statement: original parent-company figures |
             11 pre-adjustment consolidated statement: original consolidated figures |
             12 parent-company pre-adjustment statement: original figures kept after a change)
        fields (Union[str, Tuple, List], optional): columns to return; None returns all
            columns. Defaults to None.
        wait_seconds (int, optional): seconds to wait before retrying. Defaults to 61.
        max_trial (int, optional): maximum number of retries. Defaults to 3.

    Returns:
        pd.DataFrame: the requested statements for the stock within the given range
    """
    def _get_individual_financial(code, report_date, report_type, sheet_type, fields, wait_seconds, trial_count):
        nonlocal pro, max_trial
        if trial_count >= max_trial:
            raise ValueError("[ERROR]\tEXCEED MAX TRIAL!")
        try:
            if not fields:
                df = eval(
                    f"pro.{sheet_type}(ts_code='{code}', period='{report_date}', report_type={report_type})")
            else:
                df = eval(
                    f"pro.{sheet_type}(ts_code='{code}', period='{report_date}', report_type={report_type}, fields={fields})")
            return df.rename(columns={"ts_code": "code", "end_date": "report_date"})
        except Exception as e:
            print(e)
            time.sleep(wait_seconds)
            # propagate the retried result
            return _get_individual_financial(
                code, report_date, report_type, sheet_type, fields, wait_seconds, trial_count+1)

    pro = get_pro()
    report_type = int(report_type)
    if (not start) and (not end) and (not report_date):
        raise ValueError(
            "[QRY_DATES ERROR]\tparam 'start', 'end' and 'report_date' should not be none at the same time!")
    if isinstance(fields, str):
        fields = sorted(list(set([fields, "ts_code", "end_date",
                                  "ann_date", "f_ann_date", "report_type", "update_flag"])))
    if report_date:
        report_date = pd.Timestamp(report_date)
        year = report_date.year
        report_date_lists = [
            pd.Timestamp(str(year) + report_date_tail) for report_date_tail in REPORT_DATE_TAILS]
        if report_date not in report_date_lists:
            raise ValueError("[REPORT_DATE ERROR]")
        if sheet_type not in ["income", "balancesheet", "cashflow", "forecast", "express"]:
            raise ValueError("[SHEET_TYPE ERROR]")
        if report_type not in range(1, 13):
            raise ValueError("[REPORT_TYPE ERROR]")
        report_dates = [report_date]
    else:
        start = pd.Timestamp(start)
        start_year = start.year
        end = pd.Timestamp(end)
        end_year = end.year
        origin_year_ranges = pd.date_range(
            str(start_year), str(end_year+1), freq='Y').map(str).str.slice(0, 4).tolist()
        origin_report_ranges = pd.Series([
            pd.Timestamp(year + report_date_tail) for year in origin_year_ranges for report_date_tail in REPORT_DATE_TAILS])
        report_dates = origin_report_ranges.loc[(
            origin_report_ranges >= start) & (origin_report_ranges <= end)]
    df = pd.DataFrame()
    for report_date in report_dates:
        df = df.append(_get_individual_financial(
            code=QA_fmt_code(code, "ts"),
            report_date=report_date.strftime("%Y%m%d"),
            report_type=report_type,
            sheet_type=sheet_type,
            fields=fields,
            wait_seconds=wait_seconds,
            trial_count=0))
    df.code = QA_fmt_code_list(df.code)
    return df.reset_index(drop=True)
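A minimal usage sketch, mirroring the commented examples in the __main__ block at the bottom of this file; it assumes a Tushare token is configured for get_pro():

# Hedged example: Ping An Bank (000001) income statements for 2020 by report-date range,
# then a single report date restricted to one field.
df_range = QA_fetch_get_individual_financial("000001", "2020-01-01", "2020-12-31")
df_single = QA_fetch_get_individual_financial(
    "000001", report_date="2020-03-31", fields="basic_eps")
print(df_range.head())
print(df_single)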
def QA_fetch_get_crosssection_financial(
        report_date: Union[str, datetime.datetime, pd.Timestamp],
        report_type: Union[int, str] = 1,
        sheet_type: str = "income",
        fields: Union[str, Tuple, List] = None,
        wait_seconds: int = 61,
        max_trial: int = 3) -> pd.DataFrame:
    """Fetch a cross-section of financial statements from the network (Tushare).

    Args:
        report_date (Union[str, datetime.datetime, pd.Timestamp]): report date
        report_type (Union[int, str], optional): report type, 1-12, with the same meaning
            as in QA_fetch_get_individual_financial. Defaults to 1.
        sheet_type (str, optional): statement type. Defaults to "income".
            (income statement "income" |
             balance sheet "balancesheet" |
             cash-flow statement "cashflow" |
             earnings forecast "forecast" |
             earnings express report "express")
        fields (Union[str, List], optional): columns to return; None returns all columns.
        wait_seconds (int, optional): seconds to wait before retrying. Defaults to 61.
        max_trial (int, optional): maximum number of retries. Defaults to 3.

    Returns:
        pd.DataFrame: the requested statement data for all stocks on the given report date
    """
    def _get_crosssection_financial(report_date, report_type, sheet_type, fields, wait_seconds, trial_count):
        nonlocal pro, max_trial
        if trial_count >= max_trial:
            raise ValueError("[ERROR]\tEXCEED MAX TRIAL!")
        try:
            if not fields:
                print(
                    f"pro.{sheet_type}_vip(period='{report_date}', report_type={report_type})")
                df = eval(
                    f"pro.{sheet_type}_vip(period='{report_date}', report_type={report_type})")
            else:
                df = eval(
                    f"pro.{sheet_type}_vip(period='{report_date}', report_type={report_type}, fields={fields})")
            if df.empty:
                return df
            df.ts_code = QA_fmt_code_list(df.ts_code)
            return df.rename(columns={"ts_code": "code", "end_date": "report_date"}).sort_values(by=['ann_date', 'f_ann_date'])
        except Exception as e:
            print(e)
            time.sleep(wait_seconds)
            # propagate the retried result
            return _get_crosssection_financial(
                report_date, report_type, sheet_type, fields, wait_seconds, trial_count + 1)

    # Tushare pro API handle
    pro = get_pro()
    # normalize the report date
    report_date = pd.Timestamp(report_date)
    report_type = int(report_type)
    year = report_date.year
    std_report_dates = [
        str(year) + report_date_tail for report_date_tail in REPORT_DATE_TAILS]
    # date format accepted by the Tushare API
    if report_date.strftime("%Y%m%d") not in std_report_dates:
        raise ValueError("[REPORT_DATE ERROR]")
    # normalize fields
    if isinstance(fields, str):
        fields = sorted(list(set([fields, "ts_code", "end_date",
                                  "ann_date", "f_ann_date", "report_type", "update_flag"])))
    # currently only income, balancesheet and cashflow are supported
    if sheet_type not in SHEET_TYPE:
        raise ValueError("[SHEET_TYPE ERROR]")
    if report_type not in range(1, 13):
        raise ValueError("[REPORT_TYPE ERROR]")
    return _get_crosssection_financial(
        report_date=report_date.strftime("%Y%m%d"),
        report_type=report_type,
        sheet_type=sheet_type,
        fields=fields,
        wait_seconds=wait_seconds,
        trial_count=0)
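A usage sketch under the same assumption, matching the commented example in __main__; note that the *_vip Tushare endpoints this dispatches to require the corresponding account quota:

# Hedged example: every listed company's Q1 2020 income statement in one call.
df_q1 = QA_fetch_get_crosssection_financial("2020-03-31")
print(df_q1[["code", "report_date", "ann_date", "basic_eps"]].head())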
# FIXME: Add Fetch Get Method of Daily Basic
def QA_fetch_get_daily_basic(
        code: Union[str, List, Tuple] = None,
        trade_date: Union[str, pd.Timestamp, datetime.datetime] = None,
        fields: Union[str, List, Tuple] = None,
        wait_seconds: int = 61,
        max_trial: int = 3
) -> pd.DataFrame:
    """
    Fetch key daily fundamental indicators for a given trade date from the network,
    for stock screening and report display.

    Args:
        code (Union[str, List, Tuple], optional): stock code(s). Defaults to None,
            i.e. the whole market for that trade date.
        trade_date (Union[str, pd.Timestamp, datetime.datetime], optional): trade date.
            Defaults to None, i.e. the most recent trade date.
        fields (Union[str, List, Tuple], optional): Defaults to None. If one or several
            column names are given, the returned DataFrame still carries the trade date
            and other identifying columns.
        wait_seconds (int, optional): seconds to wait before retrying. Defaults to 61.
        max_trial (int, optional): maximum number of retries. Defaults to 3.

    Returns:
        pd.DataFrame: daily fundamental indicators for the given date and universe
    """
    def _fetch_get_daily_basic(trade_date, fields, trial_count):
        nonlocal pro, max_trial
        if trial_count >= max_trial:
            raise ValueError("[ERROR]\tEXCEED MAX TRIAL!")
        try:
            if not trade_date:
                trade_date = QA_util_get_pre_trade_date(
                    datetime.date.today(), 1).replace("-", "")
            else:
                trade_date = pd.Timestamp(trade_date).strftime("%Y%m%d")
            if not fields:
                qry = f"pro.daily_basic(trade_date='{trade_date}')"
            else:
                if isinstance(fields, str):
                    fields = list(set([fields] + ["ts_code", "trade_date"]))
                fields = ",".join(fields)
                # the arguments must be quoted so that eval() sees string literals
                qry = f"pro.daily_basic(trade_date='{trade_date}', fields='{fields}')"
            df = eval(qry)
            if df is None:
                raise ValueError("[ERROR]")
            return df
        except:
            time.sleep(wait_seconds)
            return _fetch_get_daily_basic(
                trade_date, fields, trial_count+1
            )

    pro = get_pro()
    df = _fetch_get_daily_basic(
        trade_date=trade_date, fields=fields, trial_count=0)
    if df.empty:
        return df
    else:
        df = df.rename(columns={"ts_code": "code"})
        df.code = QA_fmt_code_list(df.code)
        df = df.set_index("code")
    if not code:
        return df
    if isinstance(code, str):
        code = (code,)
    # exclude codes which are not in the returned dataframe
    filter_idx = df.index.intersection(code)
    return df.loc[filter_idx]
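A hypothetical call; the field names used here (pe, pb, total_mv) are standard Tushare daily_basic columns and only serve as an illustration:

# Hedged example: latest trading day's indicators for two codes; code=None would
# return the whole market for that day.
df_basic = QA_fetch_get_daily_basic(
    code=["000001", "600000"], fields=["pe", "pb", "total_mv"])
print(df_basic)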
def QA_fetch_crosssection_financial(
        report_date: Union[str, datetime.datetime, pd.Timestamp],
        report_type: Union[int, str] = 1,
        sheet_type: str = "income",
        fields: Union[str, Tuple, List] = None) -> pd.DataFrame:
    """Local (MongoDB) cross-section financial data query.

    Args:
        report_date (Union[str, datetime.datetime, pd.Timestamp]): report date
        report_type (Union[int, str], optional): report type. Defaults to 1.
            (1  consolidated statement: latest published (default) |
             2  single-quarter consolidated statement |
             3  adjusted single-quarter consolidated statement, if any |
             4  adjusted consolidated statement: prior-year figures republished this year |
             5  pre-adjustment consolidated statement: original figures kept after a restatement |
             11 pre-adjustment consolidated statement: original consolidated figures)
        sheet_type (str, optional): statement type. Defaults to "income".
        fields (Union[str, Tuple, List], optional): columns; None returns all columns.

    Returns:
        pd.DataFrame: the requested statement data for the given report date
    """
    if isinstance(fields, str):
        fields = sorted(list(set([fields, "code", "report_date",
                                  "ann_date", "f_ann_date", "report_type", "update_flag"])))
    coll = eval(f"DATABASE.{sheet_type}")
    report_date = pd.Timestamp(report_date).strftime("%Y%m%d")
    cursor = coll.find(
        {
            "report_date": report_date,
            "report_type": str(report_type)
        }
    )
    res = pd.DataFrame([item for item in cursor])
    if res.empty:
        return pd.DataFrame()
    res.report_date = pd.to_datetime(res.report_date, utc=False)
    if not fields:
        return res.drop(columns="_id")
    return res.drop(columns="_id")[fields]
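A usage sketch against the local MongoDB mirror (the corresponding collection must have been saved locally first), matching the commented example in __main__:

# Hedged example: local cross-section of Q1 2020 basic EPS.
print(QA_fetch_crosssection_financial("2020-03-31", fields="basic_eps"))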
def QA_fetch_financial_adv(
        code: Union[str, Tuple, List] = None,
        start: Union[str, datetime.datetime, pd.Timestamp] = None,
        end: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_date: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_type: Union[int, str] = None,
        sheet_type: str = "income",
        fields: Union[str, Tuple, List] = None) -> pd.DataFrame:
    """Local query of the given statement type for a stock or list of stocks, filtered by
    an announcement-date range or by a report date, and by report type.

    Args:
        code (Union[str, Tuple, List], optional): stock code or list of codes. Defaults to
            None, i.e. the whole market.
        start (Union[str, datetime.datetime, pd.Timestamp], optional): start date
        end (Union[str, datetime.datetime, pd.Timestamp], optional): end date
        report_date (Union[str, datetime.datetime, pd.Timestamp], optional): report date
        report_type (Union[int, str], optional): report type. Defaults to None, which is
            treated as types (1, 2, 4, 5, 11).
            (1  consolidated statement: latest published (default) |
             2  single-quarter consolidated statement |
             3  adjusted single-quarter consolidated statement, if any |
             4  adjusted consolidated statement: prior-year figures republished this year |
             5  pre-adjustment consolidated statement: original figures kept after a restatement |
             11 pre-adjustment consolidated statement: original consolidated figures)
        sheet_type (str, optional): statement type. Defaults to "income".
        fields (List, optional): columns; None returns all columns.

    Returns:
        pd.DataFrame: local statement data matching the given filters
    """
    if (not start) and (not end) and (not report_date):
        raise ValueError(
            "[DATE ERROR]\t'start', 'end' and 'report_date' cannot all be None")
    if isinstance(code, str):
        code = (code,)
    if not report_type:
        report_type = ("1", "2", "4", "5", "11")
    if isinstance(report_type, int) or isinstance(report_type, str):
        report_type = (str(report_type), )
    else:
        report_type = list(map(str, report_type))
    coll = eval(f"DATABASE.{sheet_type}")
    qry = {}
    if not report_date:
        if not end:
            end = datetime.date.today()
        start = pd.Timestamp(start)
        end = pd.Timestamp(end)
        start_date_stamp = QA_util_date_stamp(start)
        end_date_stamp = QA_util_date_stamp(end)
        if not code:
            qry = {
                "f_ann_date_stamp": {
                    "$gte": start_date_stamp,
                    "$lte": end_date_stamp
                },
                "report_type": {
                    "$in": report_type
                }
            }
        else:
            qry = {
                "code": {
                    "$in": code
                },
                "f_ann_date_stamp": {
                    "$gte": start_date_stamp,
                    "$lte": end_date_stamp
                },
                "report_type": {
                    "$in": report_type
                }
            }
    else:
        report_date_stamp = QA_util_date_stamp(report_date)
        if not code:
            qry = {
                "report_date_stamp": report_date_stamp,
                "report_type": {
                    "$in": report_type
                }
            }
        else:
            qry = {
                "code": {
                    "$in": code
                },
                "report_date_stamp": report_date_stamp,
                "report_type": {
                    "$in": report_type
                }
            }
    if isinstance(fields, str):
        fields = list(
            set([fields, "code", "ann_date", "report_date", "f_ann_date"]))
    elif fields:
        fields = list(
            set(list(fields) + ["code", "ann_date", "report_date", "f_ann_date"]))
    cursor = coll.find(qry, batch_size=10000).sort([
        ("report_date_stamp", pymongo.ASCENDING),
        ("f_ann_date_stamp", pymongo.ASCENDING)])
    if fields:
        df = pd.DataFrame(cursor).drop(columns="_id")[fields].set_index("code")
    else:
        df = pd.DataFrame(cursor).drop(columns="_id").set_index("code")
    df.report_date = pd.to_datetime(df.report_date, utc=False)
    df.ann_date = pd.to_datetime(df.ann_date, utc=False)
    df.f_ann_date = pd.to_datetime(df.f_ann_date, utc=False)
    return df
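A usage sketch taken from the commented __main__ test below:

# Hedged example: all income statements announced between 2018-06-30 and 2018-09-30,
# then the rows for Liugong (000528).
df = QA_fetch_financial_adv(start="2018-06-30", end="2018-09-30")
print(df.loc['000528', ["report_date", "f_ann_date", "ann_date",
                        "basic_eps", "report_type", "update_flag", "report_label"]])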
def QA_fetch_last_financial(
        code: Union[str, List, Tuple] = None,
        cursor_date: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_label: Union[int, str] = None,
        report_type: Union[int, str, List, Tuple] = None,
        sheet_type: str = "income",
        fields: Union[str, List, Tuple] = None) -> pd.DataFrame:
    """Fetch the most recent original statements published before ``cursor_date``
    (statements published on ``cursor_date`` itself are excluded). If both ``cursor_date``
    and ``report_date`` are given, ``report_date`` takes precedence.

    Note:
        ``report_type`` only supports types (1, 4, 5) here, to avoid mixing consolidated
        data with single-quarter data.

    Example:
        Liugong (000528) published its semi-annual report on 2018-08-30 and a corrected
        report on 2018-09-29:
        - with cursor_date = 2018-08-31 you get the original semi-annual report,
          i.e. report_type == 5
        - with cursor_date = 2018-09-30 you get the latest consolidated statement,
          i.e. report_type == 1
        - with cursor_date = 2019-08-31, to obtain the 2018 semi-annual figures the
          function returns the prior-year baseline Liugong published on 2019-08-29,
          i.e. report_type == 4

    Args:
        code (Union[str, List, Tuple], optional): stock code or list of codes. Defaults to
            None, i.e. all stocks.
        cursor_date (Union[str, datetime.datetime, pd.Timestamp]): cross-section date
            (usually the rebalancing day). Defaults to None.
        report_label (Union[str, int], optional): statement label, one of Q1, semi-annual,
            Q3 or annual report. Defaults to None, i.e. whichever is closest to cursor_date.
        report_type (Union[str, List, Tuple], optional): report type. Defaults to None,
            i.e. the statement closest to cursor_date regardless of type, which avoids
            look-ahead data.
            (1  consolidated statement: latest published (default) |
             2  single-quarter consolidated statement |
             4  adjusted consolidated statement: prior-year figures republished this year |
             5  pre-adjustment consolidated statement: original figures kept after a restatement)
        sheet_type (str, optional): statement type. Defaults to "income".
        fields (Union[str, List, Tuple], optional): columns. Defaults to None, i.e. all columns.

    Returns:
        pd.DataFrame: financial data matching the combined filters
    """
    def _trans_financial_type(x):
        if x.empty:
            return x
        if sheet_type == "balancesheet":
            # the balance sheet is point-in-time data, return it as-is
            return x
        else:
            if x.iloc[0].report_date[4:] in ['0331', '1231']:
                # for Q1 the single-quarter and cumulative statements are identical,
                # and annual reports have no single-quarter concept, so return directly
                return x.iloc[0]
            if x.iloc[0].report_type in ['1', '4', '5']:
                return x.iloc[0]
            if x.iloc[0].report_type == '2':
                # try to find a statement of type '1' or '4' for the same report date
                # try:
                #     if (x.shape[0] > 1) & (x.iloc[1].report_date == x.iloc[0].report_date) & (x.iloc[1].report_type in ['1', '4']):
                #         return x.iloc[1]
                # except:
                #     return pd.Series()
                # rebuild the cumulative figures by summing the single-quarter data
                cursor_x = x.loc[x.report_date.map(str).str.slice(
                    0, 4) == x.iloc[0].report_date[:4]]
                cursor_x = cursor_x.drop_duplicates(subset=['report_date'], keep='first')
                cursor_x = cursor_x.loc[cursor_x.report_date <=
                                        x.iloc[0].report_date]
                cursor_x = cursor_x.fillna(0)
                non_numeric_columns = sorted(["f_ann_date", "f_ann_date_stamp", "ann_date", "ann_date_stamp",
                                              "report_date", "report_date_stamp",
                                              "update_flag", "report_type", "code", "report_label"])
                columns = sorted(list(set(cursor_x.columns) - set(non_numeric_columns)))
                rtn_se = cursor_x[columns].sum(axis=0)
                rtn_se = rtn_se.append(cursor_x[non_numeric_columns].iloc[0])
                return rtn_se

    if isinstance(code, str):
        code = (code,)
    if not report_type:
        report_type = ["1", "2", "4", "5"]
    else:
        if isinstance(report_type, int):
            report_type = str(report_type)
        if isinstance(report_type, str):
            if report_type not in ["1", "4", "5"]:
                raise ValueError("[REPORT_TYPE ERROR]")
            report_type = (report_type,)
        else:
            report_type = list(set(report_type) & {"1", "2", "4", "5"})
    if sheet_type not in SHEET_TYPE:
        raise ValueError("[SHEET_TYPE ERROR]")
    if report_label:
        report_label = str(report_label)
    if isinstance(fields, str):
        fields = list(
            set([fields, "code", "ann_date", "report_date", "f_ann_date", "report_type"]))
    elif fields:
        fields = list(
            set(fields + ["code", "ann_date", "report_date", "f_ann_date", "report_type"]))
    coll = eval(f"DATABASE.{sheet_type}")
    if (not code) and (not report_label):
        # To keep the query fast, look back at most about one year (400 days) from
        # cursor_date: when rebalancing we only use the latest data actually available
        # (rebalancing cycles are usually monthly or quarterly), the longest reporting
        # cycle is the annual report, and corrections published more than a quarter
        # later rarely affect rebalancing, so one year is used as the look-back window.
        qry = {
            "f_ann_date_stamp": {
                "$gt": QA_util_date_stamp((pd.Timestamp(cursor_date) - pd.Timedelta(days=400)).strftime("%Y-%m-%d")),
                "$lt": QA_util_date_stamp(cursor_date)
            },
            "report_type": {
                "$in": report_type
            }}
        cursor = coll.find(qry, batch_size=10000).sort([
            ("report_date_stamp", pymongo.DESCENDING),
            ("f_ann_date_stamp", pymongo.DESCENDING)])
        try:
            if not fields:
                df = pd.DataFrame(cursor).drop(columns="_id")
            else:
                df = pd.DataFrame(cursor).drop(columns="_id")[fields]
        except:
            raise ValueError("[QRY ERROR]")
        if sheet_type == "balancesheet":
            return df.groupby("code").apply(lambda x: x.iloc[0])
        return df.groupby("code").apply(_trans_financial_type).unstack()
    if not report_label:
        qry = {
            "code": {
                "$in": code
            },
            "f_ann_date_stamp": {
                "$gt": QA_util_date_stamp((pd.Timestamp(cursor_date) - pd.Timedelta(days=400)).strftime("%Y-%m-%d")),
                "$lt": QA_util_date_stamp(cursor_date)
            },
            "report_type": {"$in": report_type}}
        cursor = coll.find(qry, batch_size=10000).sort([
            ("report_date_stamp", pymongo.DESCENDING),
            ("f_ann_date_stamp", pymongo.DESCENDING)])
        try:
            if not fields:
                df = pd.DataFrame(cursor).drop(columns="_id")
            else:
                df = pd.DataFrame(cursor).drop(columns="_id")[fields]
        except:
            raise ValueError("[QRY ERROR]")
        if sheet_type == "balancesheet":
            return df.groupby("code").apply(lambda x: x.iloc[0])
        return df.groupby("code").apply(_trans_financial_type).unstack()
    if not code:
        qry = {
            "f_ann_date_stamp": {
                "$gt": QA_util_date_stamp((pd.Timestamp(cursor_date) - pd.Timedelta(days=400)).strftime("%Y-%m-%d")),
                "$lt": QA_util_date_stamp(cursor_date)
            },
            "report_type": {
                "$in": report_type
            },
            "report_label": report_label
        }
        cursor = coll.find(qry, batch_size=10000).sort([
            ("report_date_stamp", pymongo.DESCENDING),
            ("f_ann_date_stamp", pymongo.DESCENDING)])
        try:
            if not fields:
                df = pd.DataFrame(cursor).drop(columns="_id")
            else:
                df = pd.DataFrame(cursor).drop(columns="_id")[fields]
        except:
            raise ValueError("[QRY ERROR]")
        if sheet_type == "balancesheet":
            return df.groupby("code").apply(lambda x: x.iloc[0])
        return df.groupby("code").apply(_trans_financial_type).unstack()
    else:
        qry = {
            "code": {
                "$in": code
            },
            "f_ann_date_stamp": {
                "$gt": QA_util_date_stamp((pd.Timestamp(cursor_date) - pd.Timedelta(days=400)).strftime("%Y-%m-%d")),
                "$lt": QA_util_date_stamp(cursor_date)
            },
            "report_type": {
                "$in": report_type
            },
            "report_label": report_label
        }
        cursor = coll.find(qry, batch_size=10000).sort([
            ("report_date_stamp", pymongo.DESCENDING),
            ("f_ann_date_stamp", pymongo.DESCENDING)])
        try:
            if not fields:
                df = pd.DataFrame(cursor).drop(columns="_id")
            else:
                df = pd.DataFrame(cursor).drop(columns="_id")[fields]
        except:
            raise ValueError("[QRY ERROR]")
        # df.report_date = pd.to_datetime(df.report_date, utc=False)
        # df.ann_date = pd.to_datetime(df.ann_date, utc=False)
        # df.f_ann_date = pd.to_datetime(df.f_ann_date, utc=False)
        if sheet_type == "balancesheet":
            return df.groupby("code").apply(lambda x: x.iloc[0])
        return df.groupby("code").apply(_trans_financial_type).unstack()
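A usage sketch reproducing the Liugong (000528) case described in the docstring, as in the commented __main__ test:

# Hedged example: on 2018-08-31 the original semi-annual report is returned.
print(QA_fetch_last_financial(
    cursor_date="2018-08-31", code=["000528"],
    fields=["report_date", "ann_date", "f_ann_date", "update_flag"]))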
def QA_fetch_stock_basic(
        code: Union[str, List, Tuple] = None,
        status: Union[str, List, Tuple] = 'L') -> pd.DataFrame:
    """Fetch basic stock information.

    Args:
        code (Union[str, List, Tuple], optional): stock code or list of codes. Defaults to
            None, i.e. all stocks.
        status (Union[str, List, Tuple], optional): listing status. Defaults to 'L',
            i.e. currently listed stocks; None returns stocks of every status.

    Returns:
        pd.DataFrame: basic stock information
    """
    coll = DATABASE.stock_basic
    if isinstance(code, str):
        code = (code,)
    if isinstance(status, str):
        status = (status,)
    qry = {}
    if not status:
        if not code:
            qry = {}
        else:
            qry = {
                "code": {
                    "$in": code
                }
            }
    else:
        if not code:
            qry = {
                "status": {
                    "$in": status
                }
            }
        else:
            qry = {
                "code": {
                    "$in": code
                },
                "status": {
                    "$in": status
                }
            }
    cursor = coll.find(qry)
    res = pd.DataFrame(cursor)
    if res.empty:
        return res
    else:
        res.list_date = pd.to_datetime(res.list_date, utc=False)
        return res.drop(columns="_id").set_index("code")


def QA_fetch_stock_name(
        code: Union[str, List, Tuple] = None,
        cursor_date: Union[str, datetime.datetime, pd.Timestamp] = None
) -> pd.DataFrame:
    """Fetch historical stock names.

    Args:
        code (Union[str, List, Tuple], optional): stock code or list of codes. Defaults to
            None, i.e. all stocks.
        cursor_date (Union[str, datetime.datetime, pd.Timestamp], optional): cut-off date;
            the name in effect closest to cursor_date is returned.

    Returns:
        pd.DataFrame: historical stock names
    """
    coll = DATABASE.namechange
    if isinstance(code, str):
        code = [code]
    qry = {}
    if not code:
        if not cursor_date:
            qry = {}
        else:
            qry = {
                "start_date_stamp": {
                    "$lte": QA_util_date_stamp(cursor_date)
                },
                "end_date_stamp": {
                    "$gte": QA_util_date_stamp(cursor_date)
                }
            }
    else:
        if not cursor_date:
            qry = {
                "code": {
                    "$in": code
                }
            }
        else:
            qry = {
                "code": {
                    "$in": code
                },
                "start_date_stamp": {
                    "$lte": QA_util_date_stamp(cursor_date)
                },
                "end_date_stamp": {
                    "$gte": QA_util_date_stamp(cursor_date)
                }
            }
    cursor = coll.find(qry)
    res = pd.DataFrame(cursor)
    if res.empty:
        return res
    else:
        res.start_date = pd.to_datetime(res.start_date, utc=False)
        res.end_date = pd.to_datetime(res.end_date, utc=False)
        return res.drop(columns="_id").set_index("code").sort_values(
            by="start_date_stamp").drop_duplicates(keep="last").sort_index()
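Usage sketches, again lifted from the commented __main__ tests:

# Hedged examples: a single stock's basic record, paused/delisted stocks, and the
# names two codes carried on 2008-10-09.
print(QA_fetch_stock_basic("000001"))
print(QA_fetch_stock_basic(status=["P", "D"]))
print(QA_fetch_stock_name(code=['000001', '000002'], cursor_date="20081009"))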
def QA_fetch_industry_adv(
    code: Union[str, List, Tuple] = None,
    cursor_date: Union[str, datetime.datetime] = None,
    start: Union[str, datetime.datetime] = None,
    end: Union[str, datetime.datetime] = None,
    levels: Union[str, List, Tuple] = None,
    src: str = "sw"
) -> pd.DataFrame:
    """Local query of the industry classification for a stock or list of stocks.

    Args:
        code (Union[str, List, Tuple], optional): stock code or list of codes. Defaults to
            None, i.e. all stocks.
        cursor_date (Union[str, datetime.datetime], optional): usually the rebalancing day;
            when given, start and end are not needed.
        start (Union[str, datetime.datetime], optional): start date. Defaults to None.
        end (Union[str, datetime.datetime], optional): end date. Defaults to None.
        levels (Union[str, List, Tuple], optional): industry classification levels.
            Defaults to None, i.e. all levels.
        src (str, optional): classification source. Defaults to "sw" (currently only the
            Shenwan industry classification is supported).

    Returns:
        pd.DataFrame: industry information
    """
    coll = DATABASE.industry
    if not code:
        code = QA_fetch_stock_list().index.tolist()
    if isinstance(code, str):
        code = [code]
    if isinstance(levels, str):
        levels = [levels, ]
    if not levels:
        levels = ["l1", "l2", "l3"]
    levels = list(map(lambda x: x.lower(), levels))
    df_tmp = pd.DataFrame()
    if not cursor_date:
        if not start:
            qry = {
                "code": {
                    "$in": code
                },
                "level": {
                    "$in": levels
                },
                "src": src.lower()
            }
        else:
            qry = {
                "code": {
                    "$in": code
                },
                "level": {
                    "$in": levels
                },
                "src": src.lower(),
                "in_date_stamp": {
                    "$lte": QA_util_date_stamp(pd.Timestamp(start).strftime("%Y-%m-%d"))
                }
            }
        if coll.count_documents(filter=qry) < 1:
            print("No matching industry data found")
            return pd.DataFrame()
        cursor = coll.find(qry)
        df_tmp = pd.DataFrame(cursor).drop(columns="_id")
        if end:
            df_tmp = df_tmp.loc[df_tmp.out_date_stamp > QA_util_date_stamp(
                pd.Timestamp(end).strftime("%Y-%m-%d"))]
    else:
        qry = {
            "code": {
                "$in": code
            },
            "level": {
                "$in": levels
            },
            "src": src.lower(),
            "in_date_stamp": {
                "$lte": QA_util_date_stamp(pd.Timestamp(cursor_date).strftime("%Y-%m-%d"))
            }
        }
        if coll.count_documents(filter=qry) < 1:
            print("No matching industry data found")
            return pd.DataFrame()
        cursor = coll.find(qry)
        df_tmp = pd.DataFrame(cursor).drop(columns="_id")
        # keep only memberships still in effect on cursor_date
        df_tmp = df_tmp.loc[df_tmp.out_date_stamp > QA_util_date_stamp(
            pd.Timestamp(cursor_date).strftime("%Y-%m-%d"))]
        df_tmp.in_date = pd.to_datetime(df_tmp.in_date, utc=False)
        df_tmp.out_date = pd.to_datetime(df_tmp.out_date, utc=False)
    return df_tmp.drop(columns=["in_date_stamp", "out_date_stamp"])
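Usage sketches from the commented __main__ tests:

# Hedged examples: industry membership over a date range and for a single
# cross-section date.
print(QA_fetch_industry_adv(["000001", "600000"], start="1998-01-01", end="2020-12-02"))
print(QA_fetch_industry_adv(["000001", "600000"], cursor_date="2020-12-02"))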
def QA_fetch_daily_basic(
    code: Union[str, List, Tuple] = None,
    start: Union[str, pd.Timestamp, datetime.datetime] = None,
    end: Union[str, pd.Timestamp, datetime.datetime] = None,
    cursor_date: Union[str, pd.Timestamp, datetime.datetime] = None,
    fields: Union[str, Tuple, List] = None
) -> pd.DataFrame:
    """Fetch the key daily fundamental indicators for all stocks from the local database,
    for stock screening, report display, etc.

    Args:
        code (Union[str, List, Tuple], optional): stock code or list of codes. Defaults to
            None, i.e. the whole market.
        start (Union[str, pd.Timestamp, datetime.datetime], optional): start date. Defaults to None.
        end (Union[str, pd.Timestamp, datetime.datetime], optional): end date. Defaults to None.
        cursor_date (Union[str, pd.Timestamp, datetime.datetime], optional): a single date;
            conflicts with start/end, use either cursor_date or start and end.
        fields (Union[str, Tuple, List], optional): columns to return.

    Returns:
        pd.DataFrame: daily indicators indexed by a (date, code) MultiIndex
    """
    if isinstance(code, str):
        code = (code,)
    if not code:
        if (not start) and (not cursor_date):
            raise ValueError(
                "[ERROR]\tstart and end and cursor_date cannot all be none!")
        if not cursor_date:
            if not end:
                end_stamp = QA_util_date_stamp(datetime.date.today())
            else:
                end_stamp = QA_util_date_stamp(end)
            start_stamp = QA_util_date_stamp(start)
            qry = {
                "trade_date_stamp": {
                    "$gte": start_stamp,
                    "$lte": end_stamp
                }
            }
        else:
            real_trade_date = QA_util_get_real_date(cursor_date)
            trade_date_stamp = QA_util_date_stamp(real_trade_date)
            qry = {
                "trade_date_stamp": trade_date_stamp
            }
    else:
        if (not start) and (not cursor_date):
            raise ValueError(
                "[ERROR]\tstart and end and cursor_date cannot all be none!")
        if not cursor_date:
            if not end:
                end_stamp = QA_util_date_stamp(datetime.date.today())
            else:
                end_stamp = QA_util_date_stamp(end)
            start_stamp = QA_util_date_stamp(start)
            qry = {
                "code": {
                    "$in": code
                },
                "trade_date_stamp": {
                    "$gte": start_stamp,
                    "$lte": end_stamp
                }
            }
        else:
            real_trade_date = QA_util_get_real_date(cursor_date)
            trade_date_stamp = QA_util_date_stamp(real_trade_date)
            qry = {
                "code": {
                    "$in": code
                },
                "trade_date_stamp": trade_date_stamp
            }
    coll = DATABASE.daily_basic
    cursor = coll.find(qry)
    df = pd.DataFrame(cursor)
    if df.empty:
        return df
    df = df.rename(columns={"trade_date": "date"}).drop(
        columns="_id")
    df.date = pd.to_datetime(df.date, utc=False)
    df = df.set_index(["date", "code"]).sort_index()
    if not fields:
        return df
    return df[fields]
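A hypothetical call against the local daily_basic collection; the pe/pb field names are illustrative and assume those columns were saved locally:

# Hedged example: PE and PB for one stock over a date range, indexed by (date, code).
print(QA_fetch_daily_basic(code="000001", start="2020-01-01", end="2020-03-31",
                           fields=["pe", "pb"]))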
if __name__ == "__main__":
    # print(QA_fetch_get_individual_financial(
    #     "000001", "2020-01-01", "2020-12-31"))
    # print(QA_fetch_get_individual_financial(
    #      "000001", report_date="2020-03-31", fields="basic_eps"))
    # print(QA_fetch_get_crosssection_financial('2020-03-31'))
    # print(QA_fetch_crosssection_financial("2020-03-31", fields="basic_eps"))
    # df = QA_fetch_financial_adv(start="2018-06-30", end="2018-09-30")
    # print(df.loc['000528', ["report_date", "f_ann_date",
    #                         "ann_date", "basic_eps", "report_type", "update_flag", "report_label"]])
    # print(df)
    # print(QA_fetch_stock_basic(status="D"))
    # latest financial data fetch tests
    # print(QA_fetch_last_financial(
    #     code="000596", cursor_date="2020-10-08"))
    # print(QA_fetch_last_financial(
    #         code=QA_fetch_stock_list().index.tolist(), cursor_date="2020-10-08"))
    # print(QA_fetch_last_financial(
    #         code = '000001', cursor_date = '2020-10-08'
    # ))
    code = QA_fetch_stock_list().index.tolist()
    cursor_date = '2020-10-08'
    df_origin = QA_fetch_last_financial(
        code=code, cursor_date=cursor_date, sheet_type="balancesheet")
    # print(QA_fetch_last_financial(
    #     cursor_date="2018-08-31"))
    # print(QA_fetch_last_financial(
    #     cursor_date="2018-08-31", code=["000528"], fields=["report_date", "ann_date", "f_ann_date", "update_flag"]))
    # print(QA_fetch_financial_adv(
    #     cursor_date="2018-08-31"))
    # stock basic info fetch tests
    # print(QA_fetch_stock_basic("000001"))
    # print(QA_fetch_stock_basic(status=["P", "D"]))
    # industry fetch tests
    # print(QA_fetch_industry_adv(start="1998-01-01", end="2020-12-02").head())
    # print(QA_fetch_industry_adv(["000001", "600000"],
    #                             start="1998-01-01", end="2020-12-02"))
    # print(QA_fetch_industry_adv(
    #     ["000001", "600000"], cursor_date="2020-12-02"))
    # print(QA_fetch_stock_name(
    #     code=['000001', '000002'], cursor_date="20081009"))
Source: upload.py
import os

import luigi
import yaml

from morgoth.balrog_handlers import ProcessFitResults
from morgoth.plots import (
    Create3DLocationPlot,
    CreateBalrogSwiftPlot,
    CreateCornerPlot,
    CreateLightcurve,
    CreateLocationPlot,
    CreateMollLocationPlot,
    CreateSatellitePlot,
    CreateSpectrumPlot
)
from morgoth.data_files import (
    CreateHealpixSysErr,
    CreateHealpix
)
from morgoth.configuration import morgoth_config
from morgoth.utils.file_utils import if_dir_containing_file_not_existing_then_make
from morgoth.utils.env import get_env_value
from morgoth.utils.upload_utils import upload_grb_report, upload_plot, upload_datafile

base_dir = get_env_value("GBM_TRIGGER_DATA_DIR")


class UploadReport(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter()

    def requires(self):
        return ProcessFitResults(
            grb_name=self.grb_name, report_type=self.report_type, version=self.version
        )

    def output(self):
        filename = f"{self.report_type}_{self.version}_report.yml"
        return luigi.LocalTarget(
            os.path.join(
                base_dir, self.grb_name, self.report_type, self.version, filename
            )
        )

    def run(self):
        with self.input()["result_file"].open() as f:
            result = yaml.safe_load(f)

        report = upload_grb_report(
            grb_name=self.grb_name,
            result=result,
            wait_time=float(
                morgoth_config["upload"]["report"]["interval"]
            ),
            max_time=float(
                morgoth_config["upload"]["report"]["max_time"]
            ),
        )

        report_name = f"{self.report_type}_{self.version}_report.yml"
        report_path = os.path.join(
            base_dir, self.grb_name, self.report_type, self.version, report_name
        )

        with open(report_path, "w") as f:
            yaml.dump(report, f, default_flow_style=False)


class UploadAllDataFiles(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return {
            "healpix": UploadHealpix(
                grb_name=self.grb_name,
                report_type=self.report_type,
                version=self.version,
            ),
            "healpixSysErr": UploadHealpixSysErr(
                grb_name=self.grb_name,
                report_type=self.report_type,
                version=self.version,
            ),
        }

    def output(self):
        filename = f"{self.report_type}_{self.version}_upload_datafiles.done"
        return luigi.LocalTarget(
            os.path.join(
                base_dir,
                self.grb_name,
                self.report_type,
                self.version,
                "upload",
                filename,
            )
        )

    def run(self):
        filename = f"{self.report_type}_{self.version}_upload_datafiles.done"
        tmp = os.path.join(
            base_dir, self.grb_name, self.report_type, self.version, "upload", filename
        )
        if_dir_containing_file_not_existing_then_make(tmp)
        os.system(f"touch {tmp}")
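A minimal sketch of how one of these tasks could be scheduled; the GRB name, report type and version below are placeholders, and it assumes GBM_TRIGGER_DATA_DIR and the morgoth configuration are set up so that luigi can resolve the ProcessFitResults dependency:

# Hypothetical invocation with placeholder parameter values.
import luigi

luigi.build(
    [UploadReport(grb_name="GRB200101A", report_type="tte", version="v00")],
    local_scheduler=True,
)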
UploadReport(102                grb_name=self.grb_name,103                report_type=self.report_type,104                version=self.version,105            ),106            "data_file": CreateHealpix(107                grb_name=self.grb_name,108                report_type=self.report_type,109                version=self.version,110            ),111        }112    def output(self):113        filename = f"{self.report_type}_{self.version}_upload_healpix.done"114        return luigi.LocalTarget(115            os.path.join(116                base_dir,117                self.grb_name,118                self.report_type,119                self.version,120                "upload",121                filename,122            )123        )124    def run(self):125        126        upload_datafile(127            grb_name=self.grb_name,128            report_type=self.report_type,129            data_file=self.input()["data_file"].path,130            file_type="healpix",131            version=self.version,132            wait_time=float(133                morgoth_config["upload"]["plot"]["interval"]134            ),135            max_time=float(136                morgoth_config["upload"]["plot"]["max_time"]137            ),138        )139        if_dir_containing_file_not_existing_then_make(self.output().path)140        os.system(f"touch {self.output().path}")141class UploadHealpixSysErr(luigi.Task):142    grb_name = luigi.Parameter()143    report_type = luigi.Parameter()144    version = luigi.Parameter(default="v00")145    def requires(self):146        return {147            "create_report": UploadReport(148                grb_name=self.grb_name,149                report_type=self.report_type,150                version=self.version,151            ),152            "data_file": CreateHealpixSysErr(153                grb_name=self.grb_name,154                report_type=self.report_type,155                version=self.version,156            ),157        }158    def output(self):159        filename = f"{self.report_type}_{self.version}_upload_healpixsyserr.done"160        return luigi.LocalTarget(161            os.path.join(162                base_dir,163                self.grb_name,164                self.report_type,165                self.version,166                "upload",167                filename,168            )169        )170    def run(self):171        upload_datafile(172            grb_name=self.grb_name,173            report_type=self.report_type,174            data_file=self.input()["data_file"].path,175            file_type="healpixSysErr",176            version=self.version,177            wait_time=float(178                morgoth_config["upload"]["plot"]["interval"]179            ),180            max_time=float(181                morgoth_config["upload"]["plot"]["max_time"]182            ),183        )184        filename = f"{self.report_type}_{self.version}_upload_plot_location.done"185        tmp = os.path.join(186            base_dir, self.grb_name, self.report_type, self.version, "upload", filename187        )188        if_dir_containing_file_not_existing_then_make(self.output().path)189        os.system(f"touch {self.output().path}")190class UploadAllPlots(luigi.Task):191    grb_name = luigi.Parameter()192    report_type = luigi.Parameter()193    version = luigi.Parameter(default="v00")194    def requires(self):195        return {196            "lightcurves": UploadAllLightcurves(197                grb_name=self.grb_name,198                report_type=self.report_type,199             
   version=self.version,200            ),201            "location": UploadLocationPlot(202                grb_name=self.grb_name,203                report_type=self.report_type,204                version=self.version,205            ),206            "corner": UploadCornerPlot(207                grb_name=self.grb_name,208                report_type=self.report_type,209                version=self.version,210            ),211            "molllocation": UploadMollLocationPlot(212                grb_name=self.grb_name,213                report_type=self.report_type,214                version=self.version,215            ),216            "satellite": UploadSatellitePlot(217                grb_name=self.grb_name,218                report_type=self.report_type,219                version=self.version,220            ),221            "spectrum": UploadSpectrumPlot(222                grb_name=self.grb_name,223                report_type=self.report_type,224                version=self.version,225            ),226            "3d_location": Upload3DLocationPlot(227                grb_name=self.grb_name,228                report_type=self.report_type,229                version=self.version,230            ),231            "balrogswift": UploadBalrogSwiftPlot(232                grb_name=self.grb_name,233                report_type=self.report_type,234                version=self.version,235            ),236        }237    def output(self):238        filename = f"{self.report_type}_{self.version}_upload_plot_all.done"239        return luigi.LocalTarget(240            os.path.join(241                base_dir,242                self.grb_name,243                self.report_type,244                self.version,245                "upload",246                filename,247            )248        )249    def run(self):250        filename = f"{self.report_type}_{self.version}_upload_plot_all.done"251        tmp = os.path.join(252            base_dir, self.grb_name, self.report_type, self.version, "upload", filename253        )254        if_dir_containing_file_not_existing_then_make(tmp)255        os.system(f"touch {tmp}")256class UploadAllLightcurves(luigi.Task):257    grb_name = luigi.Parameter()258    report_type = luigi.Parameter()259    version = luigi.Parameter(default="v00")260    def requires(self):261        return {262            "n0": UploadLightcurve(263                grb_name=self.grb_name,264                report_type=self.report_type,265                detector="n0",266                version=self.version,267            ),268            "n1": UploadLightcurve(269                grb_name=self.grb_name,270                report_type=self.report_type,271                detector="n1",272                version=self.version,273            ),274            "n2": UploadLightcurve(275                grb_name=self.grb_name,276                report_type=self.report_type,277                detector="n2",278                version=self.version,279            ),280            "n3": UploadLightcurve(281                grb_name=self.grb_name,282                report_type=self.report_type,283                detector="n3",284                version=self.version,285            ),286            "n4": UploadLightcurve(287                grb_name=self.grb_name,288                report_type=self.report_type,289                detector="n4",290                version=self.version,291            ),292            "n5": UploadLightcurve(293                grb_name=self.grb_name,294                
report_type=self.report_type,295                detector="n5",296                version=self.version,297            ),298            "n6": UploadLightcurve(299                grb_name=self.grb_name,300                report_type=self.report_type,301                detector="n6",302                version=self.version,303            ),304            "n7": UploadLightcurve(305                grb_name=self.grb_name,306                report_type=self.report_type,307                detector="n7",308                version=self.version,309            ),310            "n8": UploadLightcurve(311                grb_name=self.grb_name,312                report_type=self.report_type,313                detector="n8",314                version=self.version,315            ),316            "n9": UploadLightcurve(317                grb_name=self.grb_name,318                report_type=self.report_type,319                detector="n9",320                version=self.version,321            ),322            "na": UploadLightcurve(323                grb_name=self.grb_name,324                report_type=self.report_type,325                detector="na",326                version=self.version,327            ),328            "nb": UploadLightcurve(329                grb_name=self.grb_name,330                report_type=self.report_type,331                detector="nb",332                version=self.version,333            ),334            "b0": UploadLightcurve(335                grb_name=self.grb_name,336                report_type=self.report_type,337                detector="b0",338                version=self.version,339            ),340            "b1": UploadLightcurve(341                grb_name=self.grb_name,342                report_type=self.report_type,343                detector="b1",344                version=self.version,345            ),346        }347    def output(self):348        filename = f"{self.report_type}_{self.version}_upload_plot_all_lightcurves.done"349        return luigi.LocalTarget(350            os.path.join(351                base_dir,352                self.grb_name,353                self.report_type,354                self.version,355                "upload",356                filename,357            )358        )359    def run(self):360        filename = f"{self.report_type}_{self.version}_upload_plot_all_lightcurves.done"361        tmp = os.path.join(362            base_dir, self.grb_name, self.report_type, self.version, "upload", filename363        )364        if_dir_containing_file_not_existing_then_make(tmp)365        os.system(f"touch {tmp}")366class UploadLightcurve(luigi.Task):367    grb_name = luigi.Parameter()368    report_type = luigi.Parameter()369    detector = luigi.Parameter()370    version = luigi.Parameter(default="v00")371    def requires(self):372        return {373            "create_report": UploadReport(374                grb_name=self.grb_name,375                report_type=self.report_type,376                version=self.version,377            ),378            "plot_file": CreateLightcurve(379                grb_name=self.grb_name,380                report_type=self.report_type,381                detector=self.detector,382                version=self.version,383            ),384        }385    def output(self):386        filename = f"{self.report_type}_{self.version}_{self.detector}_upload_plot_lightcurve.done"387        return luigi.LocalTarget(388            os.path.join(389                base_dir,390                self.grb_name,391                
self.report_type,392                self.version,393                "upload",394                filename,395            )396        )397    def run(self):398        upload_plot(399            grb_name=self.grb_name,400            report_type=self.report_type,401            plot_file=self.input()["plot_file"].path,402            plot_type="lightcurve",403            version=self.version,404            wait_time=float(405                morgoth_config["upload"]["plot"]["interval"]406            ),407            max_time=float(408                morgoth_config["upload"]["plot"]["max_time"]409            ),410            det_name=self.detector,411        )412        filename = f"{self.report_type}_{self.version}_{self.detector}_upload_plot_lightcurve.done"413        tmp = os.path.join(414            base_dir, self.grb_name, self.report_type, self.version, "upload", filename415        )416        if_dir_containing_file_not_existing_then_make(tmp)417        os.system(f"touch {tmp}")418class UploadLocationPlot(luigi.Task):419    grb_name = luigi.Parameter()420    report_type = luigi.Parameter()421    version = luigi.Parameter(default="v00")422    def requires(self):423        return {424            "create_report": UploadReport(425                grb_name=self.grb_name,426                report_type=self.report_type,427                version=self.version,428            ),429            "plot_file": CreateLocationPlot(430                grb_name=self.grb_name,431                report_type=self.report_type,432                version=self.version,433            ),434        }435    def output(self):436        filename = f"{self.report_type}_{self.version}_upload_plot_location.done"437        return luigi.LocalTarget(438            os.path.join(439                base_dir,440                self.grb_name,441                self.report_type,442                self.version,443                "upload",444                filename,445            )446        )447    def run(self):448        upload_plot(449            grb_name=self.grb_name,450            report_type=self.report_type,451            plot_file=self.input()["plot_file"].path,452            plot_type="location",453            version=self.version,454            wait_time=float(455                morgoth_config["upload"]["plot"]["interval"]456            ),457            max_time=float(458                morgoth_config["upload"]["plot"]["max_time"]459            ),460        )461        filename = f"{self.report_type}_{self.version}_upload_plot_location.done"462        tmp = os.path.join(463            base_dir, self.grb_name, self.report_type, self.version, "upload", filename464        )465        if_dir_containing_file_not_existing_then_make(tmp)466        os.system(f"touch {tmp}")467class UploadCornerPlot(luigi.Task):468    grb_name = luigi.Parameter()469    report_type = luigi.Parameter()470    version = luigi.Parameter(default="v00")471    def requires(self):472        return {473            "create_report": UploadReport(474                grb_name=self.grb_name,475                report_type=self.report_type,476                version=self.version,477            ),478            "plot_file": CreateCornerPlot(479                grb_name=self.grb_name,480                report_type=self.report_type,481                version=self.version,482            ),483        }484    def output(self):485        filename = f"{self.report_type}_{self.version}_upload_plot_corner.done"486        return luigi.LocalTarget(487            os.path.join(488    
            base_dir,489                self.grb_name,490                self.report_type,491                self.version,492                "upload",493                filename,494            )495        )496    def run(self):497        upload_plot(498            grb_name=self.grb_name,499            report_type=self.report_type,500            plot_file=self.input()["plot_file"].path,501            plot_type="allcorner",502            version=self.version,503            wait_time=float(504                morgoth_config["upload"]["plot"]["interval"]505            ),506            max_time=float(507                morgoth_config["upload"]["plot"]["max_time"]508            ),509        )510        filename = f"{self.report_type}_{self.version}_upload_plot_corner.done"511        tmp = os.path.join(512            base_dir, self.grb_name, self.report_type, self.version, "upload", filename513        )514        if_dir_containing_file_not_existing_then_make(tmp)515        os.system(f"touch {tmp}")516class UploadMollLocationPlot(luigi.Task):517    grb_name = luigi.Parameter()518    report_type = luigi.Parameter()519    version = luigi.Parameter(default="v00")520    def requires(self):521        return {522            "create_report": UploadReport(523                grb_name=self.grb_name,524                report_type=self.report_type,525                version=self.version,526            ),527            "plot_file": CreateMollLocationPlot(528                grb_name=self.grb_name,529                report_type=self.report_type,530                version=self.version,531            ),532        }533    def output(self):534        filename = f"{self.report_type}_{self.version}_upload_plot_molllocation.done"535        return luigi.LocalTarget(536            os.path.join(537                base_dir,538                self.grb_name,539                self.report_type,540                self.version,541                "upload",542                filename,543            )544        )545    def run(self):546        upload_plot(547            grb_name=self.grb_name,548            report_type=self.report_type,549            plot_file=self.input()["plot_file"].path,550            plot_type="molllocation",551            version=self.version,552            wait_time=float(553                morgoth_config["upload"]["plot"]["interval"]554            ),555            max_time=float(556                morgoth_config["upload"]["plot"]["max_time"]557            ),558        )559        filename = f"{self.report_type}_{self.version}_upload_plot_molllocation.done"560        tmp = os.path.join(561            base_dir, self.grb_name, self.report_type, self.version, "upload", filename562        )563        if_dir_containing_file_not_existing_then_make(tmp)564        os.system(f"touch {tmp}")565class UploadSatellitePlot(luigi.Task):566    grb_name = luigi.Parameter()567    report_type = luigi.Parameter()568    version = luigi.Parameter(default="v00")569    def requires(self):570        return {571            "create_report": UploadReport(572                grb_name=self.grb_name,573                report_type=self.report_type,574                version=self.version,575            ),576            "plot_file": CreateSatellitePlot(577                grb_name=self.grb_name,578                report_type=self.report_type,579                version=self.version,580            ),581        }582    def output(self):583        filename = f"{self.report_type}_{self.version}_upload_plot_satellite.done"584        return 
luigi.LocalTarget(585            os.path.join(586                base_dir,587                self.grb_name,588                self.report_type,589                self.version,590                "upload",591                filename,592            )593        )594    def run(self):595        upload_plot(596            grb_name=self.grb_name,597            report_type=self.report_type,598            plot_file=self.input()["plot_file"].path,599            plot_type="satellite",600            version=self.version,601            wait_time=float(602                morgoth_config["upload"]["plot"]["interval"]603            ),604            max_time=float(605                morgoth_config["upload"]["plot"]["max_time"]606            ),607        )608        filename = f"{self.report_type}_{self.version}_upload_plot_satellite.done"609        tmp = os.path.join(610            base_dir, self.grb_name, self.report_type, self.version, "upload", filename611        )612        if_dir_containing_file_not_existing_then_make(tmp)613        os.system(f"touch {tmp}")614class UploadSpectrumPlot(luigi.Task):615    grb_name = luigi.Parameter()616    report_type = luigi.Parameter()617    version = luigi.Parameter(default="v00")618    def requires(self):619        return {620            "create_report": UploadReport(621                grb_name=self.grb_name,622                report_type=self.report_type,623                version=self.version,624            ),625            "plot_file": CreateSpectrumPlot(626                grb_name=self.grb_name,627                report_type=self.report_type,628                version=self.version,629            ),630        }631    def output(self):632        filename = f"{self.report_type}_{self.version}_upload_plot_spectrum.done"633        return luigi.LocalTarget(634            os.path.join(635                base_dir,636                self.grb_name,637                self.report_type,638                self.version,639                "upload",640                filename,641            )642        )643    def run(self):644        upload_plot(645            grb_name=self.grb_name,646            report_type=self.report_type,647            plot_file=self.input()["plot_file"].path,648            plot_type="spectrum",649            version=self.version,650            wait_time=float(651                morgoth_config["upload"]["plot"]["interval"]652            ),653            max_time=float(654                morgoth_config["upload"]["plot"]["max_time"]655            ),656        )657        filename = f"{self.report_type}_{self.version}_upload_plot_spectrum.done"658        tmp = os.path.join(659            base_dir, self.grb_name, self.report_type, self.version, "upload", filename660        )661        if_dir_containing_file_not_existing_then_make(tmp)662        os.system(f"touch {tmp}")663class Upload3DLocationPlot(luigi.Task):664    grb_name = luigi.Parameter()665    report_type = luigi.Parameter()666    version = luigi.Parameter(default="v00")667    def requires(self):668        return {669            "create_report": UploadReport(670                grb_name=self.grb_name,671                report_type=self.report_type,672                version=self.version,673            ),674            "plot_file": Create3DLocationPlot(675                grb_name=self.grb_name,676                report_type=self.report_type,677                version=self.version,678            ),679        }680    def output(self):681        filename = 
f"{self.report_type}_{self.version}_upload_plot_3dlocation.done"682        return luigi.LocalTarget(683            os.path.join(684                base_dir,685                self.grb_name,686                self.report_type,687                self.version,688                "upload",689                filename,690            )691        )692    def run(self):693        upload_plot(694            grb_name=self.grb_name,695            report_type=self.report_type,696            plot_file=self.input()["plot_file"].path,697            plot_type="3dlocation",698            version=self.version,699            wait_time=float(700                morgoth_config["upload"]["plot"]["interval"]701            ),702            max_time=float(703                morgoth_config["upload"]["plot"]["max_time"]704            ),705        )706        filename = f"{self.report_type}_{self.version}_upload_plot_3dlocation.done"707        tmp = os.path.join(708            base_dir, self.grb_name, self.report_type, self.version, "upload", filename709        )710        if_dir_containing_file_not_existing_then_make(tmp)711        os.system(f"touch {tmp}")712class UploadBalrogSwiftPlot(luigi.Task):713    grb_name = luigi.Parameter()714    report_type = luigi.Parameter()715    version = luigi.Parameter(default="v00")716    def requires(self):717        return {718            "create_report": UploadReport(719                grb_name=self.grb_name,720                report_type=self.report_type,721                version=self.version,722            ),723            "plot_file": CreateBalrogSwiftPlot(724                grb_name=self.grb_name,725                report_type=self.report_type,726                version=self.version,727            ),728        }729    def output(self):730        filename = f"{self.report_type}_{self.version}_upload_plot_balrogswift.done"731        return luigi.LocalTarget(732            os.path.join(733                base_dir,734                self.grb_name,735                self.report_type,736                self.version,737                "upload",738                filename,739            )740        )741    def run(self):742        upload_plot(743            grb_name=self.grb_name,744            report_type=self.report_type,745            plot_file=self.input()["plot_file"].path,746            plot_type="balrogswift",747            version=self.version,748            wait_time=float(749                morgoth_config["upload"]["plot"]["interval"]750            ),751            max_time=float(752                morgoth_config["upload"]["plot"]["max_time"]753            ),754        )755        filename = f"{self.report_type}_{self.version}_upload_plot_balrogswift.done"756        tmp = os.path.join(757            base_dir, self.grb_name, self.report_type, self.version, "upload", filename758        )759        if_dir_containing_file_not_existing_then_make(tmp)...plots.py
plots.py
Source:plots.py
import os

import luigi
import yaml

from morgoth.balrog_handlers import ProcessFitResults
from morgoth.downloaders import DownloadTrigdat, GatherTrigdatDownload
from morgoth.exceptions.custom_exceptions import UnkownReportType
from morgoth.utils.env import get_env_value
from morgoth.utils.plot_utils import (
    azimuthal_plot_sat_frame,
    create_corner_all_plot,
    create_corner_loc_plot,
    interactive_3D_plot,
    mollweide_plot,
    swift_gbm_plot,
)

base_dir = get_env_value("GBM_TRIGGER_DATA_DIR")


class CreateAllPlots(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return {
            "lightcurves": CreateAllLightcurves(grb_name=self.grb_name, report_type=self.report_type, version=self.version),
            "location": CreateLocationPlot(grb_name=self.grb_name, report_type=self.report_type, version=self.version),
            "corner": CreateCornerPlot(grb_name=self.grb_name, report_type=self.report_type, version=self.version),
            "molllocation": CreateMollLocationPlot(grb_name=self.grb_name, report_type=self.report_type, version=self.version),
            "satellite": CreateSatellitePlot(grb_name=self.grb_name, report_type=self.report_type, version=self.version),
            "spectrum": CreateSpectrumPlot(grb_name=self.grb_name, report_type=self.report_type, version=self.version),
            "3dlocation": Create3DLocationPlot(grb_name=self.grb_name, report_type=self.report_type, version=self.version),
            "balrogswift": CreateBalrogSwiftPlot(grb_name=self.grb_name, report_type=self.report_type, version=self.version),
        }

    def output(self):
        filename = f"{self.report_type}_{self.version}_plot_all.txt"
        return luigi.LocalTarget(
            os.path.join(base_dir, self.grb_name, self.report_type, self.version, filename)
        )

    def run(self):
        filename = f"{self.report_type}_{self.version}_plot_all.txt"
        tmp = os.path.join(base_dir, self.grb_name, self.report_type, self.version, filename)
        os.system(f"touch {tmp}")


class CreateAllLightcurves(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return {
            "n0": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="n0", version=self.version),
            "n1": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="n1", version=self.version),
            "n2": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="n2", version=self.version),
            "n3": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="n3", version=self.version),
            "n4": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="n4", version=self.version),
            "n5": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="n5", version=self.version),
            "n6": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="n6", version=self.version),
            "n7": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="n7", version=self.version),
            "n8": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="n8", version=self.version),
            "n9": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="n9", version=self.version),
            "na": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="na", version=self.version),
            "nb": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="nb", version=self.version),
            "b0": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="b0", version=self.version),
            "b1": CreateLightcurve(grb_name=self.grb_name, report_type=self.report_type, detector="b1", version=self.version),
        }

    def output(self):
        filename = f"{self.report_type}_{self.version}_plot_all_lightcurves.txt"
        return luigi.LocalTarget(
            os.path.join(base_dir, self.grb_name, self.report_type, self.version, filename)
        )

    def run(self):
        filename = f"{self.report_type}_{self.version}_plot_all_lightcurves.txt"
        tmp = os.path.join(base_dir, self.grb_name, self.report_type, self.version, filename)
        os.system(f"touch {tmp}")


class CreateLightcurve(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    detector = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return ProcessFitResults(
            grb_name=self.grb_name, report_type=self.report_type, version=self.version
        )

    def output(self):
        base_job = os.path.join(base_dir, self.grb_name, self.report_type, self.version)
        filename = f"{self.grb_name}_lightcurve_{self.report_type}_detector_{self.detector}_plot_{self.version}.png"
        return luigi.LocalTarget(os.path.join(base_job, "plots", "lightcurves", filename))

    def run(self):
        # The lightcurve is created in the background fit task; this task only checks
        # that the creation was successful.
        pass


class CreateLocationPlot(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return ProcessFitResults(
            grb_name=self.grb_name, report_type=self.report_type, version=self.version
        )

    def output(self):
        filename = f"{self.grb_name}_location_plot_{self.report_type}_{self.version}.png"
        return luigi.LocalTarget(
            os.path.join(base_dir, self.grb_name, self.report_type, self.version, "plots", filename)
        )

    def run(self):
        with self.input()["result_file"].open() as f:
            result = yaml.safe_load(f)

        create_corner_loc_plot(
            post_equal_weights_file=self.input()["post_equal_weights"].path,
            model=result["fit_result"]["model"],
            save_path=self.output().path,
        )


class CreateCornerPlot(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return ProcessFitResults(
            grb_name=self.grb_name, report_type=self.report_type, version=self.version
        )

    def output(self):
        filename = f"{self.grb_name}_allcorner_plot_{self.report_type}_{self.version}.png"
        return luigi.LocalTarget(
            os.path.join(base_dir, self.grb_name, self.report_type, self.version, "plots", filename)
        )

    def run(self):
        with self.input()["result_file"].open() as f:
            result = yaml.safe_load(f)

        create_corner_all_plot(
            post_equal_weights_file=self.input()["post_equal_weights"].path,
            model=result["fit_result"]["model"],
            save_path=self.output().path,
        )


class CreateMollLocationPlot(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return {
            "fit_result": ProcessFitResults(
                grb_name=self.grb_name, report_type=self.report_type, version=self.version
            ),
            "trigdat_version": GatherTrigdatDownload(grb_name=self.grb_name),
        }

    def output(self):
        filename = (
            f"{base_dir}/{self.grb_name}/{self.report_type}/{self.version}/plots/"
            f"{self.grb_name}_molllocation_plot_{self.report_type}_{self.version}.png"
        )
        return luigi.LocalTarget(filename)

    def run(self):
        with self.input()["fit_result"]["result_file"].open() as f:
            result = yaml.safe_load(f)

        if self.report_type.lower() == "tte":
            with self.input()["trigdat_version"].open() as f:
                trigdat_version = yaml.safe_load(f)["trigdat_version"]
            trigdat_file = DownloadTrigdat(grb_name=self.grb_name, version=trigdat_version).output()
        elif self.report_type.lower() == "trigdat":
            trigdat_file = DownloadTrigdat(grb_name=self.grb_name, version=self.version).output()
        else:
            raise UnkownReportType(f"The report_type '{self.report_type}' is not valid!")

        mollweide_plot(
            grb_name=self.grb_name,
            trigdat_file=trigdat_file.path,
            post_equal_weights_file=self.input()["fit_result"]["post_equal_weights"].path,
            used_dets=result["time_selection"]["used_detectors"],
            model=result["fit_result"]["model"],
            ra=result["fit_result"]["ra"],
            dec=result["fit_result"]["dec"],
            swift=result["general"]["swift"],
            save_path=self.output().path,
        )


class CreateSatellitePlot(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return {
            "fit_result": ProcessFitResults(
                grb_name=self.grb_name, report_type=self.report_type, version=self.version
            ),
            "trigdat_version": GatherTrigdatDownload(grb_name=self.grb_name),
        }

    def output(self):
        filename = f"{self.grb_name}_satellite_plot_{self.report_type}_{self.version}.png"
        return luigi.LocalTarget(
            os.path.join(base_dir, self.grb_name, self.report_type, self.version, "plots", filename)
        )

    def run(self):
        with self.input()["fit_result"]["result_file"].open() as f:
            result = yaml.safe_load(f)

        if self.report_type.lower() == "tte":
            with self.input()["trigdat_version"].open() as f:
                trigdat_version = yaml.safe_load(f)["trigdat_version"]
            trigdat_file = DownloadTrigdat(grb_name=self.grb_name, version=trigdat_version).output()
        elif self.report_type.lower() == "trigdat":
            trigdat_file = DownloadTrigdat(grb_name=self.grb_name, version=self.version).output()
        else:
            raise UnkownReportType(f"The report_type '{self.report_type}' is not valid!")

        azimuthal_plot_sat_frame(
            grb_name=self.grb_name,
            trigdat_file=trigdat_file.path,
            ra=result["fit_result"]["ra"],
            dec=result["fit_result"]["dec"],
            save_path=self.output().path,
        )


class CreateSpectrumPlot(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return ProcessFitResults(
            grb_name=self.grb_name, report_type=self.report_type, version=self.version
        )

    def output(self):
        filename = f"{self.grb_name}_spectrum_plot_{self.report_type}_{self.version}.png"
        return luigi.LocalTarget(
            os.path.join(base_dir, self.grb_name, self.report_type, self.version, "plots", filename)
        )

    def run(self):
        # The spectrum plot is created in the balrog fit task; this task only checks
        # that the creation was successful.
        pass


class Create3DLocationPlot(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return {
            "fit_result": ProcessFitResults(
                grb_name=self.grb_name, report_type=self.report_type, version=self.version
            ),
            "trigdat_version": GatherTrigdatDownload(grb_name=self.grb_name),
        }

    def output(self):
        filename = f"{self.grb_name}_3dlocation_plot_{self.report_type}_{self.version}.html"
        return luigi.LocalTarget(
            os.path.join(base_dir, self.grb_name, self.report_type, self.version, "plots", filename)
        )

    def run(self):
        with self.input()["fit_result"]["result_file"].open() as f:
            result = yaml.safe_load(f)

        if self.report_type.lower() == "tte":
            with self.input()["trigdat_version"].open() as f:
                trigdat_version = yaml.safe_load(f)["trigdat_version"]
            trigdat_file = DownloadTrigdat(grb_name=self.grb_name, version=trigdat_version).output()
        elif self.report_type.lower() == "trigdat":
            trigdat_file = DownloadTrigdat(grb_name=self.grb_name, version=self.version).output()
        else:
            raise UnkownReportType(f"The report_type '{self.report_type}' is not valid!")

        interactive_3D_plot(
            trigdat_file=trigdat_file.path,
            post_equal_weights_file=self.input()["fit_result"]["post_equal_weights"].path,
            used_dets=result["time_selection"]["used_detectors"],
            model=result["fit_result"]["model"],
            save_path=self.output().path,
        )


class CreateBalrogSwiftPlot(luigi.Task):
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return ProcessFitResults(
            grb_name=self.grb_name, report_type=self.report_type, version=self.version
        )

    def output(self):
        filename = f"{self.grb_name}_balrogswift_plot_{self.report_type}_{self.version}.png"
        return luigi.LocalTarget(
            os.path.join(base_dir, self.grb_name, self.report_type, self.version, "plots", filename)
        )

    def run(self):
        with self.input()["result_file"].open() as f:
            result = yaml.safe_load(f)

        swift_gbm_plot(
            grb_name=self.grb_name,
            post_equal_weights_file=self.input()["post_equal_weights"].path,
            model=result["fit_result"]["model"],
            ra=result["fit_result"]["ra"],
            dec=result["fit_result"]["dec"],
            swift=result["general"]["swift"],
            save_path=self.output().path,
...
copy_report_type.py
Source:copy_report_type.py
# -*- encoding: utf-8 -*-
from optparse import make_option
import copy

from django.core.management.base import BaseCommand

from accounts.models import Authority
from common.models import Domain
from reports.models import ReportType, ReportState, ReportTypeCategory, CaseDefinition


class Command(BaseCommand):
    args = '<from_domain> <to_domain> <report_type_id report_type_id ...>'
    help = 'Copy report types'
    option_list = BaseCommand.option_list + (
        make_option(
            '--from-authority',
            action='store',
            dest='from_authority',
            default=None,
            help='Specify origin authority'
        ),
        make_option(
            '--to-authority',
            action='store',
            dest='to_authority',
            default=None,
            help='Specify target authority'
        ),
        make_option(
            '--force',
            action='store_true',
            dest='force',
            default=False,
            help='Whether to force mode, default is dry_run mode'
        )
    )

    def _copy_report_states(self, from_domain, to_domain, from_report_type, to_report_type, dry_run=True):
        original_report_states = ReportState.objects.filter(domain=from_domain, report_type=from_report_type)
        for original_report_state in original_report_states:
            should_save_report_state = False
            try:
                report_state = ReportState.objects.get(domain=to_domain, report_type=to_report_type, code=original_report_state.code)
                if report_state.name != original_report_state.name:
                    report_state.name = original_report_state.name
                    should_save_report_state = True
            except ReportState.DoesNotExist:
                should_save_report_state = True
                report_state = copy.deepcopy(original_report_state)
                report_state.pk = None
                report_state.domain = to_domain
                report_state.report_type = to_report_type
            if should_save_report_state:
                print "Will copy report state from %s to %s using this data:" % (from_domain.name, to_domain.name)
                print " [FROM] id:%s domain:%s authority:%s report_type:%s report_state_code:%s" % (
                    original_report_state.pk, original_report_state.domain.name, original_report_state.report_type.authority, original_report_state.report_type, original_report_state.code)
                print "   [TO] id:%s domain:%s authority:%s report_type:%s report_state_code:%s" % (
                    report_state.pk, report_state.domain.name, report_state.report_type.authority, report_state.report_type, report_state.code)
                if not dry_run:
                    report_state.save()
                    print "  - Saved id: %s" % report_state.pk
            print ""

    def _copy_case_definitions(self, from_domain, to_domain, from_report_type, to_report_type, dry_run=True):
        original_case_definitions = CaseDefinition.objects.filter(domain=from_domain, report_type=from_report_type)
        for original_case_definition in original_case_definitions:
            should_save_case_definition = False
            try:
                case_definition = CaseDefinition.objects.get(domain=to_domain, report_type=to_report_type, code=original_case_definition.code)
                if case_definition.epl != original_case_definition.epl:
                    case_definition.epl = original_case_definition.epl
                    should_save_case_definition = True
                if case_definition.description != original_case_definition.description:
                    case_definition.description = original_case_definition.description
                    should_save_case_definition = True
                if case_definition.accumulate != original_case_definition.accumulate:
                    case_definition.accumulate = original_case_definition.accumulate
                    should_save_case_definition = True
                if case_definition.window != original_case_definition.window:
                    case_definition.window = original_case_definition.window
                    should_save_case_definition = True
            except CaseDefinition.DoesNotExist:
                should_save_case_definition = True
                case_definition = copy.deepcopy(original_case_definition)
                case_definition.pk = None
                case_definition.domain = to_domain
                case_definition.report_type = to_report_type
            if should_save_case_definition:
                print "Will copy case definition from %s to %s using this data:" % (from_domain.name, to_domain.name)
                print " [FROM] id:%s domain:%s report_type:%s case_def_code:%s" % (
                    original_case_definition.pk, original_case_definition.domain, original_case_definition.report_type.name, original_case_definition.code)
                print "   [TO] id:%s domain:%s report_type:%s case_def_code:%s" % (
                    case_definition.pk, case_definition.domain.name, case_definition.report_type.name, original_case_definition.code)
                if not dry_run:
                    case_definition.from_state = ReportState.objects.get(domain=to_domain, report_type=to_report_type, code=original_case_definition.from_state.code)
                    case_definition.to_state = ReportState.objects.get(domain=to_domain, report_type=to_report_type, code=original_case_definition.to_state.code)
                    case_definition.save()
                    print "  - Saved id: %s" % case_definition.pk
            print "--"

    def _copy_report_type_categories(self, from_domain, to_domain, dry_run=True):
        original_report_type_categories = ReportTypeCategory.objects.filter(domain=from_domain)
        for original_report_type_category in original_report_type_categories:
            try:
                report_type_category = ReportTypeCategory.objects.get(domain=to_domain, code=original_report_type_category.code)
                if report_type_category.name != original_report_type_category.name:
                    report_type_category.name = original_report_type_category.name
                    print "Will update report type category from %s to %s using this data:" % (
                        from_domain.name, to_domain.name)
                    print " [FROM] id:%s domain:%s code:%s" % (
                        original_report_type_category.pk, original_report_type_category.domain,
                        original_report_type_category.code)
                    print "   [TO] id:%s domain:%s code:%s" % (
                        report_type_category.pk, report_type_category.domain.name, report_type_category.code)
                    if not dry_run:
                        report_type_category.save()
                        print "  - Saved id: %s" % report_type_category.pk
            except ReportTypeCategory.DoesNotExist:
                report_type_category = copy.deepcopy(original_report_type_category)
                report_type_category.pk = None
                report_type_category.domain = to_domain
                print "Will copy report type category from %s to %s using this data:" % (from_domain.name, to_domain.name)
                print " [FROM] id:%s domain:%s code:%s" % (
                    original_report_type_category.pk, original_report_type_category.domain, original_report_type_category.code)
                print "   [TO] id:%s domain:%s code:%s" % (
                    report_type_category.pk, report_type_category.domain.name, report_type_category.code)
                if not dry_run:
                    report_type_category.save()
                    print "  - Saved id: %s" % report_type_category.pk
                print "--"

    def _copy_report_types(self, from_domain, to_domain, from_authority=None, to_authority=None, report_type_ids=[],
                           dry_run=True):
        if dry_run:
            print ">> DRY RUN <<\n"
        self._copy_report_type_categories(from_domain, to_domain, dry_run)
        original_report_types = ReportType.objects.filter(domain=from_domain)
        if from_authority:
            original_report_types = original_report_types.filter(authority=from_authority)
        if report_type_ids:
            original_report_types = original_report_types.filter(id__in=report_type_ids)
        for original_report_type in original_report_types:
            should_save_report_type = False
            try:
                report_type = ReportType.objects.get(domain=to_domain, code=original_report_type.code)
                if report_type.form_definition != original_report_type.form_definition:
                    report_type.form_definition = original_report_type.form_definition
                    should_save_report_type = True
                if report_type.name != original_report_type.name:
                    report_type.name = original_report_type.name
                    should_save_report_type = True
                if report_type.template != original_report_type.template:
                    report_type.template = original_report_type.template
                    should_save_report_type = True
                if report_type.django_template != original_report_type.django_template:
                    report_type.django_template = original_report_type.django_template
                    should_save_report_type = True
                if report_type.summary_template != original_report_type.summary_template:
                    report_type.summary_template = original_report_type.summary_template
                    should_save_report_type = True
            except ReportType.DoesNotExist:
                should_save_report_type = True
                report_type = copy.deepcopy(original_report_type)
                report_type.pk = None
                report_type.domain = to_domain
                report_type.default_state = None
                if not dry_run and original_report_type.category:
                    report_type.category = ReportTypeCategory.objects.get(domain=to_domain, code=original_report_type.category.code)
                if to_authority:
                    report_type.authority = to_authority
                else:
                    report_type.authority = None
            if should_save_report_type:
                print "Will copy report type from %s to %s using this data:" % (from_domain.name, to_domain.name)
                print " [FROM] id:%s domain:%s authority:%s report_type:%s" % (original_report_type.pk, original_report_type.domain.name, original_report_type.authority, original_report_type.name)
                print "   [TO] id:%s domain:%s authority:%s report_type:%s" % (report_type.pk, report_type.domain.name, report_type.authority, report_type.name)
                if not dry_run:
                    report_type.save(set_default_state=True)
                    print "  - Saved id: %s" % report_type.pk
                print ""

            # copy report states
            self._copy_report_states(from_domain, to_domain, from_report_type=original_report_type,
                                     to_report_type=report_type, dry_run=dry_run)
            # copy case definitions
            self._copy_case_definitions(from_domain, to_domain, from_report_type=original_report_type,
                                        to_report_type=report_type, dry_run=dry_run)

            if should_save_report_type:
                default_state = ReportState.objects.get(report_type=report_type, code='report')
                if not report_type.default_state or default_state.id != report_type.default_state.id:
                    print ">> Then will set default state to %s" % default_state.name
                    if not dry_run:
                        report_type.default_state = default_state
                        report_type.save()
            print "---------------------"

    def handle(self, *args, **options):
        from_domain = Domain.objects.get(id=args[0])
        to_domain = Domain.objects.get(id=args[1])
        report_type_ids = list(args[2:])
        from_authority = options['from_authority']
        if from_authority:
            from_authority = Authority.objects.get(domain=from_domain, id=from_authority)
        to_authority = options['to_authority']
        if to_authority:
            to_authority = Authority.objects.get(domain=to_domain, id=to_authority)
        dry_run = not options['force']
...
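Based on the handle() method above, the command takes a source domain id, a target domain id and an optional list of report type ids, and stays in dry-run mode unless --force is given. A minimal sketch of invoking it from Python follows; the command name is inferred from the module name copy_report_type.py, and all ids are placeholders, not real data.

# Hypothetical invocation of the management command defined above.
# Shell equivalent: python manage.py copy_report_type 1 2 301 302 --to-authority 7
from django.core.management import call_command

call_command(
    "copy_report_type",       # command name follows the module name (assumption)
    "1", "2", "301", "302",   # <from_domain> <to_domain> <report_type_id ...>
    to_authority="7",         # optional target authority id
    force=False,              # keep the default dry-run; pass True to actually save
)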