Best Python code snippet using ATX
anchore_feeds.py
Source:anchore_feeds.py  
...93            if record and 'err_msg' in record:94                ret = record.get('err_msg')95            done = True96    return (success, ret)97def create_feed(feed):98    if not feed:99        return (False)100    return (contexts['anchore_db'].create_feed(feed))101def create_feedgroup(feed, group):102    if not feed or not group:103        return (False)104    return (contexts['anchore_db'].create_feedgroup(feed, group))105def sync_feedmeta(default_sublist=['vulnerabilities']):106    ret = {'success': False, 'text': "", 'status_code': 1}107    try:108        feedmeta = load_anchore_feedmeta()109        feeds, record = get_feed_list()110        if feeds:111            for feedrecord in feeds:112                feed = feedrecord['name']113                if feed not in feedmeta:114                    feedmeta[feed] = {}115                feedmeta[feed].update(feedrecord)116                if 'groups' not in feedmeta[feed]:117                    feedmeta[feed]['groups'] = {}118                if 'subscribed' not in feedmeta[feed]:119                    if feed in default_sublist:120                        feedmeta[feed]['subscribed'] = True121                    else:122                        feedmeta[feed]['subscribed'] = False123                rc = create_feed(feed)124                groups, record = get_group_list(feed)125                if groups:126                    for grouprecord in groups:127                        group = grouprecord['name']128                        if group not in feedmeta[feed]['groups']:129                            feedmeta[feed]['groups'][group] = {}130                        feedmeta[feed]['groups'][group].update(grouprecord)131                        rc = create_feedgroup(feed, group)132                else:133                    ret['text'] = "WARN: cannot get list of groups from feed: " + str(feed)134            ret['success'] = True135            ret['status_code'] = 0136        else:137            ret['text'] = 
"cannot get list of feeds from service\nMessage from server: " + record['text']138            return (False, ret)139        save_anchore_feedmeta(feedmeta)140    except Exception as err:141        import traceback142        traceback.print_exc()143        _logger.debug("exception: " + str(err))144        ret['text'] = "exception: " + str(err)145        return (False, ret)146    return (True, ret)147def feed_group_data_exists(feed, group, datafile):148    feedmeta = load_anchore_feedmeta()149    if feed in list(feedmeta.keys()) and group in list(feedmeta[feed]['groups'].keys()) and 'datafiles' in feedmeta[feed]['groups'][150        group] and datafile in feedmeta[feed]['groups'][group]['datafiles']:151        return (True)152    return (False)153def sync_feeds(force_since=None):154    ret = {'success': False, 'text': "", 'status_code': 1}155    feedmeta = load_anchore_feedmeta()156    feedurl = contexts['anchore_config']['feeds_url']157    try:158        for feed in list(feedmeta.keys()):159            if feedmeta[feed]['subscribed']:160                _logger.info("syncing data for subscribed feed (" + str(feed) + ") ...")161                groups = list(feedmeta[feed]['groups'].keys())162                if groups:163                    for group in groups:164                        sincets = 0165                        group_meta = {}166                        if group in feedmeta[feed]['groups']:167                            group_meta = feedmeta[feed]['groups'][group]168                            if 'last_update' in group_meta:169                                sincets = group_meta['last_update']170                        if force_since:171                            sincets = float(force_since)172                        if 'datafiles' not in group_meta:173                            group_meta['datafiles'] = list()174                        updatetime = int(time.time())175                        since = time.strftime("%Y-%m-%d", time.gmtime(sincets))176       
                 now = time.strftime("%Y-%m-%d", time.gmtime(updatetime))177                        datafilename = "data_" + since + "_to_" + now + ".json"178                        if feed_group_data_exists(feed, group, datafilename):179                            _logger.info("\tskipping group data: " + str(group) + ": already synced")180                        else:181                            _logger.info("\tsyncing group data: " + str(group) + ": ...")182                            success, data = get_group_data(feed, group, since=since)183                            if success:184                                rc = save_anchore_feed_group_data(feed, group, datafilename, data)185                                group_meta['prev_update'] = sincets186                                group_meta['last_update'] = updatetime187                                if datafilename not in group_meta['datafiles']:188                                    group_meta['datafiles'].append(datafilename)189                                # finally, do any post download feed/group handler actions190                                rc, msg = handle_anchore_feed_post(feed, group)191                            else:192                                if data and isinstance(data, str):193                                    err_msg = data194                                else:195                                    err_msg = 'check --debug output and/or try again'196                                _logger.warn("\t\tWARN: failed to download feed/group data (" + str(feed) + "/" + str(197                                    group) + "): {}".format(err_msg))198                            rc, msg = handle_anchore_feed_post(feed, group)199            else:200                _logger.info("skipping data sync for unsubscribed feed (" + str(feed) + ") ...")201        ret['status_code'] = 0202        ret['success'] = True203    except Exception as err:204        import traceback205        
