Best Python code snippet using stestr_python
utils.py
Source:utils.py  
import ast
import datetime
import hashlib
import json
import random
import re
import string

from django.conf import settings
from django.http import HttpResponse
from django.template import Template
from django.template.base import VariableNode
from django.utils import timezone
from django.utils.http import urlencode
from oauth2_provider.oauth2_backends import OAuthLibCore, get_oauthlib_core
from rest_framework.pagination import PageNumberPagination, LimitOffsetPagination
from rest_framework.renderers import JSONRenderer

from crowdsourcing.crypto import to_pk
from crowdsourcing.redis import RedisProvider


class SmallResultsSetPagination(LimitOffsetPagination):
    """Limit/offset pagination with a default limit of 100."""
    default_limit = 100


def is_discount_eligible(user):
    """
    Return True when the last four characters of ``user.email`` are listed in
    ``settings.NON_PROFIT_EMAILS``, False otherwise.
    """
    return user.email[-4:] in settings.NON_PROFIT_EMAILS


def get_pk(id_or_hash):
    """
    Resolve a primary key from either a plain integer id or a hashed id.

    :param id_or_hash: value convertible with ``int()``, or a hash understood by ``to_pk``
    :return: ``(pk, is_hash)`` -- ``is_hash`` is False when ``int()`` succeeded and
             True when the value had to be decoded with ``to_pk``
    """
    try:
        project_id = int(id_or_hash)
    except Exception:
        return to_pk(id_or_hash), True
    return project_id, False


def get_delimiter(filename, *args, **kwargs):
    """
    Return the column delimiter implied by the file extension.

    :param filename: file name; only the text after the last '.' is inspected
    :return: ',' for 'csv', a tab for 'tsv', None for anything else
    """
    delimiter_map = {'csv': ',', 'tsv': '\t'}
    extension = filename.split('.')[-1]
    return delimiter_map.get(extension)


def get_model_or_none(model, *args, **kwargs):
    """
        Get model object or return None, this will catch the DoesNotExist error.

        Keyword Arguments:
        model -- this is the model you want to query from
        other parameters are of variable length: e.g id=1 or username='jon.snow'
    """
    try:
        return model.objects.get(*args, **kwargs)
    except model.DoesNotExist:
        return None


def get_next_unique_id(model, field, value):
    """
    Find next available incrementing value for a field in model.

    :param model: Model to be queried
    :param field: Model field to find value for
    :param value: Field value for which the next increment which is unique and available is to be found
    :return: the next unique increment value in model for the field considering index value from 1
    """
    # NOTE(review): `value` is interpolated into the regex unescaped, so regex
    # metacharacters in it are interpreted by the database -- confirm callers.
    condition = {'%s__iregex' % field: r'^%s[0-9]+$' % value}
    values = model.objects.filter(**condition).values_list(field, flat=True)

    # The filter is case-insensitive, so drop the prefix by length instead of
    # with a case-sensitive str.replace() that would leave e.g. "Foo1" intact.
    prefix_length = len(value)
    used = {int(x[prefix_length:]) for x in values}

    # complete sequence plus 1 extra if no gap exists
    candidates = range(1, len(used) + 2)
    # min() picks the lowest free number explicitly; indexing a list built
    # from a set relied on unspecified iteration order.
    gap = min(set(candidates) - used)
    return '%s%d' % (value, gap)


