Best Python code snippet using Kiwi_python
timeline.py
Source:timeline.py  
import json
from datetime import datetime, date
from typing import Union

import pandas as pd

from src.compute.changelogs import work_activity_on_interval
from src.compute.utils import Interval
from src.db.utils import SnowflakeWrapper


def _segment(status, assignee, date_from, date_to) -> dict:
    """Build one closed timeline entry spanning ``date_from`` .. ``date_to``."""
    return {
        "status": status,
        "assignee": assignee,
        "date_from": date_from,
        "date_to": date_to,
        "tdelta": date_to - date_from,
    }


def build_issue_timelines(sw: SnowflakeWrapper, interval: Interval, keys: Union[None, list] = None) -> pd.DataFrame:
    """Build a JSON status/assignee timeline for every row of changelogs.

    The rows come from ``work_activity_on_interval(sw, interval, keys)``. Each
    row must provide ``DATECREATED`` and ``CHANGELOGITEMS``; every entry of
    ``CHANGELOGITEMS`` must provide ``dateCreated`` and a ``changelogItems``
    list whose items carry a ``field`` key (``"assignee"`` items use
    ``from``/``to``, ``"status"`` items use ``fromString``/``toString``).

    Returns:
        A copy of the changelog frame without the ``CHANGELOGITEMS`` column and
        with a new ``timelines`` column holding one JSON string per row.

    Raises:
        ValueError: if one changelog entry holds more than one item for the
            same field, or if entries are not in chronological order.
    """

    def filter_log_items(items: dict, prop: str) -> Union[dict, None]:
        """Return the single changelog item whose ``field`` is *prop*, or None."""
        matches = [item for item in items["changelogItems"] if item["field"] == prop]
        if len(matches) > 1:
            raise ValueError(f"More than 1 filtered item of type {prop}: {matches}")
        return matches[0] if matches else None

    changelogs = work_activity_on_interval(sw, interval, keys)
    timelines = []
    for _, row in changelogs.iterrows():
        timeline = []
        # default status at the beginning (top of workflow)
        status_from = status_to = "BACKLOG"
        assign_from = assign_to = None
        last_change = row["DATECREATED"]
        for logs in row["CHANGELOGITEMS"]:
            date_created = logs["dateCreated"]

            change_assignee = filter_log_items(logs, "assignee")
            if change_assignee is not None:
                assign_from = change_assignee.get("from")
                assign_to = change_assignee.get("to")  # the latest assignee

            change_status = filter_log_items(logs, "status")
            if change_status is not None:
                status_from = change_status.get("fromString")
                status_to = change_status.get("toString")  # the latest status

            if last_change > date_created:
                raise ValueError(
                    f"Changelog entries are out of order: {date_created} precedes {last_change}"
                )

            # Exactly one segment is closed per entry that changed something;
            # the values describe the state that held *before* this entry.
            # doesn't work if there are 2+ changes in a small delta time
            if change_status is not None and change_assignee is not None:
                timeline.append(_segment(status_from, assign_from, last_change, date_created))
            elif change_status is not None:
                timeline.append(_segment(status_from, assign_to, last_change, date_created))
            elif change_assignee is not None:
                timeline.append(_segment(status_to, assign_from, last_change, date_created))
            # NOTE(review): an entry with neither a status nor an assignee
            # change still advances last_change, leaving a gap in the timeline.
            # Presumably the upstream query only returns such entries - confirm.
            last_change = date_created

        timeline.append({
            "status": status_to,  # last known status
            "assignee": assign_to,  # last known assignee
            "date_to": None,  # it's still ongoing
            "date_from": last_change,
            "tdelta": Interval.to_datetime(interval.toDate(raw=True)) - last_change,
        })
        timelines.append(json.dumps(timeline, default=Interval.isDate))

    result = changelogs.copy().drop(["CHANGELOGITEMS"], axis=1)
    result["timelines"] = timelines
    return result


