Best Python code snippet using playwright-python
testpath_volume_recurring_snap.py
Source:testpath_volume_recurring_snap.py  
...130        except Exception as e:131            raise Exception("Warning: Exception during cleanup : %s" % e)132        return133    @attr(tags=["advanced", "basic"],required_hardware="true")134    def test_01_volume_snapshot(self):135        """ Test Volume (root) Snapshot136        # 1. Create Hourly, Daily,Weekly recurring snapshot policy for ROOT disk and 137                    Verify the presence of the corresponding snapshots on the Secondary Storage138        # 2. Delete the snapshot policy and verify the entry as Destroyed in snapshot_schedule139        # 3. Verify that maxsnaps should not consider manual snapshots for deletion140        # 4. Snapshot policy should reflect the correct timezone141        # 5. Verify that listSnapshotPolicies() should return all snapshot policies142                that belong to the account (both manual and recurring snapshots)143        # 6. Verify that listSnapshotPolicies() should not return snapshot 144                policies that have been deleted145        # 7. Verify that snapshot should not be created for VM in Destroyed state146        # 8. Verify that snapshot should get created after resuming the VM147        # 9. 
Verify that All the recurring policies associated with the VM should be 148                deleted after VM get destroyed.149        """150        # Step 1151        self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'152        recurring_snapshot = SnapshotPolicy.create(153            self.apiclient,154            self.volume[0].id,155            self.testdata["recurring_snapshot"]156        )157        # ListSnapshotPolicy should return newly created policy158        list_snapshots_policy = list_snapshot_policy(159            self.apiclient,160            id=recurring_snapshot.id,161            volumeid=self.volume[0].id162        )163        list_validation = validateList(list_snapshots_policy)164        self.assertEqual(165            list_validation[0],166            PASS,167            "snapshot list validation failed due to %s" %168            list_validation[2])169        timeout = self.testdata["timeout"]170        while True:171            snapshots = list_snapshots(172                self.apiclient,173                volumeid=self.volume[0].id,174                intervaltype=self.testdata[175                    "recurring_snapshot"]["intervaltype"],176                snapshottype='RECURRING',177                listall=True178            )179            if isinstance(snapshots, list):180                break181            elif timeout == 0:182                raise Exception("List snapshots API call failed.")183        for snapshot in snapshots:184            self.assertEqual(185                self.dbclient.execute(186                    "select type_description from snapshots where name='%s'" %187                    snapshot.name)[0][0],188                "HOURLY"189            )190        time.sleep(180)191        for snapshot in snapshots:192            self.assertTrue(193                is_snapshot_on_nfs(194                    self.apiclient,195                    self.dbclient,196                    self.config,197                    
self.zone.id,198                    snapshot.id))199        recurring_snapshot.delete(self.apiclient)200        self.assertEqual(201            self.dbclient.execute(202                "select * from snapshot_policy where uuid='%s'" %203                recurring_snapshot.id),204            []205        )206        self.testdata["recurring_snapshot"]["intervaltype"] = 'DAILY'207        self.testdata["recurring_snapshot"]["schedule"] = '00:00'208        recurring_snapshot_daily = SnapshotPolicy.create(209            self.apiclient,210            self.volume[0].id,211            self.testdata["recurring_snapshot"]212        )213        list_snapshots_policy_daily = list_snapshot_policy(214            self.apiclient,215            id=recurring_snapshot_daily.id,216            volumeid=self.volume[0].id217        )218        list_validation = validateList(list_snapshots_policy_daily)219        self.assertEqual(220            list_validation[0],221            PASS,222            "snapshot list validation failed due to %s" %223            list_validation[2])224        snap_db_daily = self.dbclient.execute(225            "select * from snapshot_policy where uuid='%s'" %226            recurring_snapshot_daily.id)227        validation_result_1 = validateList(snap_db_daily)228        self.assertEqual(229            validation_result_1[0],230            PASS,231            "snapshot_policy list validation failed due to %s" %232            validation_result_1[2])233        self.assertNotEqual(234            len(snap_db_daily),235            0,236            "Check DB Query result set"237        )238        recurring_snapshot_daily.delete(self.apiclient)239        self.assertEqual(240            self.dbclient.execute(241                "select * from snapshot_policy where uuid='%s'" %242                recurring_snapshot_daily.id),243            []244        )245    246        self.testdata["recurring_snapshot"]["intervaltype"] = 'WEEKLY'247        
self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'248        recurring_snapshot_weekly = SnapshotPolicy.create(249            self.apiclient,250            self.volume[0].id,251            self.testdata["recurring_snapshot"]252        )253        list_snapshots_policy_weekly = list_snapshot_policy(254            self.apiclient,255            id=recurring_snapshot_weekly.id,256            volumeid=self.volume[0].id257        )258        list_validation = validateList(list_snapshots_policy_weekly)259        self.assertEqual(260            list_validation[0],261            PASS,262            "snapshot list validation failed due to %s" %263            list_validation[2])264        snap_sch_2 = self.dbclient.execute(265            "select * from snapshot_policy where uuid='%s'" %266            recurring_snapshot_weekly.id)267        validation_result_2 = validateList(snap_sch_2)268        self.assertEqual(269            validation_result_2[0],270            PASS,271            "snapshot_policy list validation failed due to %s" %272            validation_result_2[2])273        self.assertNotEqual(274            len(snap_sch_2),275            0,276            "Check DB Query result set"277        )278        recurring_snapshot_weekly.delete(self.apiclient)279        self.assertEqual(280            self.dbclient.execute(281                "select * from snapshot_policy where uuid='%s'" %282                recurring_snapshot_weekly.id),283            []284        )285        self.testdata["recurring_snapshot"]["intervaltype"] = 'MONTHLY'286        self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'287        recurring_snapshot_monthly = SnapshotPolicy.create(288            self.apiclient,289            self.volume[0].id,290            self.testdata["recurring_snapshot"]291        )292        list_snapshots_policy_monthly = list_snapshot_policy(293            self.apiclient,294            id=recurring_snapshot_monthly.id,295            
volumeid=self.volume[0].id296        )297        list_validation = validateList(list_snapshots_policy_monthly)298        self.assertEqual(299            list_validation[0],300            PASS,301            "snapshot list validation failed due to %s" %302            list_validation[2])303        snap_sch_3 = self.dbclient.execute(304            "select * from snapshot_policy where uuid='%s'" %305            recurring_snapshot_monthly.id)306        validation_result = validateList(snap_sch_3)307        self.assertEqual(308            validation_result[0],309            PASS,310            "snapshot_policy list validation failed due to %s" %311            validation_result[2])312        self.assertNotEqual(313            len(snap_sch_3),314            0,315            "Check DB Query result set"316        )317        recurring_snapshot_monthly.delete(self.apiclient)318        self.assertEqual(319            self.dbclient.execute(320                "select * from snapshot_policy where uuid='%s'" %321                recurring_snapshot_weekly.id),322            []323        )324        snapshots = list_snapshots(325            self.apiclient,326            volumeid=self.volume[0].id,327            intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],328            snapshottype='RECURRING',329            listall=True330        )331        # Step 3332        self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'333        self.testdata["recurring_snapshot"]["schedule"] = 1334        recurring_snapshot_1 = SnapshotPolicy.create(335            self.apiclient,336            self.volume[0].id,337            self.testdata["recurring_snapshot"]338        )339        # ListSnapshotPolicy should return newly created policy340        list_snapshots_policy = list_snapshot_policy(341            self.apiclient,342            id=recurring_snapshot_1.id,343            volumeid=self.volume[0].id344        )345        list_validation = 
validateList(list_snapshots_policy)346        self.assertEqual(347            list_validation[0],348            PASS,349            "snapshot list validation failed due to %s" %350            list_validation[2])351        timeout = self.testdata["timeout"]352        while True:353            snapshots = list_snapshots(354                self.apiclient,355                volumeid=self.volume[0].id,356                intervaltype=self.testdata[357                    "recurring_snapshot"]["intervaltype"],358                snapshottype='RECURRING',359                listall=True360            )361            if isinstance(snapshots, list):362                break363            elif timeout == 0:364                raise Exception("List snapshots API call failed.")365        snap_to_delete = snapshots[0]366        time.sleep(367            (self.testdata["recurring_snapshot"]["maxsnaps"]) * 3600368        )369        snapshots_1 = list_snapshots(370            self.apiclient,371            volumeid=self.volume[0].id,372            intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],373            snapshottype='RECURRING',374            listall=True375        )376        self.assertTrue(snap_to_delete not in snapshots_1)377        time.sleep(360)378        self.assertEqual(379            self.dbclient.execute(380                "select status  from snapshots where uuid='%s'" %381                snap_to_delete.id)[0][0],382            "Destroyed"383        )384        self.assertFalse(385            is_snapshot_on_nfs(386                self.apiclient,387                self.dbclient,388                self.config,389                self.zone.id,390                snap_to_delete.id))391        # Step 4392        recurring_snapshot = SnapshotPolicy.create(393            self.apiclient,394            self.volume[0].id,395            self.testdata["recurring_snapshot"]396        )397        # ListSnapshotPolicy should return newly created policy398        
list_snapshots_policy = list_snapshot_policy(399            self.apiclient,400            id=recurring_snapshot.id,401            volumeid=self.volume[0].id402        )403        list_validation = validateList(list_snapshots_policy)404        self.assertEqual(405            list_validation[0],406            PASS,407            "snapshot list validation failed due to %s" %408            list_validation[2])409        time.sleep(180)410        snap_time_hourly = self.dbclient.execute(411            "select scheduled_timestamp from \412            snapshot_schedule where uuid='%s'" %413            recurring_snapshot.id)414        self.debug("Timestamp for hourly snapshot %s" % snap_time_hourly)415        recurring_snapshot.delete(self.apiclient)416        self.testdata["recurring_snapshot"]["intervaltype"] = 'DAILY'417        self.testdata["recurring_snapshot"]["schedule"] = '00:00'418        recurring_snapshot_daily = SnapshotPolicy.create(419            self.apiclient,420            self.volume[0].id,421            self.testdata["recurring_snapshot"]422        )423        list_snapshots_policy_daily = list_snapshot_policy(424            self.apiclient,425            id=recurring_snapshot_daily.id,426            volumeid=self.volume[0].id427        )428        list_validation = validateList(list_snapshots_policy_daily)429        self.assertEqual(430            list_validation[0],431            PASS,432            "snapshot list validation failed due to %s" %433            list_validation[2])434        time.sleep(180)435        snap_time_daily = self.dbclient.execute(436            "select scheduled_timestamp from \437                    snapshot_schedule where uuid='%s'" %438            recurring_snapshot_daily.id)439        self.debug("Timestamp for daily snapshot %s" % snap_time_daily)440        recurring_snapshot_daily.delete(self.apiclient)441        self.testdata["recurring_snapshot"]["intervaltype"] = 'WEEKLY'442        
self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'443        recurring_snapshot_weekly = SnapshotPolicy.create(444            self.apiclient,445            self.volume[0].id,446            self.testdata["recurring_snapshot"]447        )448        list_snapshots_policy_weekly = list_snapshot_policy(449            self.apiclient,450            id=recurring_snapshot_weekly.id,451            volumeid=self.volume[0].id452        )453        list_validation = validateList(list_snapshots_policy_weekly)454        self.assertEqual(455            list_validation[0],456            PASS,457            "snapshot list validation failed due to %s" %458            list_validation[2])459        time.sleep(180)460        snap_time_weekly = self.dbclient.execute(461            "select scheduled_timestamp from \462                    snapshot_schedule where uuid='%s'" %463            recurring_snapshot_weekly.id)464        self.debug("Timestamp for monthly snapshot %s" % snap_time_weekly)465        recurring_snapshot_weekly.delete(self.apiclient)466        self.testdata["recurring_snapshot"]["intervaltype"] = 'MONTHLY'467        self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'468        recurring_snapshot_monthly = SnapshotPolicy.create(469            self.apiclient,470            self.volume[0].id,471            self.testdata["recurring_snapshot"]472        )473        list_snapshots_policy_monthly = list_snapshot_policy(474            self.apiclient,475            id=recurring_snapshot_monthly.id,476            volumeid=self.volume[0].id477        )478        list_validation = validateList(list_snapshots_policy_monthly)479        self.assertEqual(480            list_validation[0],481            PASS,482            "snapshot list validation failed due to %s" %483            list_validation[2])484        time.sleep(180)485        snap_time_monthly = self.dbclient.execute(486            "select scheduled_timestamp from \487                    snapshot_schedule where 
uuid='%s'" %488            recurring_snapshot_monthly.id)489        self.debug("Timestamp for monthly snapshot %s" % snap_time_monthly)490        recurring_snapshot_monthly.delete(self.apiclient)491        # Step 5492        self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'493        self.testdata["recurring_snapshot"]["schedule"] = 1494        recurring_snapshot_hourly = SnapshotPolicy.create(495            self.apiclient,496            self.volume[0].id,497            self.testdata["recurring_snapshot"]498        )499        self.testdata["recurring_snapshot"]["intervaltype"] = 'MONTHLY'500        self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'501        recurring_snapshot_monthly = SnapshotPolicy.create(502            self.apiclient,503            self.volume[0].id,504            self.testdata["recurring_snapshot"]505        )506        list_snapshots_policy = list_snapshot_policy(507            self.apiclient,508            volumeid=self.volume[0].id509        )510        list_validation = validateList(list_snapshots_policy)511        self.assertEqual(512            list_validation[0],513            PASS,514            "snapshot list validation failed due to %s" %515            list_validation[2])516        for rec in [recurring_snapshot_hourly, recurring_snapshot_monthly]:517            self.assertTrue(518                rec.id in any(519                    policy['id']) for policy in list_snapshots_policy)520        recurring_snapshot_hourly.delete(self.apiclient)521        recurring_snapshot_monthly.delete(self.apiclient)522        # Step 6523        self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'524        self.testdata["recurring_snapshot"]["schedule"] = 1525        recurring_snapshot_hourly = SnapshotPolicy.create(526            self.apiclient,527            self.volume[0].id,528            self.testdata["recurring_snapshot"]529        )530        self.testdata["recurring_snapshot"]["intervaltype"] = 'MONTHLY'531    
    self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'532        recurring_snapshot_monthly = SnapshotPolicy.create(533            self.apiclient,534            self.volume[0].id,535            self.testdata["recurring_snapshot"]536        )537        recurring_snapshot_monthly.delete(self.apiclient)538        list_snapshots_policy = list_snapshot_policy(539            self.apiclient,540            volumeid=self.volume[0].id541        )542        list_validation = validateList(list_snapshots_policy)543        self.assertEqual(544            list_validation[0],545            PASS,546            "snapshot list validation failed due to %s" %547            list_validation[2])548        self.assertTrue(549            recurring_snapshot_hourly.id in any(550                policy['id']) for policy in list_snapshots_policy)551        self.assertTrue(552            recurring_snapshot_monthly.id not in any(553                policy['id']) for policy in list_snapshots_policy)554        # Step 7555        self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'556        self.testdata["recurring_snapshot"]["schedule"] = 1557        recurring_snapshot = SnapshotPolicy.create(558            self.apiclient,559            self.volume[0].id,560            self.testdata["recurring_snapshot"]561        )562        # ListSnapshotPolicy should return newly created policy563        list_snapshots_policy = list_snapshot_policy(564            self.apiclient,565            id=recurring_snapshot.id,566            volumeid=self.volume[0].id567        )568        list_validation = validateList(list_snapshots_policy)569        self.assertEqual(570            list_validation[0],571            PASS,572            "snapshot list validation failed due to %s" %573            list_validation[2])574        timeout = self.testdata["timeout"]575        while True:576            snapshots = list_snapshots(577                self.apiclient,578                volumeid=self.volume[0].id,579  
              intervaltype=self.testdata[580                    "recurring_snapshot"]["intervaltype"],581                snapshottype='RECURRING',582                listall=True583            )584            if isinstance(snapshots, list):585                break586            elif timeout == 0:587                raise Exception("List snapshots API call failed.")588        self.vm_1.delete(self.apiclient, expunge=False)589        time.sleep(3600)590        snapshot_list = Snapshot.list(591            self.apiclient,592            volumeid=self.volume[0].id593        )594        list_validation = validateList(snapshot_list)595        self.assertEqual(596            list_validation[0],597            PASS,598            "snapshot list validation failed due to %s" %599            list_validation[2])600        self.assertEqual(len(snapshot_list),601                         1,602                         "Verify that snapsot is not created after VM deletion"603                         )604        # Step 8605        self.vm_1.recover(self.apiclient)606        time.sleep(3600)607        snapshot_list = Snapshot.list(608            self.apiclient,609            volumeid=self.volume[0].id610        )611        self.assertEqual(len(snapshot_list),612                         2,613                         "Verify that snapsot is not created after VM deletion"614                         )615        # Step 9616        self.vm_1.delete(self.apiclient)617        time.sleep(180)618        with self.assertRaises(Exception):619            list_snapshots_policy = list_snapshot_policy(620                self.apiclient,621                volumeid=self.volume[0].id622            )623    @attr(tags=["advanced", "basic"], required_hardware="true")624    def test_02_volume_max_snapshot(self):625        """ Test Volume Snapshot626        # 1. 
