Best Python code snippet using molotov_python
EHThread.py
Source:EHThread.py  
# -*- coding: UTF-8
__author__ = 'kz'

FETCH_SLEEP_INTERVAL = 1
FETCH_SLEEP_INTERVAL_LONG = 5
FETCH_RETRY_COUNT = 3
IMAGE_WORKER_THREADS_COUNT = 5

import time
import threading
import os
from EHentaiDownloader import Html, Http


class CommonException(Exception):
    """
    Common EHThread exception
    """
    pass


class ThreadTerminatedException(CommonException):
    """
    Raised inside a thread once its stop flag has been set
    """
    pass


class AbstractEHThread(threading.Thread):
    """
    Base class holding the state shared by the navigator and the workers
    """

    def __init__(self, queue, name):
        """
        Initialize thread
        :param queue: queue.Queue - images queue
        :param name:  string      - thread unique name
        """
        from EHentaiDownloader import Environment
        self._app = Environment.Application()
        super().__init__(name=name)
        self._queue = queue
        # True means "no delay needed before the next request"
        self._slept = True
        self._stopped = False
        self._retried = 0


class PageNavigator(AbstractEHThread):
    """
    Thread which collects direct links to images and puts them on the queue
    """

    def __init__(self, queue):
        """
        Initialize thread
        :param queue: queue.Queue - images queue
        """
        super().__init__(queue, 'PageNavigator')
        self._destination = self._app['destination']
        self._subpages = [self._app['uri']]
        self._parser = Html.EHTMLParser(self._app['uri'])
        self._httpClient = Http.Client()
        self._httpClient.changeUserAgent(self._app['user_agent'])
        self._pages = []

    def run(self):
        """
        Walk every gallery subpage, resolve each page to an image record and
        enqueue it, then wait until the queue has been fully processed.
        Any error is reported through the application object; the
        application is always notified of completion.
        """
        from EHentaiDownloader import Environment
        firstIteration = True
        pageNumber = 1
        try:
            while self._subpages:
                subpage = self._subpages.pop(0)
                Environment.Log('Fetching subpage %s' % subpage, Environment.LOG_LEVEL_DEBUG)
                # Gallery metadata and the subpage list are only read once
                fetchFields = ('pages', 'subpages', 'meta') if firstIteration else ('pages',)
                result = self._fetchContent(subpage, fetchFields)
                if firstIteration:
                    meta = result['meta']
                    if not meta['title']['main']:
                        raise CommonException('Gallery has empty title or server response was incorrect. Aborted.')
                    Environment.Log('Title: {0[main]} / {0[jap]}'.format(meta['title']), Environment.LOG_LEVEL_INFO)
                    for k in meta['additional'].keys():
                        Environment.Log('{0:.<20}...{1}'.format(k, meta['additional'][k]), Environment.LOG_LEVEL_INFO)
                    self._subpages = result['subpages']
                    # Gallery directory: <destination>/<title with '/' replaced>
                    self._destination = '/'.join((self._destination.rstrip('/'),
                                                  meta['title']['main'].replace('/', '_')))
                    os.mkdir(self._destination)
                    self._app['destination'] = self._destination
                    self._app['total_images'] = meta['count']
                    Environment.Log('Found subpages: %s' % repr(self._subpages), Environment.LOG_LEVEL_DEBUG)
                Environment.Log('Found pages: %s' % repr(result['pages']), Environment.LOG_LEVEL_DEBUG)
                self._pages += result['pages']
                while self._pages:
                    page = self._pages.pop(0)
                    Environment.Log('Fetching page # %d; URI %s' % (pageNumber, page), Environment.LOG_LEVEL_DEBUG)
                    image = self._fetchContent(page, ('image',))['image']
                    Environment.Log('Found image %s, adding to queue' % image, Environment.LOG_LEVEL_DEBUG)
                    image['number'] = pageNumber
                    image['destination'] = self._destination
                    self._queue.put(image)
                    pageNumber += 1
                firstIteration = False
            self._queue.join()
        except ThreadTerminatedException:
            # stop() was requested; nothing to report
            pass
        except BaseException as e:
            error = Environment.ErrorInfo(self._name, e)
            self._app.setError(error)
        finally:
            self._app.complete()

    def stop(self):
        """
        Set flag to stop thread as soon as possible
        """
        self._stopped = True

    def _fetchContent(self, uri, fields):
        """
        Download a page and parse out all needed content.
        Retries in a loop (instead of recursing) after a temporary ban or a
        parser error, so repeated bans cannot exhaust the call stack.
        :param uri:    string - uri to fetch content from
        :param fields: tuple  - tuple of fields to fetch labels
        :return:       dict   - dictionary with parsed values
        :raises ThreadTerminatedException: when the stop flag is set
        :raises Html.EHTMLParserException: when the retry limit is reached
        """
        from EHentaiDownloader import Environment
        while True:
            if self._stopped:
                raise ThreadTerminatedException
            try:
                if not self._slept:
                    Environment.Log('Sleeping %f seconds before the next request' % FETCH_SLEEP_INTERVAL,
                                    Environment.LOG_LEVEL_DEBUG)
                    time.sleep(FETCH_SLEEP_INTERVAL)
                result = {}
                self._parser.content = self._httpClient.get(uri)
                for field in fields:
                    result[field] = self._parser[field]
                self._slept = False
                self._retried = 0
                return result
            except Html.TemporaryBanException:
                Environment.Log(
                    'Temporary ban ocured, sleeping %f seconds before the next request' % FETCH_SLEEP_INTERVAL_LONG,
                    Environment.LOG_LEVEL_WARN
                )
                time.sleep(FETCH_SLEEP_INTERVAL_LONG)
                # The long sleep already happened, skip the short one
                self._slept = True
            except Html.EHTMLParserException as e:
                self._retried += 1
                self._slept = False
                Environment.Log('%s, retry # %d' % (str(e), self._retried), Environment.LOG_LEVEL_WARN)
                # '>=' so a limit of 0 (or an overshoot) cannot retry forever
                if self._retried >= self._app['retries']:
                    raise