def persist_issue_timelines(sw: SnowflakeWrapper, interval: Interval, keys: Union[None, list] = None) -> None:
    """Build the issue timelines and store them flattened in ``TIMELINES``.

    The frame from ``build_issue_timelines`` is written to ``timelines_temp``,
    ``TIMELINES`` is (re)created, and every element of each JSON timeline is
    inserted as one row. Any failure is printed and re-raised unchanged.
    """
    try:
        timelines = build_issue_timelines(sw, interval, keys)
        SnowflakeWrapper.execute_df_query(timelines, "timelines_temp", ifexists='replace')
        sw.execute(
            "CREATE OR REPLACE TABLE TIMELINES ( "
            "    KEY\t    VARCHAR(32),  "
            "    STATUS\tVARCHAR(256),  "
            "    ASSIGNEE\tVARCHAR(128), "
            "    DATEFROM\tTIMESTAMP_NTZ(9),  "
            "    DATETO\tTIMESTAMP_NTZ(9),  "
            "    TIMEDELTA\tNUMBER(38,0) "
            "); "
        )
        sw.execute(
            "INSERT ALL INTO TIMELINES "
            "    SELECT "
            "        c.KEY key, "
            "        t.VALUE:status::string status, "
            "        t.VALUE:assignee::string assignee, "
            "        TO_TIMESTAMP_NTZ(t.VALUE:date_from) datefrom, "
            "        TO_TIMESTAMP_NTZ(t.VALUE:date_to) dateto, "
            "        t.VALUE:tdelta::NUMBER timedelta "
            "    FROM "
            "        timelines_temp c, "
            "        LATERAL FLATTEN(PARSE_JSON(TIMELINES)) t; "
        )
    except Exception as e:
        print(f"Failed persisting timelines to Snowflake: {e}")
        raise  # bare raise keeps the original traceback intact


def get_avg_timeline(sw: SnowflakeWrapper, interval: Interval) -> pd.DataFrame:
    """Return per-status aggregates over the ``TIMELINES`` rows inside *interval*.

    Only rows with ``DATEFROM >= interval.fromDate()`` and
    ``DATETO < interval.toDate()`` are considered. The query text is printed
    before it is executed through ``sw.fetch_df``.
    """
    # NOTE(review): dividing TIMEDELTA by 60 * 60 * 24 presumably converts
    # seconds to days (the aliases say "Days") - confirm the stored unit.
    query = (
        'SELECT '
        '    STATUS                          "Status", '
        '    COUNT(DISTINCT KEY)             "UniqueIssues", '
        '    COUNT(*)                        "Issues", '
        '    "Issues" - "UniqueIssues"       "Reassignments", '
        '    AVG(TIMEDELTA) / (60 * 60 * 24) "AvgDays", '
        '    MAX(TIMEDELTA) / (60 * 60 * 24) "MaxDays", '
        '    MIN(TIMEDELTA) / (60 * 60 * 24) "MinDays" '
        'FROM TIMELINES t '
        'WHERE '
        f'    t.DATEFROM >= {interval.fromDate()} '
        f'    AND t.DATETO < {interval.toDate()} '
        'GROUP BY 1 '
        'ORDER BY 1, 4 DESC; '
    )
    print(query)
    return sw.fetch_df(query)


if __name__ == '__main__':
    with SnowflakeWrapper.create_snowflake_connection() as connection:
        sw = SnowflakeWrapper(connection)
        # timelines = build_issue_timelines(sw, Interval(date(2019, 7, 1), date(2020, 1, 1)))
        # print(timelines)
        # SnowflakeWrapper.execute_df_query(timelines, "timelines", ifexists='replace')
        # ... (the excerpt is truncated here in the source page)
# task.py
Source:task.py  
# ... (the excerpt starts mid-function; the header of the enclosing definition
# is not visible, so its tail is kept verbatim below rather than reconstructed)
#             raise Exception("failed to fetch tasks!")
#         notes = response.json()['Data']['Notes']
#         tasks.extend(notes)
#         return tasks