Create Hourly reccuring snapshot policy with maxsnaps=2 627                verify that when 3rd snapshot is taken first snapshot gets deleted628        """629        if self.hypervisor.lower() not in ["kvm", "vmware"]:630            self.skipTest("Skip test for hypervisor other than KVM and VMWare")631        # Step 1632        self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'633        self.testdata["recurring_snapshot"]["schedule"] = 1634        recurring_snapshot_1 = SnapshotPolicy.create(635            self.apiclient,636            self.volume[0].id,637            self.testdata["recurring_snapshot"]638        )639        # ListSnapshotPolicy should return newly created policy640        list_snapshots_policy = list_snapshot_policy(641            self.apiclient,642            id=recurring_snapshot_1.id,643            volumeid=self.volume[0].id644        )645        list_validation = validateList(list_snapshots_policy)646        self.assertEqual(647            list_validation[0],648            PASS,649            "snapshot list validation failed due to %s" %650            list_validation[2])651        timeout = self.testdata["timeout"]652        while True:653            snapshots = list_snapshots(654                self.apiclient,655                volumeid=self.volume[0].id,656                intervaltype=self.testdata[657                    "recurring_snapshot"]["intervaltype"],658                snapshottype='RECURRING',659                listall=True660            )661            if isinstance(snapshots, list):662                break663            elif timeout == 0:664                raise Exception("List snapshots API call failed.")665        snap_to_delete = snapshots[0]666        time.sleep(667            (self.testdata["recurring_snapshot"]["maxsnaps"]) * 3600668        )669        670        snapshots_1 = list_snapshots(671            self.apiclient,672            volumeid=self.volume[0].id,673            
intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],674            snapshottype='RECURRING',675            listall=True676        )677        self.assertTrue(snap_to_delete not in snapshots_1)678        time.sleep(360)679        self.assertEqual(680            self.dbclient.execute(681                "select status  from snapshots where uuid='%s'" %682                snap_to_delete.id)[0][0],683            "Destroyed"684        )685        self.assertFalse(686            is_snapshot_on_nfs(687                self.apiclient,688                self.dbclient,689                self.config,690                self.zone.id,691                snap_to_delete.id))692        # DATA DISK693        recurring_snapshot_data = SnapshotPolicy.create(694            self.apiclient,695            self.data_volume[0].id,696            self.testdata["recurring_snapshot"]697        )698        # ListSnapshotPolicy should return newly created policy699        list_snapshots_policy = list_snapshot_policy(700            self.apiclient,701            id=recurring_snapshot_data.id,702            volumeid=self.data_volume[0].id703        )704        list_validation = validateList(list_snapshots_policy)705        self.assertEqual(706            list_validation[0],707            PASS,708            "snapshot list validation failed due to %s" %709            list_validation[2])710        timeout = self.testdata["timeout"]711        while True:712            snapshots = list_snapshots(713                self.apiclient,714                volumeid=self.volume[0].id,715                intervaltype=self.testdata[716                    "recurring_snapshot"]["intervaltype"],717                snapshottype='RECURRING',718                listall=True719            )720            if isinstance(snapshots, list):721                break722            elif timeout == 0:723                raise Exception("List snapshots API call failed.")724        data_snap_to_delete = snapshots[0]725        
time.sleep(726            (self.testdata["recurring_snapshot"]["maxsnaps"]) * 3600727        )728        data_snapshots_1 = list_snapshots(729            self.apiclient,730            volumeid=self.volume[0].id,731            intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],732            snapshottype='RECURRING',733            listall=True734        )735        self.assertTrue(data_snap_to_delete not in data_snapshots_1)736        time.sleep(360)737        self.assertEqual(738            self.dbclient.execute(739                "select status  from snapshots where uuid='%s'" %740                snap_to_delete.id)[0][0],741            "Destroyed"742        )743        self.assertFalse(744            is_snapshot_on_nfs(745                self.apiclient,746                self.dbclient,747                self.config,748                self.zone.id,749                data_snap_to_delete.id))750    @attr(tags=["advanced", "basic"],required_hardware="true")751    def test_03_volume_rec_snapshot(self):752        """ Test Volume (root) Snapshot753        # 1. For snapshot.delta.max > maxsnaps verify that when number of snapshot exceeds 754                maxsnaps value previous snapshot should get deleted from database but remain 755                on secondary storage and when the value exceeds snapshot.delta.max the 756                snapshot should get deleted from secondary storage757        """758        if self.hypervisor.lower() != "xenserver":759            self.skipTest("Skip test for hypervisor other than Xenserver")760        # Step 1761        self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'762        self.testdata["recurring_snapshot"]["schedule"] = 1763        recurring_snapshot_root = SnapshotPolicy.create(764            self.apiclient,765            self.volume[0].id,...test_snapshots.py
Source:test_snapshots.py  
# ... (excerpt of test_snapshots.py; it starts at the tail of a test method
# whose beginning is elided, preserved as comments)
#         self.assertEqual(len(resp_snapshots), 1)
#         resp_snapshot = resp_snapshots.pop()
#         self.assertEqual(resp_snapshot['id'], 123)