class ImageDownloader(AbstractEHThread):
    """
    Image downloader thread
    """

    def __init__(self, queue, number=0):
        """
        Initialize image download worker thread
        :param queue:  queue.Queue - images queue
        :param number: int         - worker thread unique id
        """
        super().__init__(queue, 'ImageDownloader-%s' % number)

    def run(self):
        """
        Download images from queue until the retry limit is reached, at
        which point the error is reported to the application and the
        worker exits.
        """
        from EHentaiDownloader.Environment import ErrorInfo
        while True:
            try:
                self._performTask()
            except BaseException as e:
                # The original re-invoked _performTask() here, outside any
                # try block, so a second failure killed the worker without
                # reporting an error. Looping keeps every attempt guarded.
                # NOTE(review): the failed queue item is neither retried nor
                # marked task_done(), so a queue.join() elsewhere may wait
                # on it forever - confirm how the application handles this.
                self._retried += 1
                if self._retried >= self._app['retries']:
                    error = ErrorInfo(self._name, e)
                    self._app.setError(error)
                    break

    def _performTask(self):
        """
        Get a single image record from the queue, download it into the
        record's destination directory and mark the queue task as done.
        """
        from EHentaiDownloader import Environment
        imageInfo = self._queue.get()
        totalImages = self._app['total_images']
        percents = int(imageInfo['number'] / totalImages * 100)
        Environment.Log('Downloading image #{0}/{1} ({2}%)'.format(imageInfo['number'], totalImages, percents),
                        Environment.LOG_LEVEL_INFO, True)
        Environment.Log('Thread: %s; from: %s; to: %s' % (self._name, imageInfo['full_uri'], imageInfo['destination']),
                        Environment.LOG_LEVEL_DEBUG)
        httpClient = Http.Client(imageInfo['host'], imageInfo['port'])
        httpClient.changeUserAgent(self._app['user_agent'])
        httpClient.sendRequest(imageInfo['uri'])
        try:
            contentType = httpClient.getHeader('Content-type')
            extension = self._getExtensionFromContentType(contentType)
            # File name is the image number zero-padded to the width of the total
            fileName = '{image[destination]}/{image[number]:0={padding}}.{ext}'.format(
                image=imageInfo, ext=extension, padding=len(str(totalImages)))
            with open(fileName, 'wb') as output:
                try:
                    while True:
                        output.write(httpClient.getChunk())
                except Http.ReadResponseException:
                    # Raised by getChunk(); this is how the read loop ends
                    pass
        finally:
            # Release the connection even when the download fails midway
            httpClient.close()
        self._retried = 0
        self._queue.task_done()

    def _getExtensionFromContentType(self, contentType):
        """
        Try to get file extension according to content-type
        This crap is some kind of suitable for images only,
        but we don't need anything more
        :return: string - corresponding file extension
        """