def change_assignee(tasks, config, manager_id, driver):
    """POST a reassignment request for every task in *tasks*.

    For each task, a JSON body ``{"userid": manager_id}`` is sent to the
    task's ``/Assignee/`` URL through ``driver.request``. A non-200 response
    is reported on stdout and the loop moves on to the next task.

    Args:
        tasks: iterable of mappings that each provide an ``'Id'`` key.
        config: accepted for signature compatibility; not read here.
        manager_id: value sent as ``userid`` in the request body.
        driver: object exposing ``request(method, url, data=..., headers=...)``
            and returning a response with ``status_code`` and ``text``.
    """
    print(f"Now @ change_assignee() using Id: {manager_id}")
    headers = {
        "Referer": "https://baaa.certification.systems/",
        "X-Requested-With": "XMLHttpRequest",
        "Accept": "application/json, text/javascript, */*; q=0.01",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "en-US,en;q=0.9",
        "Content-Type": "application/json; charset=UTF-8",
        "Host": "baaa.certification.systems",
        "Origin": "https://baaa.certification.systems",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
    }
    # The body is identical for every task, so serialize it once.
    payload = json.dumps({"userid": manager_id})
    for task in tasks:
        url = f"https://baaa.certification.systems/Reminder/Note/{task['Id']}/Assignee/"
        print(f"Url: {url}")
        response = driver.request('POST', url, data=payload, headers=headers)
        print(f"Response status code: {response.status_code}")
        if response.status_code != 200:
            print(f"Error updating assignee for task {task['Id']}")
            print(response.text)
            continue
        print(f"Successfully updated assignee for task {task['Id']}")


def read_managers():
    """Load and return the contents of ``managers.json`` in the working directory."""
    with open("managers.json", 'rt', encoding="utf-8") as fp:
        managers = json.load(fp)
    print(f"Loaded ({len(managers)}) managers")
    return managers


def main():
    """Look up the configured manager, log in, and reassign all downloaded tasks.

    Returns early with an error message when ``config['manager']`` is not a
    key of the managers mapping.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input")
    parser.add_argument("--manager", default="Chris Franklin")
    # NOTE(review): the parsed options (--input, --manager) are never read;
    # the manager name comes from config['manager'] instead - confirm intent.
    parser.parse_args()
    config = read_config()
    managers = read_managers()
    if config['manager'] not in managers:
        print(f"Error! Can't lookup ID of Manager <{config['manager']}>")
        return
    manager_id = managers[config['manager']]
    driver = Chrome()
    try:
        driver.get(config['site'])
        login(config, driver)
        tasks = download_tasks(config, driver)
        if tasks:
            change_assignee(tasks, config, manager_id, driver)
    finally:
        # Assumes Chrome is a Selenium-style WebDriver; quit() closes the
        # browser so it is not left running after the script ends.
        driver.quit()


if __name__ == "__main__":
    ...  # the body of the entry-point guard is truncated in the source page
# parenting.py
Source:parenting.py  
from collections import defaultdict


def change_assignee(current):
    """Return the other assignee: ``"C"`` for ``"J"``, otherwise ``"J"``."""
    return "C" if current == "J" else "J"


def solve():
    """Read one test case from stdin and return its assignment string.

    Input is a task count followed by one ``start end`` line per task. Tasks
    are handled in order of start time: each goes to the current assignee if
    that person is free, otherwise to the other one. The result holds one
    letter (``"C"`` or ``"J"``) per task in input order, or ``"IMPOSSIBLE"``
    when both assignees are busy at some task's start.
    """
    num_tasks = int(input())
    tasks = []
    for order in range(num_tasks):
        # Parse the numbers once here instead of re-parsing on every compare.
        start, end = (int(token) for token in input().split()[:2])
        tasks.append((start, end, order))
    tasks.sort(key=lambda task: task[0])

    busy_until = {}  # assignee -> end time of the task last given to them
    answer = [""] * num_tasks
    assignee = "C"
    for start, end, order in tasks:
        if assignee in busy_until and start < busy_until[assignee]:
            assignee = change_assignee(assignee)
            # check if collision with other person
            if assignee in busy_until and start < busy_until[assignee]:
                return "IMPOSSIBLE"
        busy_until[assignee] = end
        answer[order] = assignee
    return "".join(answer)


if __name__ == "__main__":
    # Read stdin only when run as a script, so importing has no side effects.
    t = int(input())
    for c in range(t):
        ...  # the loop body is truncated in the source page
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