class SnapshotSerializerTest(test.TestCase):
    """Serializes snapshot dicts with the volume templates and checks the
    resulting XML tree."""

    def _verify_snapshot(self, snap, tree):
        """Assert that ``tree`` is a ``snapshot`` element whose attributes
        equal the stringified values of the same keys in ``snap``."""
        self.assertEqual(tree.tag, 'snapshot')

        for attr in ('id', 'status', 'size', 'createdAt',
                     'displayName', 'displayDescription', 'volumeId'):
            self.assertEqual(str(snap[attr]), tree.get(attr))

    def test_snapshot_show_create_serializer(self):
        """A single snapshot serialized with SnapshotTemplate round-trips."""
        serializer = volumes.SnapshotTemplate()
        raw_snapshot = dict(
            id='snap_id',
            status='snap_status',
            size=1024,
            createdAt=datetime.datetime.now(),
            displayName='snap_name',
            displayDescription='snap_desc',
            volumeId='vol_id',
            )
        text = serializer.serialize(dict(snapshot=raw_snapshot))

        # Parenthesised form prints the same on Python 2 and also parses on
        # Python 3 (the bare ``print text`` statement does not).
        print(text)
        tree = etree.fromstring(text)

        self._verify_snapshot(raw_snapshot, tree)

    def test_snapshot_index_detail_serializer(self):
        """A snapshot list serialized with SnapshotsTemplate yields one child
        element per snapshot under a ``snapshots`` root."""
        serializer = volumes.SnapshotsTemplate()
        raw_snapshots = [dict(
                id='snap1_id',
                status='snap1_status',
                size=1024,
                createdAt=datetime.datetime.now(),
                displayName='snap1_name',
                displayDescription='snap1_desc',
                volumeId='vol1_id',
                ),
                       dict(
                id='snap2_id',
                status='snap2_status',
                size=1024,
                createdAt=datetime.datetime.now(),
                displayName='snap2_name',
                displayDescription='snap2_desc',
                volumeId='vol2_id',
                )]
        text = serializer.serialize(dict(snapshots=raw_snapshots))

        print(text)
        tree = etree.fromstring(text)

        self.assertEqual('snapshots', tree.tag)
        self.assertEqual(len(raw_snapshots), len(tree))
        # for idx, child in enumerate(tree):
        # ... (excerpt truncated here, in the middle of this method; the next
        # excerpt on the page is ec2_snapshot.py)