...
index.py
Source:index.py  
from flask import Flask
from flask import render_template, redirect, request
from pymongo import MongoClient
from datetime import datetime, date
from flask import jsonify
from multiprocessing import Pool
import os
import json
import threading
from wtforms.fields.core import SelectField
from libs.save_info import *
from libs.excel_parse import *
from libs.user_dao import *
from libs.sms_receive import *
# from libs.detect import *
import pandas as pd
from flask import send_file
from flask_bootstrap import Bootstrap
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
from wtforms.validators import DataRequired

app = Flask(__name__)
# Flask-WTF requires an encryption key - the string can be anything
# MONGO_URI = "mongodb://127.0.0.1"
# The key can be overridden through the environment; the previous literal
# stays as the fallback so existing deployments keep working.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'C2HWGVoMGfNTBsrYQg8EcMrdTimkZfAb')
app.config['TEMPLATES_AUTO_RELOAD'] = True
Bootstrap(app)

# Number of save_info worker threads started per submission
WORKER_THREAD_COUNT = 4

SUBMITTED = False
SUBMITTED_DATE = None
# Previously declared `global` in hello() but never defined at module level
TOTAL_COUNT = 0


class NameForm(FlaskForm):
    """Submission form with a domain text field and a country selector."""
    # NOTE(review): both labels look mis-encoded (mojibake); they are kept
    # byte-for-byte here - restore them from the original source file.
    domain = StringField('åå', validators=[DataRequired()])
    country = SelectField('å½å®¶', validators=[DataRequired()], choices=[
                          ("china", "China"), ("usa", "USA")])
    submit = SubmitField('Submit')


def _start_save_info_threads(*args):
    """Start WORKER_THREAD_COUNT threads running save_info(*args) and return them."""
    workers = []
    for _ in range(WORKER_THREAD_COUNT):
        worker = threading.Thread(target=save_info, args=args)
        worker.start()
        workers.append(worker)
    return workers


def _send_to_users(all_user, sim_line):
    """Call send_request for every user and build the summary message."""
    count = 0
    for p in all_user:
        count += send_request(p)
    failed = len(all_user) - count
    return f"sim line {sim_line} sent, {failed} failed."


@app.route("/", methods=['GET', 'POST'])
def hello():
    """
    Render the submission form; on a valid POST start the save_info workers.
    Only one submission is accepted until the calendar date changes.
    """
    global SUBMITTED_DATE, SUBMITTED, TOTAL_COUNT
    form = NameForm()
    # A submission from an earlier day no longer blocks a new one
    if SUBMITTED_DATE is not None and date.today() > SUBMITTED_DATE:
        SUBMITTED = False
        TOTAL_COUNT = 0
    if form.validate_on_submit():
        if SUBMITTED:
            return "already submitted.."
        domain = form.domain.data
        country = form.country.data
        # empty the form field
        form.domain.data = ""
        form.country.data = ""
        try:
            _start_save_info_threads(domain, country)
            SUBMITTED = True
            SUBMITTED_DATE = date.today()
        except Exception:
            print("Error: unable to start thread")
            TOTAL_COUNT = 0
        return "running..."
    return render_template("index.html", form=form)


@app.route("/run/<domain>/<country>/<number>/")
def run(domain, country, number):
    """Start the save_info workers for the given arguments and wait for them."""
    # Without this declaration the assignments below only set a dead local
    global TOTAL_COUNT
    try:
        for worker in _start_save_info_threads(domain, country, number):
            worker.join()
        TOTAL_COUNT = 0
    except Exception:
        print("Error: unable to start thread")
        TOTAL_COUNT = 0
    return "runing..."