traceback.print_exc()206        _logger.debug("exception: " + str(err))207        ret['text'] = "ERROR: " + str(err)208        return (False, ret)209    if not save_anchore_feedmeta(feedmeta):210        ret['text'] = "\t\tWARN: failed to store metadata on synced feed data"211    return (True, ret)212def check():213    if not load_anchore_feedmeta():214        return (False, "feeds are not initialized: please run 'anchore feeds sync' and try again")215    if not load_anchore_feeds_list():216        return (False, "feeds list is empty: please run 'anchore feeds sync' and try again")217    return (True, "success")218def load_anchore_feeds_list():219    ret = []220    feedmeta = load_anchore_feedmeta()221    if feedmeta:222        ret = list(feedmeta.values())223    return (ret)224def load_anchore_feed_groups_list(feed):225    ret = []226    feedmeta = load_anchore_feedmeta()227    if feedmeta:228        if feed in list(feedmeta.keys()):229            ret = list(feedmeta[feed]['groups'].values())230    return (ret)231def load_anchore_feed_group_datameta(feed, group):232    ret = {}233    feedmeta = load_anchore_feedmeta()234    if feedmeta:235        if feed in list(feedmeta.keys()) and group in list(feedmeta[feed]['groups'].keys()):236            ret = feedmeta[feed]['groups'][group]237    return (ret)238def load_anchore_feedmeta():239    return (contexts['anchore_db'].load_feedmeta())240def save_anchore_feedmeta(feedmeta):241    return (contexts['anchore_db'].save_feedmeta(feedmeta))242def subscribe_anchore_feed(feed, user_tier=0):243    success = True244    msg = str(feed) + ": subscribed."245    feedmeta = load_anchore_feedmeta()246    if feed in feedmeta:247        if user_tier >= int(feedmeta[feed]['access_tier']):248            if not feedmeta[feed]['subscribed']:249                feedmeta[feed]['subscribed'] = True250                if not save_anchore_feedmeta(feedmeta):251                    msg = str(feed) + ": failed to subscribe to feed (check debug 
output)."252                    success = False253        else:254            msg = 'Current user does not have sufficient access tier to subscribe to feed {0}. Current tier is {1}, must be {2} to access feed'.format(feed, user_tier, feedmeta[feed]['access_tier'])255            success = False256    else:257        msg = "cannot find specified feed (" + str(feed) + "): please review the feeds list and try again"258        success = False259    return (success, msg)260def unsubscribe_anchore_feed(feed):261    success = True262    msg = str(feed) + ": unsubscribed."263    feedmeta = load_anchore_feedmeta()264    if feed in feedmeta:265        if feedmeta[feed]['subscribed']:266            feedmeta[feed]['subscribed'] = False267            if not save_anchore_feedmeta(feedmeta):268                msg = str(feed) + ": failed to unsubscribe to feed (check debug output)."269                success = False270    else:271        msg = "cannot find specified feed (" + str(feed) + "): please review the feeds list and try again"272        success = False273    return (success, msg)274def save_anchore_feed_group_data(feed, group, datafile, data):275    return (contexts['anchore_db'].save_feed_group_data(feed, group, datafile, data))276def load_anchore_feed_group_data(feed, group, datafile):277    return (contexts['anchore_db'].load_feed_group_data(feed, group, datafile))278def load_anchore_feed(feed, group, ensure_unique=False):279    ret = {'success': False, 'msg': "", 'data': list()}280    datameta = {}281    doupdate = False282    feedmeta = load_anchore_feedmeta()283    if not feedmeta:284        ret['msg'] = "feed data does not exist: please sync feed data"285    elif feed in feedmeta and not feedmeta[feed]['subscribed']:286        ret['msg'] = "not currently subscribed to feed (" + str(feed) + "): please subscribe, sync, and try again"287    else:288        if feed in feedmeta and group in feedmeta[feed]['groups']:289            datameta = 
feedmeta[feed]['groups'][group]290        if datameta and 'datafiles' in datameta:291            unique_hash = {}292            for datafile in sorted(datameta['datafiles']):293                thelist = load_anchore_feed_group_data(feed, group, datafile)294                if ensure_unique:295                    for el in thelist:296                        if isinstance(el, dict) and len(list(el.keys())) == 1:297                            elkey = list(el.keys())[0]298                            if elkey in unique_hash:299                                _logger.warn("FOUND duplicate entry during scan for unique data values: " + str(elkey))300                            unique_hash[elkey] = el301                ret['data'] = ret['data'] + thelist302                ret['success'] = True303                ret['msg'] = "success"304            if ret['success'] and ensure_unique:305                ret['data'] = list(unique_hash.values())306        else:307            ret['msg'] = "no data exists for given feed/group (" + str(feed) + "/" + str(group) + ")"308    return (ret)309def delete_anchore_feed(feed):310    feedmeta = load_anchore_feedmeta()311    if feed in list(feedmeta.keys()):312        if not feedmeta[feed]['subscribed']:313            return (contexts['anchore_db'].delete_feed(feed))314        else:315            _logger.warn(316                "skipping delete of feed that is marked as subscribed - please unsubscribe first and then retry the delete")317    return (True)318# TODO wip319def handle_anchore_feed_pre(feed):320    ret = True321    msg = ""322    feedmeta = load_anchore_feedmeta()323    if feed in feedmeta:324        if feed == 'vulnerabilities':325            if not feedmeta[feed]['subscribed']:326                rc, msg = subscribe_anchore_feed(feed)327                ret = rc328    return (ret, msg)329def handle_anchore_feed_post(feed, group):330    ret = True331    msg = ""332    if feed == 'imagedata':333        # handler334        d = 
load_anchore_feed(feed, group)335        if d and d['success']:336            for record in d['data']:337                if 'redirecturl' in list(record.keys()):338                    for tag in list(record['redirecturl'].keys()):339                        url = record['redirecturl'][tag]340                        imageId = tag341                        if not contexts['anchore_db'].is_image_present(imageId):342                            try:343                                r = requests.get(url, timeout=10)344                                if r.status_code == 200:345                                    data = json.loads(r.text)346                                    for imagedata in data:347                                        imagerecord = imagedata['image']['imagedata']348                                        _logger.info("\t\tpopulating anchore DB with image data: " + imageId)...sub.py
Source:sub.py  
...218        outline = Tag(name='outline', attrs={'text': _sub.title or _sub.feed.title, 'xmlUrl': _sub.feed.link})219        opml.body.append(outline)220    if empty_flags:221        return None222    logger.info(f'Exported feed(s) for {user_id}')223    return opml.prettify().encode()224async def migrate_to_new_url(feed: db.Feed, new_url: str) -> Union[bool, db.Feed]:225    """226    Migrate feed's link to new url, useful when a feed is redirected to a new url.227    :param feed:228    :param new_url:229    :return:230    """231    if feed.link == new_url:232        return False233    logger.info(f'Migrating {feed.link} to {new_url}')234    new_url_feed = await db.Feed.get_or_none(link=new_url)235    if new_url_feed is None:  # new_url not occupied236        feed.link = new_url...test_models.py
Source:test_models.py  
...60        feed = Feed.objects.create(link=self.feed_url)61        self.assertTrue(self.mock_fetch_feedparser_dict.called)62        self.assertEqual(Feed.objects.count(), 1)6364    def test_updating_and_saving_existing_feed(self):65        feed = Feed.objects.create(link=self.feed_url)6667        with patch.object(Feed, 'fetch_and_set_feed_details') as mock_fetch:68            # Modify values for existing feed and 69            # test that feed initialization no longer takes place70            my_feed = Feed.objects.get(link=feed.link)71            my_feed.title = 'New title'72            my_feed.description = 'New description'73            my_feed.save()74            self.assertFalse(mock_fetch.called)7576        # Test if changes took effect77        this_feed = Feed.objects.get(link=feed.link)78        self.assertEqual(this_feed.title, 'New title')
...Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