Source:ec2_snapshot.py  
# ... (excerpt of ec2_snapshot.py; it starts on the last line of an elided
# statement, preserved here as a comment because its enclosing clause is not
# part of the excerpt)
#     HAS_BOTO = False


# Find the most recent snapshot
def _get_snapshot_starttime(snap):
    """Return ``snap.start_time`` parsed into a naive ``datetime``.

    The string must match ``%Y-%m-%dT%H:%M:%S.000Z``.
    """
    return datetime.datetime.strptime(snap.start_time, '%Y-%m-%dT%H:%M:%S.000Z')


def _get_most_recent_snapshot(snapshots, max_snapshot_age_secs=None, now=None):
    """
    Gets the most recently created snapshot and optionally filters the result
    if the snapshot is too old
    :param snapshots: list of snapshots to search
    :param max_snapshot_age_secs: filter the result if its older than this
    :param now: simulate time -- used for unit testing
    :return: the snapshot with the latest start_time, or None when there are
        no snapshots or the latest one is older than max_snapshot_age_secs
    """
    if not snapshots:
        return None

    if not now:
        now = datetime.datetime.utcnow()

    youngest_snapshot = max(snapshots, key=_get_snapshot_starttime)

    # See if the snapshot is younger than the given max age
    if max_snapshot_age_secs is not None:
        snapshot_age = now - _get_snapshot_starttime(youngest_snapshot)
        if snapshot_age.total_seconds() > max_snapshot_age_secs:
            return None

    return youngest_snapshot