@app.route("/count")
def count():
    """Report today's total, failed and code-sent record counts."""
    start = datetime.combine(date.today(), datetime.min.time())
    end = datetime.combine(date.today(), datetime.max.time())
    # Cursor.count() was removed in PyMongo 4; count_documents replaces it.
    # NOTE(review): assumes rdv_page is a pymongo Collection provided by one
    # of the libs star-imports - confirm.
    all_count = rdv_page.count_documents({"date": {"$lt": end, "$gte": start}})
    failed_count = len(get_all_failed(rdv_page))
    code_sent = len(get_all_successed(rdv_page))
    # NOTE(review): the labels below look mis-encoded (mojibake); kept as-is.
    return str(date.today()) + f" æ»æ°ï¼{all_count}, 失败: {failed_count}, æå: {code_sent}"


@app.route("/send/<sim_line>/")
def send(sim_line):
    """Send a request for every user on the given SIM line."""
    all_user = get_all_user_by_line(person_page, sim_line)
    return _send_to_users(all_user, sim_line)


@app.route("/resend/<sim_line>/")
def resend(sim_line):
    """Send a request again for every failed user on the given SIM line."""
    all_user = get_all_failed_by_line(person_page, sim_line)
    return _send_to_users(all_user, sim_line)


@app.route("/update")
def update():
    """Assign users to SIM lines in batches of 16 (line numbers start at 1)."""
    all_user = get_all_users(person_page)
    _div = [all_user[i:i+16] for i in range(0, len(all_user), 16)]
    for i, sub_list in enumerate(_div):
        print(i + 1)
        for p in sub_list:
            p["sim_line"] = i + 1
            person_page.replace_one({"_id": p["_id"]}, p, upsert=True)
    return "running..."


@app.route("/detect")
def detect():
    """Call detect_ok for every successful record, using its non-register URL."""
    # NOTE(review): detect_ok is not defined in this file and the
    # libs.detect import above is commented out - confirm where it comes from.
    all_rdv = get_all_successed(rdv_page)
    for p in all_rdv:
        url = p["url"].replace("/register/", "/")
        print(url)
        detect_ok(url, p)
    return "dict(rdv_object)"


@app.route("/test")
def test():
    """Send an SMS to a fixed test number."""
    send_sms(rdv_page, "33758145465")
    return "dict(rdv_object)"


@app.route("/sms_input/<telephone_num>/<sms_text>/<date>/<sms_port>/", methods=['GET'])
def sms_input(telephone_num, sms_text, date, sms_port):
    """Store an incoming SMS code, then trigger send_sms for that number."""
    print(telephone_num, sms_text, date, sms_port)
    save_sms_code(rdv_page, telephone_num, sms_text, sms_port, date)
    send_sms(rdv_page, telephone_num)
    resp = jsonify(success=True)
    return resp


@app.route("/upload", methods=['GET', 'POST'])
def upload_file():
    """Save an uploaded file, parse it and insert the parsed people."""
    if request.method == 'POST':
        f = request.files['file']
        # The client controls f.filename; keep only its final path component
        # so the upload cannot be written outside the uploads directory.
        f.save(os.path.join("./assects/uploads", os.path.basename(f.filename)))
        # NOTE(review): f was already read by save(); presumably parse_excel
        # rewinds the stream before reading - verify.
        people_list, data_file = parse_excel(f)
        insert_user(db_page=person_page, person_list=people_list)
        return data_file.to_html()
    return '''
    <!doctype html>
    <title>Upload an excel file</title>
    <h1>Excel file upload (csv, tsv, csvz, tsvz only)</h1>
    <form action="" method=post enctype=multipart/form-data>
    <p><input type=file name=file><input type=submit value=Upload>
    </form>
    '''


@app.route("/csv")
def csv():
    """Export today's records with status and code_sent set to person.csv."""
    start = datetime.combine(date.today(), datetime.min.time())
    end = datetime.combine(date.today(), datetime.max.time())
    # Make a query to the specific DB and Collection
    cursor = rdv_page.find({"date": {"$lt": end, "$gte": start}, "status": True, "code_sent": True})
    # Expand the cursor and construct the DataFrame
    df = pd.DataFrame(list(cursor))
    # Delete the _id
    if '_id' in df:
        del df['_id']
    df.to_csv('person.csv', index=False)