def get_time_delta(time_stamp):
    """
    Describe how long ago ``time_stamp`` was, relative to ``timezone.now()``.

    Only the coarsest unit is shown: days, else hours, else minutes (with a
    floor of "1 minute").

    :param time_stamp: datetime to compare against now, or None
    :return: "" for None, otherwise e.g. "3 day(s) ago", "2 hour(s) ago",
             "5 minutes ago" or "1 minute ago"
    """
    if time_stamp is None:
        return ""

    difference = timezone.now() - time_stamp
    days = difference.days
    hours = difference.seconds // 3600
    minutes = (difference.seconds // 60) % 60

    if days != 0 or hours != 0:
        # A coarser unit is already shown (previously a zero minute component
        # still appended "1 minute " here, giving "2 hour(s) 1 minute ago").
        minutes_calculated = ""
    elif minutes > 1:
        minutes_calculated = str(minutes) + " minutes "
    else:
        # Covers both "less than a minute" and exactly one minute.
        minutes_calculated = "1 minute "

    days_calculated = str(days) + " day(s) " if days > 0 else ""
    hours_calculated = str(hours) + " hour(s) " if hours > 0 and days == 0 else ""
    return days_calculated + hours_calculated + minutes_calculated + "ago"


class Oauth2Backend(OAuthLibCore):
    """OAuthLibCore variant that builds the OAuthLib request from ``request.data``."""

    def _extract_params(self, request):
        """
        Extract parameters from the Django request object. Such parameters will then be passed to
        OAuthLib to build its own Request object. The body should be encoded using OAuthLib urlencoded
        """
        uri = self._get_escaped_full_path(request)
        http_method = request.method
        # Headers are deliberately not forwarded; the extract_headers call is disabled.
        headers = {}  # self.extract_headers(request)
        body = urlencode(self.extract_body(request))  # TODO
        return uri, http_method, body, headers

    def create_token_response(self, request):
        """
        A wrapper method that calls create_token_response on `server_class` instance.

        :param request: The current django.http.HttpRequest object
        :return: (uri, headers, body, status) where uri is the "Location" header or None
        """
        uri, http_method, body, headers = self._extract_params(request)
        headers, body, status = get_oauthlib_core().server.create_token_response(
            uri, http_method, body, headers)
        uri = headers.get("Location", None)
        return uri, headers, body, status

    def extract_body(self, request):
        """
        Extracts the POST body from the Django request object

        :param request: The current django.http.HttpRequest object
        :return: provided POST parameters
        """
        return request.data.items()


class Oauth2Utils:
    """Helpers for creating OAuth2 client applications and issuing tokens."""

    def create_client(self, request, user):
        """
        Create a public, password-grant OAuth2 application owned by ``user``.

        :param request: unused
        :param user: owner of the new application
        :return: the created Application instance
        """
        # Imported lazily, as in the original listing.
        from oauth2_provider.models import Application
        oauth2_client = Application.objects.create(
            user=user,
            client_type=Application.CLIENT_PUBLIC,
            authorization_grant_type=Application.GRANT_PASSWORD)
        return oauth2_client

    def get_token(self, request):
        """
        Request a token through Oauth2Backend and return it as a dict.

        :param request: request carrying the token parameters in ``request.data``
        :return: (response_data, status); response_data holds "message": "OK"
                 updated with the decoded token response body
        """
        oauth2_backend = Oauth2Backend()
        uri, headers, body, status = oauth2_backend.create_token_response(request)

        # Decode the body as JSON first so that true/false/null are accepted;
        # fall back to the previous literal_eval parsing for any body that is
        # a Python literal but not valid JSON.
        try:
            payload = json.loads(body)
        except ValueError:
            payload = ast.literal_eval(body)

        response_data = {"message": "OK"}
        response_data.update(payload)
        return response_data, status

    def get_refresh_token(self, request):
        """Not implemented; returns None."""
        pass


class SmallResultSetPagination(PageNumberPagination):
    """Page-number pagination: 25 per page, client-adjustable via 'page_size' up to 100."""
    page_size = 25
    page_size_query_param = 'page_size'
    max_page_size = 100


class JSONResponse(HttpResponse):
    """
    An HttpResponse that renders its content into JSON.
    """

    def __init__(self, data, **kwargs):
        content = JSONRenderer().render(data)
        # Always forced, overriding any content_type the caller passed.
        kwargs['content_type'] = 'application/json'
        super(JSONResponse, self).__init__(content, **kwargs)


def generate_random_id(length=8, chars=string.ascii_lowercase + string.digits):
    """
    Return a random string of ``length`` characters drawn from ``chars``.

    Uses the ``random`` module, so the result is not suitable as a secret.
    """
    return ''.join(random.choice(chars) for _ in range(length))


def get_relative_time(date_time):
    """
    Format ``date_time`` with a precision that depends on its age.

    :param date_time: datetime compared against ``timezone.now()``
    :return: abbreviated month + day (e.g. "Mar 4") when older than 7 days,
             abbreviated weekday when older than 1 day, otherwise the 12-hour
             time with leading zeros stripped (e.g. "9:05 AM")
    """
    difference = timezone.now() - date_time
    if difference > datetime.timedelta(days=7):
        return date_time.strftime("%b") + ' ' + str(date_time.day)
    if difference > datetime.timedelta(days=1):
        return date_time.strftime("%a")
    return date_time.strftime('%I:%M %p').lstrip('0')


def get_worker_cache(worker_id):
    """
    Read the cached statistics of a worker from Redis.

    The stats come from the hash at ``build_key('worker', worker_id)`` and the
    groups from the set at that key suffixed with ':worker_groups'.

    :param worker_id: id used to build the Redis key
    :return: dict with country, approval_rate (None when nothing has been
             approved or rejected yet), total_tasks, approved_tasks,
             worker_groups, gender, birthday_year, ethnicity, is_worker and
             is_requester
    """
    provider = RedisProvider()
    name = provider.build_key('worker', worker_id)
    worker_stats = provider.hgetall(name)
    worker_groups = provider.smembers(name + ':worker_groups')

    approved = int(worker_stats.get('approved', 0))
    rejected = int(worker_stats.get('rejected', 0))
    submitted = int(worker_stats.get('submitted', 0))

    approval_rate = None
    if approved + rejected > 0:
        approval_rate = float(approved) / float(approved + rejected)

    return {
        "country": worker_stats.get('country', None),
        "approval_rate": approval_rate,
        "total_tasks": approved + rejected + submitted,
        "approved_tasks": approved,
        "worker_groups": list(worker_groups),
        "gender": worker_stats.get('gender'),
        "birthday_year": worker_stats.get('birthday_year'),
        "ethnicity": worker_stats.get('ethnicity'),
        "is_worker": worker_stats.get('is_worker', 0),
        "is_requester": worker_stats.get('is_requester', 0)
    }


def create_copy(instance):
    """
    Clear ``instance.pk`` and save, then return the same (mutated) object.
    """
    instance.pk = None
    instance.save()
    return instance


def get_review_redis_message(match_group_id, project_key):
    """Build the "REVIEW" message payload, with "is_done" set to True."""
    return {
        "type": "REVIEW",
        "payload": {
            "match_group_id": match_group_id,
            'project_key': project_key,
            "is_done": True
        }
    }


def replace_braces(s):
    """
    Remove whitespace that sits inside a ``{{ ... }}`` expression.

    Each whitespace character that is followed, without any intervening brace,
    by a closing ``}}`` is dropped. Returns a ``unicode`` string (Python 2).
    """
    return re.sub(r'\s(?=[^\{\}]*}})', '', unicode(s))


def get_template_string(initial_data, data):
    """
    Render a template string by substituting its variables from ``data``.

    :param initial_data: template source; normalised with replace_braces first
    :param data: mapping of variable name to value; missing names render as ''
    :return: ``(rendered_text, has_variables)``
    """
    initial_data = replace_braces(initial_data)
    html_template = Template(initial_data)
    return_value = ''
    has_variables = False
    for node in html_template.nodelist:
        if isinstance(node, VariableNode):
            return_value += unicode(data.get(node.token.contents, ''))
            has_variables = True
        else:
            return_value += unicode(node.token.contents)
    return return_value, has_variables


def get_template_tokens(initial_data):
    """Return the variable names used in the template string, in order."""
    initial_data = replace_braces(initial_data)
    html_template = Template(initial_data)
    return [node.token.contents for node in html_template.nodelist if isinstance(node, VariableNode)]


def flatten_dict(d, separator='_', prefix=''):
    """
    Flatten nested dicts into a single-level dict.

    Nested keys are joined with ``separator``; a non-dict ``d`` is returned as
    ``{prefix: d}``.
    """
    return {prefix + separator + k if prefix else k: v
            for kk, vv in d.items()
            for k, v in flatten_dict(vv, separator, kk).items()
            } if isinstance(d, dict) else {prefix: d}


def hash_task(data):
    """
    Return the SHA-256 hex digest of the sorted, flattened keys of ``data``.

    ``frozenset()`` over the flattened dict iterates its keys, so only the key
    names contribute to the digest.
    """
    # NOTE(review): values are not part of the hash -- confirm this is intended.
    return hashlib.sha256(repr(sorted(frozenset(flatten_dict(data))))).hexdigest()


def hash_as_set(data):
    """Return the SHA-256 hex digest of the sorted, de-duplicated items of ``data``."""
    return hashlib.sha256(repr(sorted(frozenset(data)))).hexdigest()


def get_trailing_number(s):
    m = re.search(r'\d+$', s)
    # ... the remainder of this function is cut off in the source listing.

# ...basic_share_limiter.py
Source:basic_share_limiter.py  
import lib.settings as settings
import lib.logger
log = lib.logger.get_logger('BasicShareLimiter')

import DBInterface
dbi = DBInterface.DBInterface()
dbi.clear_worker_diff()

from twisted.internet import defer
from mining.interfaces import Interfaces
import time


# This is just a customized ring buffer
class SpeedBuffer:
    """
    Growing phase of the ring buffer.

    Values are appended until ``size_max`` entries are stored; at that point
    the instance switches its class to SpeedBufferFull, which overwrites the
    oldest entries in place.
    """

    def __init__(self, size_max):
        self.max = size_max
        self.data = []
        self.cur = 0

    def append(self, x):
        """Store ``x``; switch to SpeedBufferFull once ``max`` entries are held."""
        self.data.append(x)
        self.cur += 1
        if len(self.data) == self.max:
            # From now on `cur` is the next slot to overwrite.
            self.cur = 0
            self.__class__ = SpeedBufferFull

    def avg(self):
        """Average of the stored values; raises ZeroDivisionError when empty."""
        return sum(self.data) / self.cur

    def pos(self):
        return self.cur

    def clear(self):
        self.data = []
        self.cur = 0

    def size(self):
        """Number of values stored so far."""
        return self.cur


class SpeedBufferFull:
    """
    Full phase of the ring buffer; instances only come into existence through
    SpeedBuffer.append() reassigning ``__class__``.
    """

    def __init__(self, n):
        # Raising a plain string is not a valid exception; raise a real
        # exception carrying the same message.
        raise TypeError("you should use SpeedBuffer")

    def append(self, x):
        """Overwrite the oldest slot with ``x`` and advance the cursor."""
        self.data[self.cur] = x
        self.cur = (self.cur + 1) % self.max

    def avg(self):
        return sum(self.data) / self.max

    def pos(self):
        return self.cur

    def clear(self):
        """Empty the buffer and switch back to the growing SpeedBuffer phase."""
        self.data = []
        self.cur = 0
        self.__class__ = SpeedBuffer

    def size(self):
        return self.max


class BasicShareLimiter(object):
    """
    Per-worker variable difficulty.

    Tracks the time between submitted shares for each worker and, at retarget
    time, raises or lowers the worker's difficulty when the average interval
    falls outside ``target`` +/- ``variance``.
    """

    def __init__(self):
        self.worker_stats = {}
        self.target = settings.VDIFF_TARGET_TIME
        self.retarget = settings.VDIFF_RETARGET_TIME
        self.variance = self.target * (float(settings.VDIFF_VARIANCE_PERCENT) / float(100))
        self.tmin = self.target - self.variance
        self.tmax = self.target + self.variance
        self.buffersize = self.retarget / self.target * 4
        self.litecoin = {}
        self.litecoin_diff = 100000000  # TODO: Set this to VARDIFF_MAX
        # TODO: trim the hash of inactive workers

    @defer.inlineCallbacks
    def update_litecoin_difficulty(self):
        """
        Refresh the cached coin-daemon difficulty when it is missing or older
        than ``settings.DIFF_UPDATE_FREQUENCY`` seconds, then copy the cached
        value to ``self.litecoin_diff``.
        """
        # Cache the litecoin difficulty so we do not have to query it on every submit
        # Update the difficulty  if it is out of date or not set
        if 'timestamp' not in self.litecoin or self.litecoin['timestamp'] < int(time.time()) - settings.DIFF_UPDATE_FREQUENCY:
            self.litecoin['timestamp'] = time.time()
            self.litecoin['difficulty'] = (yield Interfaces.template_registry.bitcoin_rpc.getdifficulty())
            log.debug("Updated litecoin difficulty to %s" % (self.litecoin['difficulty']))
        # The timestamp is written before the RPC result arrives, so a second
        # call made meanwhile (or after a failed RPC) finds no cached value yet;
        # keep the previous litecoin_diff instead of raising KeyError.
        if 'difficulty' in self.litecoin:
            self.litecoin_diff = self.litecoin['difficulty']

    def submit(self, connection_ref, job_id, current_difficulty, timestamp, worker_name):
        """
        Record a submitted share and retarget the worker's difficulty if due.

        :param connection_ref: callable returning the worker's connection
        :param job_id: unused on entry; rebound from the last broadcast when retargeting
        :param current_difficulty: difficulty the worker is currently mining at
        :param timestamp: share time; converted with ``int()``
        :param worker_name: key into ``self.worker_stats``
        :return: None
        """
        ts = int(timestamp)

        # Init the stats for this worker if it isn't set.
        if worker_name not in self.worker_stats or self.worker_stats[worker_name]['last_ts'] < ts - settings.DB_USERCACHE_TIME:
            self.worker_stats[worker_name] = {'last_rtc': (ts - self.retarget / 2), 'last_ts': ts,
                                              'buffer': SpeedBuffer(self.buffersize)}
            dbi.update_worker_diff(worker_name, settings.POOL_TARGET)
            return

        stats = self.worker_stats[worker_name]

        # Standard share update of data
        stats['buffer'].append(ts - stats['last_ts'])
        stats['last_ts'] = ts

        # Do We retarget? If not, we're done.
        if ts - stats['last_rtc'] < self.retarget and stats['buffer'].size() > 0:
            return

        # Set up and log our check
        stats['last_rtc'] = ts
        avg = stats['buffer'].avg()
        log.debug("Checking Retarget for %s (%i) avg. %i target %i+-%i" % (worker_name, current_difficulty, avg,
                  self.target, self.variance))

        if avg < 1:
            log.warning("Reseting avg = 1 since it's SOOO low")
            avg = 1

        # Figure out our Delta-Diff
        if settings.VDIFF_FLOAT:
            ddiff = float((float(current_difficulty) * (float(self.target) / float(avg))) - current_difficulty)
        else:
            ddiff = int((float(current_difficulty) * (float(self.target) / float(avg))) - current_difficulty)

        if avg > self.tmax and current_difficulty > settings.VDIFF_MIN_TARGET:
            # Shares are arriving too slowly: lower the difficulty.
            if settings.VDIFF_X2_TYPE:
                # In x2 mode ddiff is a multiplier: halve the difficulty.
                ddiff = 0.5
                # Don't drop below VDIFF_MIN_TARGET. Float division: integer
                # division would truncate the multiplier to 0 under Python 2.
                if (ddiff * current_difficulty) < settings.VDIFF_MIN_TARGET:
                    ddiff = float(settings.VDIFF_MIN_TARGET) / current_difficulty
            else:
                # For fractional -0.1 ddiff's just drop by 1
                if ddiff > -1:
                    ddiff = -1
                # Don't drop below POOL_TARGET
                # NOTE(review): the check uses POOL_TARGET but the clamp uses
                # VDIFF_MIN_TARGET -- confirm which one is intended.
                if (ddiff + current_difficulty) < settings.POOL_TARGET:
                    ddiff = settings.VDIFF_MIN_TARGET - current_difficulty
        elif avg < self.tmin:
            # Shares are arriving too quickly: raise the difficulty.
            # Don't go above LITECOIN or VDIFF_MAX_TARGET
            if settings.USE_COINDAEMON_DIFF:
                # The returned Deferred is not waited on, so the comparison
                # below uses whatever litecoin_diff holds when this call returns.
                self.update_litecoin_difficulty()
                diff_max = min([settings.VDIFF_MAX_TARGET, self.litecoin_diff])
            else:
                diff_max = settings.VDIFF_MAX_TARGET

            if settings.VDIFF_X2_TYPE:
                # In x2 mode ddiff is a multiplier: double the difficulty.
                ddiff = 2
                if (ddiff * current_difficulty) > diff_max:
                    # Float division so the clamp lands on diff_max instead of
                    # a truncated integer multiplier.
                    ddiff = float(diff_max) / current_difficulty
            else:
                # For fractional 0.1 ddiff's just up by 1
                if ddiff < 1:
                    ddiff = 1
                if (ddiff + current_difficulty) > diff_max:
                    ddiff = diff_max - current_difficulty
        else:  # If we are here, then we should not be retargeting.
            return

        # At this point we are retargeting this worker
        if settings.VDIFF_X2_TYPE:
            new_diff = current_difficulty * ddiff
        else:
            new_diff = current_difficulty + ddiff
        log.debug("Retarget for %s %i old: %i new: %i" % (worker_name, ddiff, current_difficulty, new_diff))

        stats['buffer'].clear()
        session = connection_ref().get_session()

        (job_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, _) = \
            Interfaces.template_registry.get_last_broadcast_args()
        work_id = Interfaces.worker_manager.register_work(worker_name, job_id, new_diff)

        session['difficulty'] = new_diff
        connection_ref().rpc('mining.set_difficulty', [new_diff, ], is_notification=True)
        log.debug("Notified of New Difficulty")
        connection_ref().rpc('mining.notify', [work_id, prevhash, coinb1, coinb2, merkle_branch, version, nbits, ntime, False, ], is_notification=True)
        log.debug("Sent new work")

# ...
# Page text that followed the listing:
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