def _create_with_wait(snapshot, wait_timeout_secs, sleep_func=time.sleep):
    """
    Wait for the snapshot to be created
    :param snapshot: object exposing ``update()`` and ``status``
    :param wait_timeout_secs: fail this step after this many seconds; a falsy
        value (0 or None) disables the timeout
    :param sleep_func: called with the poll interval (3) between updates
    :return: True once status is 'completed', False on timeout
    """
    time_waited = 0
    snapshot.update()
    while snapshot.status != 'completed':
        sleep_func(3)
        snapshot.update()
        time_waited += 3
        if wait_timeout_secs and time_waited > wait_timeout_secs:
            return False
    return True


def create_snapshot(module, ec2, state=None, description=None, wait=None,
                    wait_timeout=None, volume_id=None, instance_id=None,
                    snapshot_id=None, device_name=None, snapshot_tags=None,
                    last_snapshot_min_age=None):
    """Create, reuse or delete a snapshot and report through ``module``.

    Exactly one of ``volume_id``, ``instance_id`` or ``snapshot_id`` must be
    given, and ``instance_id`` and ``device_name`` must be given together.
    With ``state == 'absent'`` the snapshot named by ``snapshot_id`` is
    deleted.  Otherwise, when ``last_snapshot_min_age`` (minutes) is positive
    and the volume's newest snapshot is not older than that, it is reused;
    if not, a new snapshot is created.  The function then optionally waits
    for completion and applies ``snapshot_tags``.  Results and errors are
    delivered via ``module.exit_json`` / ``module.fail_json``.

    NOTE(review): the statements following each ``fail_json`` / ``exit_json``
    call assume those calls do not return -- confirm against the module class
    in use.
    """
    snapshot = None
    changed = False

    required = [volume_id, snapshot_id, instance_id]
    if required.count(None) != len(required) - 1:  # only 1 must be set
        module.fail_json(msg='One and only one of volume_id or instance_id or snapshot_id must be specified')
    if instance_id and not device_name or device_name and not instance_id:
        module.fail_json(msg='Instance ID and device name must both be specified')

    if instance_id:
        try:
            volumes = ec2.get_all_volumes(filters={'attachment.instance-id': instance_id, 'attachment.device': device_name})
        except boto.exception.BotoServerError as e:
            module.fail_json(msg="%s: %s" % (e.error_code, e.error_message))

        if not volumes:
            module.fail_json(msg="Could not find volume with name %s attached to instance %s" % (device_name, instance_id))

        volume_id = volumes[0].id

    if state == 'absent':
        if not snapshot_id:
            module.fail_json(msg='snapshot_id must be set when state is absent')
        try:
            ec2.delete_snapshot(snapshot_id)
        except boto.exception.BotoServerError as e:
            # exception is raised if snapshot does not exist
            if e.error_code == 'InvalidSnapshot.NotFound':
                module.exit_json(changed=False)
            else:
                module.fail_json(msg="%s: %s" % (e.error_code, e.error_message))

        # successful delete
        module.exit_json(changed=True)

    # The parameter defaults to None, which cannot be ordered against an int
    # on Python 3, so test for "set" before comparing.
    if last_snapshot_min_age and last_snapshot_min_age > 0:
        try:
            current_snapshots = ec2.get_all_snapshots(filters={'volume_id': volume_id})
        except boto.exception.BotoServerError as e:
            module.fail_json(msg="%s: %s" % (e.error_code, e.error_message))

        last_snapshot_min_age = last_snapshot_min_age * 60  # Convert to seconds
        snapshot = _get_most_recent_snapshot(current_snapshots,
                                             max_snapshot_age_secs=last_snapshot_min_age)
    try:
        # Create a new snapshot if we didn't find an existing one to use
        if snapshot is None:
            snapshot = ec2.create_snapshot(volume_id, description=description)
            changed = True
        if wait:
            if not _create_with_wait(snapshot, wait_timeout):
                module.fail_json(msg='Timed out while creating snapshot.')
        if snapshot_tags:
            for k, v in snapshot_tags.items():
                snapshot.add_tag(k, v)
    except boto.exception.BotoServerError as e:
        module.fail_json(msg="%s: %s" % (e.error_code, e.error_message))

    module.exit_json(changed=changed,
                     snapshot_id=snapshot.id,
                     volume_id=snapshot.volume_id,
                     volume_size=snapshot.volume_size,
                     tags=snapshot.tags.copy())


