Best Python code snippet using playwright-python
main.py
Source:main.py  
1"""."""2import os3import ast4import atexit5from functools import wraps6from datetime import datetime, timedelta7#from zoneinfo import ZoneInfo  # alternative to thirdparty pytz8# from apscheduler.schedulers.background import BackgroundScheduler9import flask10from flask import Flask, render_template, request11import firebase_admin12from firebase_admin import db13from firebase_admin import auth14from firebase_admin import credentials15import google.oauth2.id_token16from google.auth.transport import requests17from werkzeug.utils import redirect18from logger import GCPLogging19app = Flask(__name__)20CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "").strip()21CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "").strip()22certs = os.getenv('GOOGLE_APPLICATION_CREDENTIALS', '').strip()23#certs = certs1.strip()24parsed_json = ast.literal_eval(str(certs))25cred = credentials.Certificate(parsed_json)26firebase_admin.initialize_app(cred, {27    'databaseURL':28    'https://dailyquest-9d678-default-rtdb.firebaseio.com/'29})30app.secret_key = os.getenv("secret_key")31logme = GCPLogging(parsed_json)32# Disable them in production33if os.getenv("dev") == "local":34    os.environ["OAUTHLIB_RELAX_TOKEN_SCOPE"] = "1"35    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"36firebase_request_adapter = requests.Request()37@app.before_request38def before_request():39    if 'DYNO' in os.environ:  # Only runs when on heroku40        logme.info(f"Headers: {request.headers=}")41        if request.url.startswith('http://') and request.headers.get(42                'X-Forwarded-Proto', "") == "http":43            url = request.url.replace('http://', 'https://', 1)44            code = 30145def tomorrow():46    next_date = datetime.today() + timedelta(days=1)47    return next_date.strftime("%Y/%m/%d")48def get_date():49    """Returns date time for url format in firebase"""50    return (datetime.now(ZoneInfo('Asia/Kolkata')) +51            timedelta(hours=6, minutes=30)).strftime("%Y/%m/%d")52def get_user_details():53    id_token = request.cookies.get("token")54    user_data = {}55    flg = False56    if id_token:57        try:58            user_data = google.oauth2.id_token.verify_firebase_token(59                id_token, firebase_request_adapter)60            flg = True61        except Exception as error:62            logme.error(f"Sorry, Error in getting user_data: {error}")63    return flg, user_data64def create_topics(topic):65    if not get_topic(topic):66        ref = db.reference("/Topic_List")67        ref.push({68            'name': topic69        })70def get_topics():71    ref = db.reference("/Topic_List")72    return {key: value['name'] for key, value in ref.get().items()}73def get_topic(topic):74    ref = db.reference("/Topic_List")75    topic_id = ""76    for key in ref.order_by_child('name').equal_to(topic).get():77        topic_id = key78    return topic_id79def create_subtopics(topic_id, subtopic):80    ref = db.reference(f"/Topic_List/{topic_id}/sub_topics")81    ref.push(subtopic)82def is_role(role):83    logme.info(f"{role} in is_role")84    return True if is_allowed(role) else False85def is_allowed(role):86    """."""87    flg, user_data = get_user_details()88    logme.info(f"in is_allowed: {flg=}\n{user_data=}")89    if not flg and not user_data:90        return False91    user_email = user_data.get('email', "")92    user_id = user_data.get('user_id', "")93    ref_url = f"/access_control_list/{role}"94    return (user_id, user_email) in db.reference(ref_url).get().items()95def need_role(role):96    def decorator(fun):97        @wraps(fun)98        def wrapper(*args, **kwargs):99            if is_allowed(role):100                return fun(*args, **kwargs)101            return flask.redirect('/')102        return wrapper103    return decorator104def auth_required(func):105    """."""106    def _inner_(*args):107        """."""108        user_data = ""109        try:110            id_token = request.cookies.get("token")111            logme.info(f"{id_token=}")112        except ValueError as ve:113            logme.error(f"{ve=}")114            id_token = None115        if id_token:116            try:117                while not user_data:118                    user_data = google.oauth2.id_token.verify_firebase_token(119                        id_token, firebase_request_adapter)120                result = func(user_data=user_data, *args)121                logme.info(f"{user_data=}")122                return result123            except Exception as error:124                logme.error(f"Failed in auth_required: {error=}")125                user_data = None126        logme.error(f"redirecting to logout as {id_token=} and {user_data=}")127        return redirect("/logout")128    # Renaming the function name:129    _inner_.__name__ = func.__name__130    return _inner_131# def get_topics():132#     logme.info("inside get_topics")133#     ref = db.reference("/QuestionBank")134#     logme.info("Lets get the topics from QuestionBank")135#     topics = tuple(ref.get('QuestionBank')[0].keys())136#     logme.info(f"Topics {topics=}")137#     return topics138def get_todays_quest(topic):139    logme.info("inside get_todays_quest")140    name = quest = None141    today = get_date()142    logme.info(f"Todays date: {today}")143    ref = db.reference(f"/Topics/{topic}/{today}")144    logme.info(f"2. Todays date: {today} - {ref=}")145    qid = ""146    if ref:147        logme.info("Lets start to find the qid from topics")148        logme.info(f"Lets get from  Topics/{topic}/{today} with {ref=}")149        qid = ref.get('qid')150        logme.info(f"{qid=} and {None not in qid}")151        if qid and None not in qid:152            qid = qid[0].get('qid') if ref else None153            logme.info(f"{qid=}..")154            url = f"/QuestionBank/{topic}/{qid}"155            logme.info(f"{url=}")156            ref = db.reference(url)157            if ref:158                logme.info(f"{ref=}")159                data = ref.get()160                logme.info(f"{data=}")161                if data and None not in data:162                    name = data.get("name", "")163                    quest = data.get("quest", "")164                logme.info(f">{name=}")165            logme.info(f"{name=},\n{quest=}\n{qid=}")166    logme.info(f"{qid=}, {name=}, {quest=}")167    return qid, name, quest168def get_latest_quest(topic_id):169    logme.info("inside get_todays_quest")170    name = quest = None171    # date = get_date()172    # logme.info(f"Todays date: {today}")173    past = 0174    while not (qid := get_value(f"/Topics/{topic_id}/{get_date(past)}", "qid", False)):175        past -= 1176    # Now lets find the quest details to display177    logme.info("Now lets find the quest details to display")178    url = f"/QuestionBank/{topic_id}/{qid}"179    name = get_value(url, "name", "")180    quest = get_value(url, "quest", "")181    return qid, name, quest, get_date(past)182    # ref = db.reference(f"/Topics/{topic}/{today}")183    # logme.info(f"2. Todays date: {today} - {ref=}")184    # qid = ""185    # if ref:186    #     logme.info("Lets start to find the qid from topics")187    #     logme.info(f"Lets get from  Topics/{topic}/{today} with {ref=}")188    #     qid = ref.get('qid')189    #     logme.info(f"{qid=} and {None not in qid}")190    #     if qid and None not in qid:191    #         qid = qid[0].get('qid') if ref else None192    #         logme.info(f"{qid=}..")193    #         url = f"/QuestionBank/{topic}/{qid}"194    #         logme.info(f"{url=}")195    #         ref = db.reference(url)196    #         if ref:197    #             logme.info(f"{ref=}")198    #             data = ref.get()199    #             logme.info(f"{data=}")200    #             if data and None not in data:201    #                 name = data.get("name", "")202    #                 quest = data.get("quest", "")203    #             logme.info(f">{name=}")204    #         logme.info(f"{name=},\n{quest=}\n{qid=}")205    # logme.info(f"{qid=}, {name=}, {quest=}")206    # return qid, name, quest207def get_solution(topic: str, qid: str) -> str:208    ref = db.reference(f"/QuestionBank/{topic}/{qid}")209    return ref.get() if ref else None210@app.route("/submit_solution", methods=['POST'])211@auth_required212def submit_solution(user_data=None):213    """."""214    logme.info(f"{user_data=}")215    proposed_solution = request.form['proposed_solution']216    topic, qid, quest_date = request.form['quest_date'].split("::")217    user_id = user_data.get("user_id", "")218    month, date = quest_date.rsplit("/", 1)219    url = f"/history/{topic}/{month}"220    ref = db.reference(url)221    ref.push({222        "date": date,223        "uid": user_id,224        "q_id": qid,225        "proposed": proposed_solution,226        "result": "Not Evaluated"  # Not Evaluated, Fail, Pass227    })228    return """<html><body>Thanks for Submitting the solution,229    click <a href='/'> me </a> to return back to home</body></html>"""230@app.route("/my_history", methods=["GET", "POST"])231@auth_required232def my_history(user_data=None):233    topics = get_topics()234    admin = is_role("admins")235    if request.method == 'POST':236        month = request.form.get("month")237        topic = request.form.get("topic")238        user_id = user_data.get("user_id", "")239        url = f"/history/{topic}/{month}"240        logme.info(f"{url=} -- {user_id=}")241        ref = db.reference(url)242        monthly_data = {}243        if ref.get():244            for key, value in ref.order_by_child(245                    'uid').equal_to(user_id).get().items():246                logme.info(f"{key=}\n{value=}")247                solution_data = get_solution(topic, value['q_id'])248                logme.info(f"{solution_data=}")249                value.update(solution_data)250                logme.info(f"updated {value=}")251                monthly_data[key] = value252        return render_template("history.html", admin=admin,253                               user_data=user_data,254                               data=monthly_data,255                               topics=topics)256    else:257        return render_template("history.html", user_data=user_data,258                               admin=admin, topics=topics, data={})259@app.route("/add_quest", methods=["GET", "POST"])260@need_role("admins")261@auth_required262def add_quest(user_data=None):263    admin = is_role("admins")264    if request.method == 'POST':265        topic = request.form.get("topic")266        name = request.form.get("name", "")267        quest = request.form.get("quest", "")268        solution = request.form.get("solution", "")269        logme.info(f"{topic=}\n{name=}\n{quest=}")270        ref = db.reference(f"/QuestionBank/{topic}")271        ref.push({'name': name.strip(), 'quest': quest.strip(), 'solution':272                  solution.strip(), "used": 0})273    topics = get_topics()274    return flask.render_template("add_quest.html", user_data=user_data,275                                 admin=admin, title="Enter New Quest",276                                 topics=topics)277@app.route("/todays_quest", methods=["GET", "POST"])278@auth_required279def todays_quest(user_data=None):280    admin = is_role("admins")281    logme.info(f"Reached todays_quest with {request.method=}")282    if request.method == 'POST':283        topic = request.form.get("topic")284        logme.info(f"today's {topic=}")285        # qid, name, quest = get_todays_quest(topic)286        qid, name, quest, date = get_latest_quest(topic)287        if all((qid,  name, quest)):288            logme.info(f"{qid=} -- {name=} -- {quest=}")289            return flask.render_template("todays_quest.html",290                                         title="Enter Daily Quest",291                                         quest_date=date, topic=topic,292                                         admin=admin, user_data=user_data,293                                         qid=qid, name=name, quest=quest)294        else:295            logme.info("Sorry no quest available")296            return "<html><body>Sorry, no new quest for today</body></html>"297    else:298        logme.info("user tried GET method for /todays_quest")299        return flask.redirect('/')300def get_value(url, key, default_value):301    ref = db.reference(f"/{url}")302    return data.get(key) if (data := ref.get()) else default_value303def get_cutoftime(topic_id):304    return get_value(f"/QuestDuration/{topic_id}", "cutoftime", 1750)305def get_repeat(topic_id):306    return get_value(f"/QuestDuration/{topic_id}", "repeat", 1)307# def log_me(msg, log_type):308#     # Emits the data using the standard logging module309#     logging.warning(msg)310def get_date(future_in=0):311    next_date = datetime.today() + timedelta(days=future_in)312    return next_date.strftime("%Y/%m/%d")313# def get_date():314#     """Returns date time for url format in firebase"""315#     return (datetime.now(ZoneInfo('Asia/Kolkata')) +316#             timedelta(hours=6, minutes=30)).strftime("%Y/%m/%d")317def to_update(topic_id):318    """319    """320    latest = get_value(f"/QuestDuration/{topic_id}", "latest", 1)321    repeat = get_value(f"/QuestDuration/{topic_id}", "repeat", 1)322    today = get_date()323    next_date = get_date(repeat)324    logme.info(f"{today=}  {next_date=}")325    # if next_date is already present then dont do anywhing326    val = get_value(f"/Topics/{topic_id}/{next_date}", "qid", False)327    logme.info(f"to do or not to do: {val=}")328    return next_date if not val else False329def get_fresh_quest(topic_id):330    """331    """332    key = ""333    ref = db.reference(f"/QuestionBank/{topic_id}")334    data = ref.order_by_child("used").equal_to(0).limit_to_first(1).get()335    logme.info(f"got the next free quest,{data=}")336    if data:337        key = list(data.keys())[0]338        #     logme.info(f"Adding {key=}")339        #     next_date = tomorrow()340        #     logme.info(f"{next_date=}")341        #     quest_ref = db.reference(f"/Topics/{topic}/{next_date}")342        #     quest_ref.set({343        #         "qid": key344        #     })345        # Now lets update the questionbank # Code working346        child = ref.child(key)347        child.update({348            "used": 1349        })350    return key351def use_quest(quest_id, topic_id, next_date):352    try:353        quest_ref = db.reference(f"/Topics/{topic_id}/{next_date}")354        quest_ref.set({355            "qid": quest_id356        })357        # lets update the latest358        ref = db.reference(f"/QuestDuration/{topic_id}")359        ref.update({360            "latest": next_date361        })362        return True363    except Exception as e:364        logme.error(e)365        return False366def populate_quests():367    """368    Step 1: Get the next date for the quest based on `latest` and `repeat`369    Step 2. Do we need to populate next quest and skip if not370    3. if yes, then obtain the first unused quest for topic371    4. populate the quest with previous questdetails372    5. Update the quest as used.373    """374    logme.info("Lets start populating the quests for next valid date")375    # Lets find all the quests we have376    for topic_id in get_topics():377        logme.info(f"\nChecking {topic_id=} if it need updating")378        if(next_date := to_update(topic_id)):379            """380            Lets update the quest381            """382            logme.info(383                f"lets updat the topic with quest for date: {next_date}")384            if quest_id := get_fresh_quest(topic_id):385                logme.info(f"Found the quest: {quest_id=}")386                use_quest(quest_id, topic_id, next_date)387# def get_next_free_quest():388#     """Use Push to add the questions else this will fail :("""389#     topics = get_topics()390#     logme.info(f"In get_next_free_quest: {topics}")391#     for topic in topics:392#         ref = db.reference(f"/QuestionBank/{topic}")393#         data = ref.order_by_child("used").equal_to(0).limit_to_first(1).get()394#         logme.info(f"{data=}")395#         if data:396#             key = list(data.keys())[0]397#             logme.info(f"Adding {key=}")398#             next_date = tomorrow()399#             logme.info(f"{next_date=}")400#             quest_ref = db.reference(f"/Topics/{topic}/{next_date}")401#             quest_ref.set({402#                 "qid": key403#             })404#             # Now lets update the questionbank405#             child = ref.child(key)406#             child.update({407#                 "used": 1408#             })409def schedule_quest():410    logme.info("Starting the logging")411    scheduler = BackgroundScheduler(timezone='Asia/Kolkata', daemon=True)412    scheduler.add_job(populate_quests,  'cron',413                      hour='09', minute='30', second="00")414    scheduler.start()415    # Shut down the scheduler when exiting the app416    atexit.register(lambda: scheduler.shutdown())417@app.route("/evaluate", methods=['POST'])418@need_role("admins")419@auth_required420def evaluation(user_data=None):421    #    topic = request.form.get("topic")422    month = request.form.get("selected_month")423    topic = request.form.get("selected_topic")424    logme.info(f"{month=}  {topic=}")425    logme.info(f"{request.form=}")426    url = f"/history/{topic}/{month}"427    for key in request.form:428        if key.startswith("-"):429            _url = f"{url}/{key}"430            child = db.reference(_url)431            logme.info(f"Updating child: {key} -> {request.form.get(key)=}")432            child.update({433                "result": request.form.get(key)434            })435    return flask.redirect('/eval')436@app.route("/eval", methods=['GET', 'POST'])437@need_role("admins")438@auth_required439def eval_quest(user_data=None):440    topics = get_topics()441    admin = is_role("admins")442    if request.method == 'POST':443        month = request.form.get("month")444        topic = request.form.get("topic")445        # user_id = user_data.get("user_id", "")446        url = f"/history/{topic}/{month}"447        # logme.info(f"{url=} -- {user_id=}")448        ref = db.reference(url)449        monthly_data = {}450        if ref.get():451            for key, value in ref.order_by_child('date').get().items():452                logme.info(f"{key=}\n{value=}")453                solution_data = get_solution(topic, value['q_id'])454                logme.info(f"{solution_data=}")455                value.update(solution_data)456                logme.info(f"updated {value=}")457                monthly_data[key] = value458        return render_template("evaluate.html",459                               user_data=user_data, month=month, topic=topic,460                               data=monthly_data, admin=admin,461                               topics=topics)462    else:463        return render_template("evaluate.html", user_data=user_data,464                               admin=admin, topics=topics, data={})465@app.route("/logout")466def logout():467    """."""468    logme.info(f"{flask.request.cookies=}")469    id_token = flask.request.cookies.get('token')470    try:471        if id_token:472            logme.info(f"{id_token=}")473            decoded_claims = google.oauth2.id_token.verify_firebase_token(474                id_token, firebase_request_adapter)475            # decoded_claims = auth.verify_session_cookie(session_cookie)476            auth.revoke_refresh_tokens(decoded_claims['sub'])477            response = flask.make_response(flask.redirect('/login'))478            response.set_cookie('token', "", expires=-1)479            return response480    except auth.InvalidSessionCookieError as error:481        logme.error(f"Error: {error}")482    except ValueError as ve:483        logme.error(f"Value Error {ve=}")484        response = flask.make_response(flask.redirect('/login'))485        response.set_cookie('token', "", expires=-1)486        return response487    return flask.redirect('/login')488@app.route("/", methods=["GET"])489@auth_required490def index(user_data=None):491    admin = is_role("admins")492    logme.info(f"1. {admin=}")493    if user_data:494        logme.info(">> Getting topics")495        topics = get_topics()496        logme.info(f"1. user: {user_data=}\n{topics=}")497        return flask.render_template("index.html", title="Daily Quest",498                                     admin=admin, user_data=user_data,499                                     topics=topics)500    return redirect("/logout")501@app.route("/login")502def login():503    # if  flask.request.cookies.get('token'):504    #     return redirect("/logout")505    return flask.render_template("login.html", title="Please Login")506def get_past_quests(topic, month):507    url = f"/history/{topic}/{month}"508    # Lets get details of every user for month of509    topic_ref = db.reference(url)510    solution = topic_ref.order_by_child("date").get()511    return solution512def main():513    logme.info("Inside main")514    # schedule_quest()515    if __name__ == '__main__':516        port = int(os.getenv('PORT', "5000"))517        app.run(host='0.0.0.0', port=port, debug=False)...run.py
Source:run.py  
1import sys, os, time2from asyncio import get_event_loop, TimeoutError, ensure_future, new_event_loop, set_event_loop3from . import datelock, feed, get, output, verbose, storage4from .storage import db5import logging as logme6class Twint:7    def __init__(self, config):8        logme.debug(__name__+':Twint:__init__')9        if config.Resume is not None and (config.TwitterSearch or config.Followers or config.Following):10            logme.debug(__name__+':Twint:__init__:Resume')11            self.init = self.get_resume(config.Resume)12        else:13            self.init = '-1'14        self.feed = [-1]15        self.count = 016        self.user_agent = ""17        self.config = config18        self.conn = db.Conn(config.Database)19        self.d = datelock.Set(self.config.Until, self.config.Since)20        verbose.Elastic(config.Elasticsearch)21        if self.config.Store_object:22            logme.debug(__name__+':Twint:__init__:clean_follow_list')23            output._clean_follow_list()24        if self.config.Pandas_clean:25            logme.debug(__name__+':Twint:__init__:pandas_clean')26            storage.panda.clean()27    def get_resume(self, resumeFile):28        if not os.path.exists(resumeFile):29            return '-1'30        with open(resumeFile, 'r') as rFile:31            _init = rFile.readlines()[-1].strip('\n')32            return _init33    async def Feed(self):34        logme.debug(__name__+':Twint:Feed')35        consecutive_errors_count = 036        while True:37            response = await get.RequestUrl(self.config, self.init, headers=[("User-Agent", self.user_agent)])38            if self.config.Debug:39                print(response, file=open("twint-last-request.log", "w", encoding="utf-8"))40            self.feed = []41            try:42                if self.config.Favorites:43                    self.feed, self.init = feed.Mobile(response)44                    if not self.count % 40:45                        time.sleep(5)46                elif self.config.Followers or self.config.Following:47                    self.feed, self.init = feed.Follow(response)48                    if not self.count % 40:49                        time.sleep(5)50                elif self.config.Profile:51                    if self.config.Profile_full:52                        self.feed, self.init = feed.Mobile(response)53                    else:54                        self.feed, self.init = feed.profile(response)55                elif self.config.TwitterSearch:56                    self.feed, self.init = feed.Json(response)57                break58            except TimeoutError as e:59                if self.config.Proxy_host.lower() == "tor":60                    print("[?] Timed out, changing Tor identity...")61                    if self.config.Tor_control_password is None:62                        logme.critical(__name__+':Twint:Feed:tor-password')63                        sys.stderr.write("Error: config.Tor_control_password must be set for proxy autorotation!\r\n")64                        sys.stderr.write("Info: What is it? See https://stem.torproject.org/faq.html#can-i-interact-with-tors-controller-interface-directly\r\n")65                        break66                    else:67                        get.ForceNewTorIdentity(self.config)68                        continue69                else:70                    logme.critical(__name__+':Twint:Feed:' + str(e))71                    print(str(e))72                    break73            except Exception as e:74                if self.config.Profile or self.config.Favorites:75                    print("[!] Twitter does not return more data, scrape stops here.")76                    break77                logme.critical(__name__+':Twint:Feed:noData' + str(e))78                # Sometimes Twitter says there is no data. But it's a lie.79                consecutive_errors_count += 180                if consecutive_errors_count < self.config.Retries_count:81                    # skip to the next iteration if wait time does not satisfy limit constraints82                    delay = round(consecutive_errors_count ** self.config.Backoff_exponent, 1)83                    # if the delay is less than users set min wait time then replace delay84                    if self.config.Min_wait_time > delay:85                        delay = self.config.Min_wait_time86                    sys.stderr.write('sleeping for {} secs\n'.format(delay))87                    time.sleep(delay)88                    self.user_agent = await get.RandomUserAgent(wa=True)89                    continue90                logme.critical(__name__+':Twint:Feed:Tweets_known_error:' + str(e))91                sys.stderr.write(str(e) + " [x] run.Feed")92                sys.stderr.write("[!] if get this error but you know for sure that more tweets exist, please open an issue and we will investigate it!")93                break94        if self.config.Resume:95            print(self.init, file=open(self.config.Resume, "a", encoding="utf-8"))96    async def follow(self):97        await self.Feed()98        if self.config.User_full:99            logme.debug(__name__+':Twint:follow:userFull')100            self.count += await get.Multi(self.feed, self.config, self.conn)101        else:102            logme.debug(__name__+':Twint:follow:notUserFull')103            for user in self.feed:104                self.count += 1105                username = user.find("a")["name"]106                await output.Username(username, self.config, self.conn)107    async def favorite(self):108        logme.debug(__name__+':Twint:favorite')109        await self.Feed()110        self.count += await get.Multi(self.feed, self.config, self.conn)111    async def profile(self):112        await self.Feed()113        if self.config.Profile_full:114            logme.debug(__name__+':Twint:profileFull')115            self.count += await get.Multi(self.feed, self.config, self.conn)116        else:117            logme.debug(__name__+':Twint:notProfileFull')118            for tweet in self.feed:119                self.count += 1120                await output.Tweets(tweet, self.config, self.conn)121    async def tweets(self):122        await self.Feed()123        if self.config.Location:124            logme.debug(__name__+':Twint:tweets:location')125            self.count += await get.Multi(self.feed, self.config, self.conn)126        else:127            logme.debug(__name__+':Twint:tweets:notLocation')128            for tweet in self.feed:129                self.count += 1130                await output.Tweets(tweet, self.config, self.conn)131    async def main(self, callback=None):132        task = ensure_future(self.run())  # Might be changed to create_task in 3.7+.133        if callback:134            task.add_done_callback(callback)135        await task136    async def run(self):137        if self.config.TwitterSearch:138            self.user_agent = await get.RandomUserAgent(wa=True)139        else:140            self.user_agent = await get.RandomUserAgent()141        if self.config.User_id is not None:142            logme.debug(__name__+':Twint:main:user_id')143            self.config.Username = await get.Username(self.config.User_id)144        if self.config.Username is not None:145            logme.debug(__name__+':Twint:main:username')146            url = f"https://twitter.com/{self.config.Username}?lang=en"147            self.config.User_id = await get.User(url, self.config, self.conn, True)148        if self.config.TwitterSearch and self.config.Since and self.config.Until:149            logme.debug(__name__+':Twint:main:search+since+until')150            while self.d._since < self.d._until:151                self.config.Since = str(self.d._since)152                self.config.Until = str(self.d._until)153                if len(self.feed) > 0:154                    await self.tweets()155                else:156                    logme.debug(__name__+':Twint:main:gettingNewTweets')157                    break158                if get.Limit(self.config.Limit, self.count):159                    break160        else:161            logme.debug(__name__+':Twint:main:not-search+since+until')162            while True:163                if len(self.feed) > 0:164                    if self.config.Followers or self.config.Following:165                        logme.debug(__name__+':Twint:main:follow')166                        await self.follow()167                    elif self.config.Favorites:168                        logme.debug(__name__+':Twint:main:favorites')169                        await self.favorite()170                    elif self.config.Profile:171                        logme.debug(__name__+':Twint:main:profile')172                        await self.profile()173                    elif self.config.TwitterSearch:174                        logme.debug(__name__+':Twint:main:twitter-search')175                        await self.tweets()176                else:177                    logme.debug(__name__+':Twint:main:no-more-tweets')178                    break179                #logging.info("[<] " + str(datetime.now()) + ':: run+Twint+main+CallingGetLimit2')180                if get.Limit(self.config.Limit, self.count):181                    logme.debug(__name__+':Twint:main:reachedLimit')182                    break183        if self.config.Count:184            verbose.Count(self.count, self.config)185def run(config, callback=None):186    logme.debug(__name__+':run')187    try:188        get_event_loop()189    except RuntimeError as e:190        if "no current event loop" in str(e):191            set_event_loop(new_event_loop())192        else:193            logme.exception(__name__+':Lookup:Unexpected exception while handling an expected RuntimeError.')194            raise195    except Exception as e:196        logme.exception(__name__+':Lookup:Unexpected exception occured while attempting to get or create a new event loop.')197        raise198    get_event_loop().run_until_complete(Twint(config).main(callback))199def Favorites(config):200    logme.debug(__name__+':Favorites')201    config.Favorites = True202    config.Following = False203    config.Followers = False204    config.Profile = False205    config.Profile_full = False206    config.TwitterSearch = False207    run(config)208    if config.Pandas_au:209        storage.panda._autoget("tweet")210def Followers(config):211    logme.debug(__name__+':Followers')212    config.Followers = True213    config.Following = False214    config.Profile = False215    config.Profile_full = False216    config.Favorites = False217    config.TwitterSearch = False218    run(config)219    if config.Pandas_au:220        storage.panda._autoget("followers")221        if config.User_full:222            storage.panda._autoget("user")223    if config.Pandas_clean and not config.Store_object:224        #storage.panda.clean()225        output._clean_follow_list()226def Following(config):227    logme.debug(__name__+':Following')228    config.Following = True229    config.Followers = False230    config.Profile = False231    config.Profile_full = False232    config.Favorites = False233    config.TwitterSearch = False234    run(config)235    if config.Pandas_au:236        storage.panda._autoget("following")237        if config.User_full:238            storage.panda._autoget("user")239    if config.Pandas_clean and not config.Store_object:240        #storage.panda.clean()241        output._clean_follow_list()242def Lookup(config):243    logme.debug(__name__+':Lookup')244    try:245        get_event_loop()246    except RuntimeError as e:247        if "no current event loop" in str(e):248            set_event_loop(new_event_loop())249        else:250            logme.exception(__name__+':Lookup:Unexpected exception while handling an expected RuntimeError.')251            raise252    except Exception as e:253        logme.exception(__name__+':Lookup:Unexpected exception occured while attempting to get or create a new event loop.')254        raise255    try:256        if config.User_id is not None:257            logme.debug(__name__+':Twint:Lookup:user_id')258            config.Username = get_event_loop().run_until_complete(get.Username(config.User_id))259        url = f"https://twitter.com/{config.Username}?lang=en"260        get_event_loop().run_until_complete(get.User(url, config, db.Conn(config.Database)))261        if config.Pandas_au:262            storage.panda._autoget("user")263    except RuntimeError as e:264        if "no current event loop" in str(e):265            logme.exception(__name__+':Lookup:Previous attempt to to create an event loop failed.')266        raise267    except Exception as e:268        logme.exception(__name__+':Lookup:Unexpected exception occured.')269        raise270def Profile(config):271    logme.debug(__name__+':Profile')272    config.Profile = True273    config.Favorites = False274    config.Following = False275    config.Followers = False276    config.TwitterSearch = False277    run(config)278    if config.Pandas_au:279        storage.panda._autoget("tweet")280def Search(config, callback=None):281    logme.debug(__name__+':Search')282    config.TwitterSearch = True283    config.Favorites = False284    config.Following = False285    config.Followers = False286    config.Profile = False287    config.Profile_full = False288    run(config, callback)289    if config.Pandas_au:...create_snapshots_by_instance_and_purge_old_backups.py
Source:create_snapshots_by_instance_and_purge_old_backups.py  
...17	os.makedirs(output_dir + '/csv/')18if not os.path.exists(home + '/log/'):19	os.makedirs(home + '/log/')20logfile_name = home + '/log/' + 'scheduled_backup_run.' + datestamp + '.' + timestamp + '.log';21def logme(string_to_log,logfile_name):22	if (logToConsole=True):23		print(string_to_log)24	logfile_object = open(logfile_name,'a',0)25	logfile_object.write(string_to_log + "\n")26	logfile_object.close()27	28def get_snapshots_to_remove(myEC2Class,default_retention_value,specified_instances=[]):29	snaps_to_remove=[]30	specified_instance_ids=[]31	if len(specified_instances) == 0:32		specified_instances = myEC2Class.instance_details33	for record in myEC2Class.volume_details:34		in_backup_policy=False35		volid,volobj,volblkd,insid,insobj,num_of_total_snaps=record36		number_of_backup_snaps=037		for sid,sno,vid,vo,iid,io,sd,st,ubami,amiid,svex,ioex,amex,sndesc in myEC2Class.snapshot_details:38	        	if vid == volid:39				if 'Backup_Type' in sno.__dict__['tags']:40					if sno.__dict__['tags']['Backup_Type'] == 'Scheduled':41                               			number_of_backup_snaps += 142		for specified_instance in specified_instances:43			i_id = specified_instance[0]44			i_obj = specified_instance[2]45			i_name = specified_instance[1]46			retention_value = default_retention_value47			if i_id == insid:48				logme("\n" + volid +" on "+ i_name + " (" + i_id + ") : Checking if Scheduled Backup Policy is to be applied..",logfile_name)49				if 'Apply_Scheduled_Backup_Policy' in i_obj.__dict__['tags']:50					logme("\tApply_Scheduled_Backup_Policy tag found",logfile_name)51					if i_obj.__dict__['tags']['Apply_Scheduled_Backup_Policy'].lower() == "yes":52						logme("\tScheduled Backup Policy is applied to " + i_name +" (" + i_id + ")",logfile_name)53						if 'Backup_Retention' in i_obj.__dict__['tags']:54							logme("\tBackup retention value found",logfile_name)55							retention_value = int(i_obj.__dict__['tags']['Backup_Retention'])56							logme("\tRetention value is now :" + str(retention_value),logfile_name)57						logme("\tTotal snapshots of volume : " + str(num_of_total_snaps),logfile_name)			58						logme("\tTotal snapshots created by scheduled backup policy : " + str(number_of_backup_snaps),logfile_name)			59						if number_of_backup_snaps > retention_value:60							num_of_snaps_to_delete = number_of_backup_snaps - retention_value61							logme("\tNumber of backup snapshots to delete : " + str(num_of_snaps_to_delete),logfile_name)62							oldest_snapshots=myEC2Class.return_oldest_snapshots_to_delete_for_vol(volid,num_of_snaps_to_delete)63							for vid,snapid,snapdate,snaptime in oldest_snapshots:64								snaps_to_remove.append([insid,vid,snapid,snapdate,snaptime,number_of_backup_snaps,retention_value])65						else:66							logme("\tNumber of snapshots to delete : 0",logfile_name)				67	return snaps_to_remove	68def get_backup_volumes(myEC2Class,specified_instances=[]):69	snaps_created=[]70	specified_instance_ids=[]71	if len(specified_instances) == 0:72		specified_instances = myEC2Class.instance_details73	for record in myEC2Class.volume_details:74		volid,volobj,volblkd,insid,insobj,num_of_snaps=record75		for specified_instance in specified_instances:76			i_id = specified_instance[0]77			i_name = specified_instance[1]78			i_obj = specified_instance[2]79			if i_id == insid:80				if 'Backup_Retention' in i_obj.__dict__['tags']:81					retention_value = int(i_obj.__dict__['tags']['Backup_Retention'])82				if 'Apply_Scheduled_Backup_Policy' in i_obj.__dict__['tags']:83					if i_obj.__dict__['tags']['Apply_Scheduled_Backup_Policy'].lower() == "yes":84						timestamp=strftime("%d-%m-%Y %H:%M:%S", gmtime())85						created_snapshot=volobj.create_snapshot()86						created_snapshot.add_tags({'Name': 'Scheduled Backup of ' + i_name + ' (' + i_id + ') ' + timestamp , 'Instance_ID' : i_id, 'Volume_ID'  : volid,'Backup_Type' : 'Scheduled' })						87						snapid=created_snapshot.id88						snapdate,snaptime=str(created_snapshot.start_time)[:19].split('T')89						snaps_created.append([i_name,insid,volid,snapid,snapdate,snaptime])90	return snaps_created91def delete_snapshots(myEC2Class,snaps_to_remove):92	deleted_snapshots=[]93	for snap_to_remove in snaps_to_remove:94		for snapshots in myEC2Class.snapshot_details:95			# check ids match up - we do this to find out the object for the snapshot being deleted			96			if snap_to_remove[2] == snapshots[0]:97				logme("Deleting " + snapshots[0] + " of volume " + snapshots[2] + " (" + str(snapshots[5].__dict__['tags']['Name']) + ")...",logfile_name)				98				snapshots[1].delete()99				deleted_snapshots.append(snapshots)100	return deleted_snapshots101def get_EC2_totals_for_backup_policy_usage(myEC2Class):102	total_snapshots_created_other=0103	total_snapshots_created_by_backup_policy=0104	total_instances_backup_policy_off=0105	total_instances_backup_policy_on=0106	total_instances_backup_policy_unset=0107	for snaps_detail in myEC2Class.snapshot_details:108		sna_obj=snaps_detail[1]109		if 'Backup_Type' in sna_obj.__dict__['tags']:110			if sna_obj.__dict__['tags']['Backup_Type'] == "Scheduled":111				total_snapshots_created_by_backup_policy += 1112			else:113				total_snapshots_created_other += 1114		else:115			total_snapshots_created_other += 1116	for instance_detail in myEC2Class.instance_details:117		i_obj = instance_detail[2]118		if 'Apply_Scheduled_Backup_Policy' in i_obj.__dict__['tags']:119			if i_obj.__dict__['tags']['Apply_Scheduled_Backup_Policy'].lower() == "yes":120				total_instances_backup_policy_on += 1121			else:122				total_instances_backup_policy_off += 1123		else:124			total_instances_backup_policy_unset += 1125		126	return [total_snapshots_created_by_backup_policy,total_snapshots_created_other,total_instances_backup_policy_on,total_instances_backup_policy_off,total_instances_backup_policy_unset]127def main():128	global myEC2Utils129	region,awskeyid,awsseckey,awsaccountid=get_config_info(home + '/.aws_config/awsconfig.txt.ctrust_acc')130	myEC2Class = EC2Utils(region,awskeyid,awsseckey,awsaccountid)131	retention_value=2132	totals_array=[]133	if not os.path.exists(output_dir + "Snapshots_Created_By_Scheduled_Backup/csv"):134		os.makedirs(output_dir + "Snapshots_Created_By_Scheduled_Backup/csv")135	if not os.path.exists(output_dir + "Snapshots_Created_By_Scheduled_Backup/xlsx"):136		os.makedirs(output_dir + "Snapshots_Created_By_Scheduled_Backup/xlsx")137	if not os.path.exists(output_dir + "Totals_for_scheduled_backup_run/csv"):138		os.makedirs(output_dir + "Totals_for_scheduled_backup_run/csv")139	if not os.path.exists(output_dir + "Totals_for_scheduled_backup_run/xlsx"):140		os.makedirs(output_dir + "Totals_for_scheduled_backup_run/xlsx")141	logme("-------------------------------------------------------------",logfile_name)142	logme("Scheduled Backup Run : " + datetime.datetime.now().strftime("%A %D @ %T"),logfile_name)143	logme("-------------------------------------------------------------",logfile_name)144	logme("Total snapshots existing pre backup process : " + str(len(myEC2Class.snapshot_details)),logfile_name)145	totals_array.append(["Total Snapshots Pre Backup Process : ",len(myEC2Class.snapshot_details)])146	logme("Starting Scheduled Backup Process.....",logfile_name)147	created_snaps=get_backup_volumes(myEC2Class)148	logme("Total new snapshots creared by backup process : " + str(len(created_snaps)),logfile_name)149	totals_array.append(["Total new snaps created by backup process : ", len(created_snaps)])150	header=['Instance Id','Instance Name','Volume ID','Snapshot ID','Snapshot Date','Snapshot Time']151	outputfile = output_dir + 'Snapshots_Created_By_Scheduled_Backup/' + 'xlsx/' + datestamp + '_' + timestamp + '_Snapshots_Created_By_Scheduled_Backup.xlsx'152	logme("\nCreating Excel report containing details about created snapshots",logfile_name)153	myEC2Class.generate_report_from_array(outputfile,header,created_snaps,"Snapshots_Created")154	outputfile = output_dir + 'Snapshots_Created_By_Scheduled_Backup/' + 'csv/' + datestamp + '_' + timestamp + '_Snapshots_Created_By_Scheduled_Backup.csv'155	myEC2Class.generate_report_from_array(outputfile,header,created_snaps,"Snapshots_Created")156	logme("\nRefreshing Snapshot Information to get latest backup information",logfile_name)157	myEC2Class.update()158	logme("\nStarting Process to Delete old backups....",logfile_name)159	total_pre_deletion=len(myEC2Class.snapshot_details)160	totals_array.append(["Total Pre Deletion",total_pre_deletion])161	logme("Total of all snapshots (pre deletion): " + str(total_pre_deletion),logfile_name)162	snaps_to_remove=get_snapshots_to_remove(myEC2Class,retention_value)163	logme("\nTotal number of snapshots to be deleted : " + str(len(snaps_to_remove)),logfile_name)164	totals_array.append(["Total Snapshots to be Deleted",len(snaps_to_remove)])165	logme("\nDeleting old backups....",logfile_name)		166	deleted_snapshots=delete_snapshots(myEC2Class,snaps_to_remove)167	logme("\nRefreshing Snapshot Information after deletion..",logfile_name)168	myEC2Class.update()169	total_post_deletion=len(myEC2Class.snapshot_details)170	total_deleted=total_pre_deletion-total_post_deletion171	logme("\nTotal Snapshots successfully deleted : " + str(total_deleted),logfile_name)172	totals_array.append(["Total snapshot Successfully Deleted",total_deleted])173	logme("Total of all snapshots (post deletion): " + str(total_post_deletion),logfile_name)174	totals_array.append(["Total of all snapshots (post deletion)",total_post_deletion])175	tscb,tsco,tibe,tibo,tibu=get_EC2_totals_for_backup_policy_usage(myEC2Class)176	logme("Total Snapshots in EC2 created by backup policy : " + str(tscb),logfile_name)177	totals_array.append(["Total Snapshots in EC2 created using backup policy",tscb])178	logme("Total Snapshots in EC2 not created by backup policy : " + str(tsco),logfile_name)179	totals_array.append(["Total Snapshots in EC2 not created using backup policy",tsco])180	logme("Total Instances in EC2 with backup policy applied : " + str(tibe),logfile_name)181	totals_array.append(["Total Instances in EC2 with backup policy applied",tibe])182	logme("Total Instances in EC2 with backup policy turned off : " + str(tibo),logfile_name)183	totals_array.append(["Total Instances in EC2 with backup policy turned off",tibo])184	logme("Total Instances in EC2 with backup policy unset : " + str(tibu),logfile_name)185	totals_array.append(["Total Instances in EC2 with backup policy unset",tibu])186	header=['Total Type','Total']187	logme("\nCreating Totals report file",logfile_name)188	outputfile = output_dir + 'Totals_for_scheduled_backup_run/' + 'xlsx/' + datestamp + '_' + timestamp + '_Totals_for_scheduled_backup_run.xlsx'189	myEC2Class.generate_report_from_array(outputfile,header,totals_array,"Backup Policy Totals")190	outputfile = output_dir + 'Totals_for_scheduled_backup_run/' + 'csv/' + datestamp + '_' + timestamp + '_Totals_for_scheduled_backup_run.csv'191	myEC2Class.generate_report_from_array(outputfile,header,totals_array,"Backup Policy Totals")192	logme("",logfile_name)193	logme("-------------------------------------------------------------",logfile_name)194	logme("Finished Backup Run : " + datetime.datetime.now().strftime("%A %D @ %T"),logfile_name)195	logme("-------------------------------------------------------------",logfile_name)196if __name__ == "__main__":...get.py
Source:get.py  
1from async_timeout import timeout2from datetime import datetime3from bs4 import BeautifulSoup4import sys5import socket6import aiohttp7from fake_useragent import UserAgent8import asyncio9import concurrent.futures10import random11from json import loads12from aiohttp_socks import SocksConnector, SocksVer13from . import url14from .output import Tweets, Users15from .user import inf16import logging as logme17httpproxy = None18user_agent_list = [19    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36',20    'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',21    'Mozilla/5.0 (Windows NT 5.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',22    'Mozilla/5.0 (Windows NT 6.2; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36',23    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.157 Safari/537.36',24    'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36',25    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36',26    'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36',27    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',28    'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',29    'Mozilla/4.0 (compatible; MSIE 9.0; Windows NT 6.1)',30    'Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko',31    'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)',32    'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko',33    'Mozilla/5.0 (Windows NT 6.2; WOW64; Trident/7.0; rv:11.0) like Gecko',34    'Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko',35    'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.0; Trident/5.0)',36    'Mozilla/5.0 (Windows NT 6.3; WOW64; Trident/7.0; rv:11.0) like Gecko',37    'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)',38    'Mozilla/5.0 (Windows NT 6.1; Win64; x64; Trident/7.0; rv:11.0) like Gecko',39    'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0)',40    'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; Trident/6.0)',41    'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729)'42]43def get_connector(config):44    logme.debug(__name__+':get_connector')45    _connector = None46    if config.Proxy_host:47        if config.Proxy_host.lower() == "tor":48            _connector = SocksConnector(49                socks_ver=SocksVer.SOCKS5,50                host='127.0.0.1',51                port=9050,52                rdns=True)53        elif config.Proxy_port and config.Proxy_type:54            if config.Proxy_type.lower() == "socks5":55                _type = SocksVer.SOCKS556            elif config.Proxy_type.lower() == "socks4":57                _type = SocksVer.SOCKS458            elif config.Proxy_type.lower() == "http":59                global httpproxy60                httpproxy = "http://" + config.Proxy_host + ":" + str(config.Proxy_port)61                return _connector62            else:63                logme.critical("get_connector:proxy-type-error")64                print("Error: Proxy types allowed are: http, socks5 and socks4. No https.")65                sys.exit(1)66            _connector = SocksConnector(67                socks_ver=_type,68                host=config.Proxy_host,69                port=config.Proxy_port,70                rdns=True)71        else:72            logme.critical(__name__+':get_connector:proxy-port-type-error')73            print("Error: Please specify --proxy-host, --proxy-port, and --proxy-type")74            sys.exit(1)75    else:76        if config.Proxy_port or config.Proxy_type:77            logme.critical(__name__+':get_connector:proxy-host-arg-error')78            print("Error: Please specify --proxy-host, --proxy-port, and --proxy-type")79            sys.exit(1)80    return _connector81async def RequestUrl(config, init, headers = []):82    logme.debug(__name__+':RequestUrl')83    _connector = get_connector(config)84    _serialQuery = ""85    params = []86    _url = ""87    if config.Profile:88        if config.Profile_full:89            logme.debug(__name__+':RequestUrl:Profile_full')90            _url = await url.MobileProfile(config.Username, init)91        else:92            logme.debug(__name__+':RequestUrl:notProfile_full')93            _url = await url.Profile(config.Username, init)94        _serialQuery = _url95    elif config.TwitterSearch:96        logme.debug(__name__+':RequestUrl:TwitterSearch')97        _url, params, _serialQuery = await url.Search(config, init)98    else:99        if config.Following:100            logme.debug(__name__+':RequestUrl:Following')101            _url = await url.Following(config.Username, init)102        elif config.Followers:103            logme.debug(__name__+':RequestUrl:Followers')104            _url = await url.Followers(config.Username, init)105        else:106            logme.debug(__name__+':RequestUrl:Favorites')107            _url = await url.Favorites(config.Username, init)108        _serialQuery = _url109    response = await Request(_url, params=params, connector=_connector, headers=headers)110    if config.Debug:111        print(_serialQuery, file=open("twint-request_urls.log", "a", encoding="utf-8"))112    return response113def ForceNewTorIdentity(config):114    logme.debug(__name__+':ForceNewTorIdentity')115    try:116        tor_c = socket.create_connection(('127.0.0.1', config.Tor_control_port))117        tor_c.send('AUTHENTICATE "{}"\r\nSIGNAL NEWNYM\r\n'.format(config.Tor_control_password).encode())118        response = tor_c.recv(1024)119        if response != b'250 OK\r\n250 OK\r\n':120            sys.stderr.write('Unexpected response from Tor control port: {}\n'.format(response))121            logme.critical(__name__+':ForceNewTorIdentity:unexpectedResponse')122    except Exception as e:123        logme.debug(__name__+':ForceNewTorIdentity:errorConnectingTor')124        sys.stderr.write('Error connecting to Tor control port: {}\n'.format(repr(e)))125        sys.stderr.write('If you want to rotate Tor ports automatically - enable Tor control port\n')126async def Request(url, connector=None, params=[], headers=[]):127    logme.debug(__name__+':Request:Connector')128    async with aiohttp.ClientSession(connector=connector, headers=headers) as session:129        return await Response(session, url, params)130async def Response(session, url, params=[]):131    logme.debug(__name__+':Response')132    with timeout(120):133        async with session.get(url, ssl=True, params=params, proxy=httpproxy) as response:134            return await response.text()135async def RandomUserAgent(wa=None):136    logme.debug(__name__+':RandomUserAgent')137    try:138        if wa:139            return "Mozilla/5.0 (Windows NT 6.4; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2225.0 Safari/537.36"140        return UserAgent(verify_ssl=False, use_cache_server=False).random141    except:142        return random.choice(user_agent_list)143async def Username(_id):144    logme.debug(__name__+':Username')145    url = f"https://twitter.com/intent/user?user_id={_id}&lang=en"146    r = await Request(url)147    soup = BeautifulSoup(r, "html.parser")148    return soup.find("a", "fn url alternate-context")["href"].replace("/", "")149async def Tweet(url, config, conn):150    logme.debug(__name__+':Tweet')151    try:152        response = await Request(url)153        soup = BeautifulSoup(response, "html.parser")154        tweets = soup.find_all("div", "tweet")155        await Tweets(tweets, config, conn, url)156    except Exception as e:157        logme.critical(__name__+':Tweet:' + str(e))158async def User(url, config, conn, user_id = False):159    logme.debug(__name__+':User')160    _connector = get_connector(config)161    try:162        response = await Request(url, connector=_connector)163        soup = BeautifulSoup(response, "html.parser")164        if user_id:165            return int(inf(soup, "id"))166        await Users(soup, config, conn)167    except Exception as e:168        logme.critical(__name__+':User:' + str(e))169def Limit(Limit, count):170    logme.debug(__name__+':Limit')171    if Limit is not None and count >= int(Limit):172        return True173async def Multi(feed, config, conn):174    logme.debug(__name__+':Multi')175    count = 0176    try:177        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:178            loop = asyncio.get_event_loop()179            futures = []180            for tweet in feed:181                count += 1182                if config.Favorites or config.Profile_full:183                    logme.debug(__name__+':Multi:Favorites-profileFull')184                    link = tweet.find("a")["href"]185                    url = f"https://twitter.com{link}&lang=en"186                elif config.User_full:187                    logme.debug(__name__+':Multi:userFull')188                    username = tweet.find("a")["name"]189                    url = f"http://twitter.com/{username}?lang=en"190                else:191                    logme.debug(__name__+':Multi:else-url')192                    link = tweet.find("a", "tweet-timestamp js-permalink js-nav js-tooltip")["href"]193                    url = f"https://twitter.com{link}?lang=en"194                if config.User_full:195                    logme.debug(__name__+':Multi:user-full-Run')196                    futures.append(loop.run_in_executor(executor, await User(url,197                        config, conn)))198                else:199                    logme.debug(__name__+':Multi:notUser-full-Run')200                    futures.append(loop.run_in_executor(executor, await Tweet(url,201                        config, conn)))202            logme.debug(__name__+':Multi:asyncioGather')203            await asyncio.gather(*futures)204    except Exception as e:205        # TODO: fix error not error206        # print(str(e) + " [x] get.Multi")207        # will return "'NoneType' object is not callable"208        # but still works209        # logme.critical(__name__+':Multi:' + str(e))210        pass...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!