...
state.py
Source:state.py  
1from game.states.manager import GameStateManager2from game.ui.component import Component3class RegisterGameState(type):4    """5    Meta class to register child classes of GameState with the6    GameStateManager.7    """8    def __init__(cls, name, bases, clsdict):9        super().__init__(name, bases, clsdict)10        if cls.ID != 'game_state':11            GameStateManager.register(cls)12class GameState(metaclass=RegisterGameState):13    """14    A base class for defining different game states. The flow of the game is15    managed by the `GameStateManager` which transitions the game between16    various `GameState`s.17    """18    # The ID of the state19    ID = 'game_state'20    # Possible statuses for a game state21    READY = 022    ACTIVE = 123    PAUSED = 224    # UI enabled and visible on pause25    PAUSED_UI_ENABLED = False26    PAUSED_UI_VISIBLE = False27    def __init__(self, game_state_manager):28        self._game_state_manager = game_state_manager29        self._status = self.READY30        self._ui_root = None31        self._slept = None32    @property33    def game(self):34        return self.game_state_manager.game35    @property36    def game_state_manager(self):37        return self._game_state_manager38    @property39    def paused(self):40        return self._status == self.PAUSED41    @property42    def status(self):43        return self._status44    @property45    def ui_root(self):46        return self._ui_root47    def enter(self, **kw):48        """49        Called when the game enter the state which then becomes the active50        state.51        """52        assert self.status == self.READY, \53            'State can only be entered if status is READY.'54        # Set the state to active55        self._status = self.ACTIVE56        # Add root UI component57        self._ui_root = Component()58        self._ui_root.add_tag('state_root')59        self._ui_root.bottom = 060        self._ui_root.right = 061        
self.game.ui_root.add_child(self._ui_root)62        # Reset slept63        self._slept = None64    def leave(self):65        """66        Called when the game leaves this state.67        """68        assert self.status == self.ACTIVE or self.status == self.PAUSED, \69            'State can only be left if status is ACTIVE or PAUSED.'70        # Remove root UI component71        self.game.ui_root.remove_child(self._ui_root)72        del self._ui_root73        # Set state to ready74        self._status = self.READY75    def pause(self, new_state_id):76        """77        Called when the game pauses this state before transitioning to78        another.79        """80        assert self.status == self.ACTIVE, \81            'State can only be paused if status is ACTIVE.'82        self.ui_root.enabled = self.__class__.PAUSED_UI_ENABLED83        self.ui_root.visible = self.__class__.PAUSED_UI_VISIBLE84        self._status = self.PAUSED85    def resume(self, **kw):86        """Called when the game transitiong back to the paused state"""87        assert self.status == self.PAUSED, \88            'State can only be resumed if status is PAUSED.'89        self.ui_root.enabled = True90        self.ui_root.visible = True91        self._status = self.ACTIVE92    def input(self, char):93        """Handle the given user input for the state"""94        if chr(char) == 'q':95            # @@ This should switch us to a confirmation screen96            self.game.quit = True97        if self.ui_root:98            self.ui_root.input(char)99    def update(self, dt):100        """101        Update the state based on the time ellapsed between now and the102        previous call to update.103        """104        if self._slept is not None:105            self._slept -= dt106            if self._slept <= 0:107                self._slept = None108        if self.ui_root:109            self.ui_root.update(dt)110    def render(self):111        """Render the state's visual representation to the 
screen"""112        if self.ui_root:113            self.ui_root.render(self.game.main_window)114    # Helpers115    def slept(self, seconds):116        """117        Slept provides a simple mechanism to execute a block of code at118        regular intervals (the given number of seconds) while not blocking119        the state. The first call to slept will always return True, subsequent120        calls will only return True once the the given period of time has121        past.122        NOTE: Slept isn't designed to be accurate, it relies on the delta (dt)123        set in update and therefore is susceptible to other blocking code.124        """125        if self._slept is None:126            self._slept = seconds127            return True...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