def create_snapshot_ansible_module():
    """Build the AnsibleModule with the EC2 base spec plus snapshot options."""
    argument_spec = ec2_argument_spec()
    argument_spec.update(
        dict(
            volume_id = dict(),
            description = dict(),
            instance_id = dict(),
            snapshot_id = dict(),
            device_name = dict(),
            wait = dict(type='bool', default=True),
            wait_timeout = dict(type='int', default=0),
            last_snapshot_min_age = dict(type='int', default=0),
            snapshot_tags = dict(type='dict', default=dict()),
            state = dict(choices=['absent','present'], default='present'),
        )
    )
    module = AnsibleModule(argument_spec=argument_spec)
    return module


def main():
    """Read the module parameters, connect to EC2 and run create_snapshot."""
    module = create_snapshot_ansible_module()

    if not HAS_BOTO:
        module.fail_json(msg='boto required for this module')

    volume_id = module.params.get('volume_id')
    snapshot_id = module.params.get('snapshot_id')
    description = module.params.get('description')
    instance_id = module.params.get('instance_id')
    device_name = module.params.get('device_name')
    wait = module.params.get('wait')
    wait_timeout = module.params.get('wait_timeout')
    last_snapshot_min_age = module.params.get('last_snapshot_min_age')
    snapshot_tags = module.params.get('snapshot_tags')
    state = module.params.get('state')

    ec2 = ec2_connect(module)

    create_snapshot(
        module=module,
        state=state,
        description=description,
        wait=wait,
        wait_timeout=wait_timeout,
        ec2=ec2,
        volume_id=volume_id,
        instance_id=instance_id,
        snapshot_id=snapshot_id,
        device_name=device_name,
        snapshot_tags=snapshot_tags,
        last_snapshot_min_age=last_snapshot_min_age
    )

# import module snippets
# ... (excerpt truncated here; the next excerpt on the page is
# test_snapshot.py)
Source:test_snapshot.py  
1#2#   Licensed under the Apache License, Version 2.0 (the "License"); you may3#   not use this file except in compliance with the License. You may obtain4#   a copy of the License at5#6#        http://www.apache.org/licenses/LICENSE-2.07#8#   Unless required by applicable law or agreed to in writing, software9#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT10#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the11#   License for the specific language governing permissions and limitations12#   under the License.13#14import copy15from openstackclient.tests import fakes16from openstackclient.tests.volume.v2 import fakes as volume_fakes17from openstackclient.volume.v2 import snapshot18class TestSnapshot(volume_fakes.TestVolume):19    def setUp(self):20        super(TestSnapshot, self).setUp()21        self.snapshots_mock = self.app.client_manager.volume.volume_snapshots22        self.snapshots_mock.reset_mock()23        self.volumes_mock = self.app.client_manager.volume.volumes24        self.volumes_mock.reset_mock()25class TestSnapshotCreate(TestSnapshot):26    def setUp(self):27        super(TestSnapshotCreate, self).setUp()28        self.volumes_mock.get.return_value = fakes.FakeResource(29            None,30            copy.deepcopy(volume_fakes.VOLUME),31            loaded=True32        )33        self.snapshots_mock.create.return_value = fakes.FakeResource(34            None,35            copy.deepcopy(volume_fakes.SNAPSHOT),36            loaded=True37        )38        # Get the command object to test39        self.cmd = snapshot.CreateSnapshot(self.app, None)40    def test_snapshot_create(self):41        arglist = [42            volume_fakes.volume_id,43            "--name", volume_fakes.snapshot_name,44            "--description", volume_fakes.snapshot_description,45            "--force"46        ]47        verifylist = [48            ("volume", volume_fakes.volume_id),49            ("name", 
volume_fakes.snapshot_name),50            ("description", volume_fakes.snapshot_description),51            ("force", True)52        ]53        parsed_args = self.check_parser(self.cmd, arglist, verifylist)54        columns, data = self.cmd.take_action(parsed_args)55        self.snapshots_mock.create.assert_called_with(56            volume_fakes.volume_id,57            force=True,58            name=volume_fakes.snapshot_name,59            description=volume_fakes.snapshot_description60        )61        self.assertEqual(columns, volume_fakes.SNAPSHOT_columns)62        self.assertEqual(data, volume_fakes.SNAPSHOT_data)63class TestSnapshotDelete(TestSnapshot):64    def setUp(self):65        super(TestSnapshotDelete, self).setUp()66        self.snapshots_mock.get.return_value = fakes.FakeResource(67            None,68            copy.deepcopy(volume_fakes.SNAPSHOT),69            loaded=True)70        self.snapshots_mock.delete.return_value = None71        # Get the command object to mock72        self.cmd = snapshot.DeleteSnapshot(self.app, None)73    def test_snapshot_delete(self):74        arglist = [75            volume_fakes.snapshot_id76        ]77        verifylist = [78            ("snapshots", [volume_fakes.snapshot_id])79        ]80        parsed_args = self.check_parser(self.cmd, arglist, verifylist)81        result = self.cmd.take_action(parsed_args)82        self.snapshots_mock.delete.assert_called_with(volume_fakes.snapshot_id)83        self.assertIsNone(result)84class TestSnapshotList(TestSnapshot):85    columns = [86        "ID",87        "Name",88        "Description",89        "Status",90        "Size"91    ]92    def setUp(self):93        super(TestSnapshotList, self).setUp()94        self.volumes_mock.list.return_value = [95            fakes.FakeResource(96                None,97                copy.deepcopy(volume_fakes.VOLUME),98                loaded=True99            )100        ]101        self.snapshots_mock.list.return_value = [102            
fakes.FakeResource(103                None,104                copy.deepcopy(volume_fakes.SNAPSHOT),105                loaded=True106            )107        ]108        # Get the command to test109        self.cmd = snapshot.ListSnapshot(self.app, None)110    def test_snapshot_list_without_options(self):111        arglist = []112        verifylist = [113            ('all_projects', False),114            ("long", False)115        ]116        parsed_args = self.check_parser(self.cmd, arglist, verifylist)117        columns, data = self.cmd.take_action(parsed_args)118        self.assertEqual(self.columns, columns)119        datalist = ((120            volume_fakes.snapshot_id,121            volume_fakes.snapshot_name,122            volume_fakes.snapshot_description,123            "available",124            volume_fakes.snapshot_size125        ),)126        self.assertEqual(datalist, tuple(data))127    def test_snapshot_list_with_options(self):128        arglist = ["--long"]129        verifylist = [("long", True), ('all_projects', False)]130        parsed_args = self.check_parser(self.cmd, arglist, verifylist)131        columns, data = self.cmd.take_action(parsed_args)132        columns = self.columns + [133            "Created At",134            "Volume",135            "Properties"136        ]137        self.assertEqual(columns, columns)138        datalist = ((139            volume_fakes.snapshot_id,140            volume_fakes.snapshot_name,141            volume_fakes.snapshot_description,142            "available",143            volume_fakes.snapshot_size,144            "2015-06-03T18:49:19.000000",145            volume_fakes.volume_name,146            volume_fakes.EXPECTED_SNAPSHOT.get("properties")147        ),)148        self.assertEqual(datalist, tuple(data))149    def test_snapshot_list_all_projects(self):150        arglist = [151            '--all-projects',152        ]153        verifylist = [154            ('long', False),155            ('all_projects', 
True)156        ]157        parsed_args = self.check_parser(self.cmd, arglist, verifylist)158        columns, data = self.cmd.take_action(parsed_args)159        self.assertEqual(self.columns, columns)160        datalist = ((161            volume_fakes.snapshot_id,162            volume_fakes.snapshot_name,163            volume_fakes.snapshot_description,164            "available",165            volume_fakes.snapshot_size166        ), )167        self.assertEqual(datalist, tuple(data))168class TestSnapshotSet(TestSnapshot):169    def setUp(self):170        super(TestSnapshotSet, self).setUp()171        self.snapshots_mock.get.return_value = fakes.FakeResource(172            None,173            copy.deepcopy(volume_fakes.SNAPSHOT),174            loaded=True175        )176        self.snapshots_mock.set_metadata.return_value = None177        self.snapshots_mock.update.return_value = None178        # Get the command object to mock179        self.cmd = snapshot.SetSnapshot(self.app, None)180    def test_snapshot_set(self):181        arglist = [182            volume_fakes.snapshot_id,183            "--name", "new_snapshot",184            "--property", "x=y",185            "--property", "foo=foo"186        ]187        new_property = {"x": "y", "foo": "foo"}188        verifylist = [189            ("snapshot", volume_fakes.snapshot_id),190            ("name", "new_snapshot"),191            ("property", new_property)192        ]193        parsed_args = self.check_parser(self.cmd, arglist, verifylist)194        result = self.cmd.take_action(parsed_args)195        kwargs = {196            "name": "new_snapshot",197        }198        self.snapshots_mock.update.assert_called_with(199            volume_fakes.snapshot_id, **kwargs)200        self.snapshots_mock.set_metadata.assert_called_with(201            volume_fakes.snapshot_id, new_property202        )203        self.assertIsNone(result)204class TestSnapshotShow(TestSnapshot):205    def setUp(self):206        
super(TestSnapshotShow, self).setUp()207        self.snapshots_mock.get.return_value = fakes.FakeResource(208            None,209            copy.deepcopy(volume_fakes.SNAPSHOT),210            loaded=True)211        # Get the command object to test212        self.cmd = snapshot.ShowSnapshot(self.app, None)213    def test_snapshot_show(self):214        arglist = [215            volume_fakes.snapshot_id216        ]217        verifylist = [218            ("snapshot", volume_fakes.snapshot_id)219        ]220        parsed_args = self.check_parser(self.cmd, arglist, verifylist)221        columns, data = self.cmd.take_action(parsed_args)222        self.snapshots_mock.get.assert_called_with(volume_fakes.snapshot_id)223        self.assertEqual(volume_fakes.SNAPSHOT_columns, columns)224        self.assertEqual(volume_fakes.SNAPSHOT_data, data)225class TestSnapshotUnset(TestSnapshot):226    def setUp(self):227        super(TestSnapshotUnset, self).setUp()228        self.snapshots_mock.get.return_value = fakes.FakeResource(229            None,230            copy.deepcopy(volume_fakes.SNAPSHOT),231            loaded=True232        )233        self.snapshots_mock.delete_metadata.return_value = None234        # Get the command object to mock235        self.cmd = snapshot.UnsetSnapshot(self.app, None)236    def test_snapshot_unset(self):237        arglist = [238            volume_fakes.snapshot_id,239            "--property", "foo"240        ]241        verifylist = [242            ("snapshot", volume_fakes.snapshot_id),243            ("property", ["foo"])244        ]245        parsed_args = self.check_parser(self.cmd, arglist, verifylist)246        result = self.cmd.take_action(parsed_args)247        self.snapshots_mock.delete_metadata.assert_called_with(248            volume_fakes.snapshot_id, ["foo"]249        )...LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed 
your understanding of Playwright testing. This tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!
