Best Python code snippet using tempest_python
test_session_recording_list.py
Source:test_session_recording_list.py  
...27                team = self.team28            event_factory(29                team=team, event=event_name, timestamp=timestamp, distinct_id=distinct_id, properties=properties,30            )31        def create_snapshot(32            self, distinct_id, session_id, timestamp, window_id="", team_id=None, has_full_snapshot=True33        ):34            if team_id is None:35                team_id = self.team.pk36            session_recording_event_factory(37                team_id=team_id,38                distinct_id=distinct_id,39                timestamp=timestamp,40                session_id=session_id,41                window_id=window_id,42                snapshot_data={"timestamp": timestamp.timestamp(), "has_full_snapshot": has_full_snapshot,},43            )44        @property45        def base_time(self):46            return now() - relativedelta(hours=1)47        @test_with_materialized_columns(["$current_url"])48        @freeze_time("2021-01-21T20:00:00.000Z")49        def test_basic_query(self):50            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})51            self.create_snapshot("user", "1", self.base_time)52            self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=10))53            self.create_snapshot("user", "2", self.base_time + relativedelta(seconds=20))54            filter = SessionRecordingsFilter(team=self.team, data={"no_filter": None})55            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)56            (session_recordings, more_recordings_available) = session_recording_list_instance.run()57            self.assertEqual(len(session_recordings), 2)58            self.assertEqual(session_recordings[0]["start_time"], self.base_time + relativedelta(seconds=20))59            self.assertEqual(session_recordings[0]["session_id"], "2")60            self.assertEqual(session_recordings[0]["distinct_id"], "user")61            self.assertEqual(session_recordings[1]["start_time"], self.base_time)62            self.assertEqual(session_recordings[1]["session_id"], "1")63            self.assertEqual(session_recordings[1]["distinct_id"], "user")64            self.assertEqual(more_recordings_available, False)65        @freeze_time("2021-01-21T20:00:00.000Z")66        def test_recordings_dont_leak_data_between_teams(self):67            another_team = Team.objects.create(organization=self.organization)68            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})69            Person.objects.create(team=another_team, distinct_ids=["user"], properties={"email": "bla"})70            self.create_snapshot("user", "1", self.base_time, team_id=another_team.pk)71            self.create_snapshot("user", "2", self.base_time)72            filter = SessionRecordingsFilter(team=self.team, data={"no_filter": None})73            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)74            (session_recordings, _) = session_recording_list_instance.run()75            self.assertEqual(len(session_recordings), 1)76            self.assertEqual(session_recordings[0]["session_id"], "2")77            self.assertEqual(session_recordings[0]["distinct_id"], "user")78        @freeze_time("2021-01-21T20:00:00.000Z")79        def test_all_sessions_recording_object_keys(self):80            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})81            self.create_snapshot("user", "1", self.base_time)82            self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))83            filter = SessionRecordingsFilter(team=self.team, data={"no_filter": None})84            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)85            (session_recordings, _) = session_recording_list_instance.run()86            self.assertEqual(len(session_recordings), 1)87            self.assertEqual(session_recordings[0]["session_id"], "1")88            self.assertEqual(session_recordings[0]["distinct_id"], "user")89            self.assertEqual(session_recordings[0]["start_time"], self.base_time)90            self.assertEqual(session_recordings[0]["end_time"], self.base_time + relativedelta(seconds=30))91            self.assertEqual(session_recordings[0]["duration"], 30)92        @freeze_time("2021-01-21T20:00:00.000Z")93        def test_event_filter(self):94            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})95            self.create_snapshot("user", "1", self.base_time)96            self.create_event("user", self.base_time)97            self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))98            filter = SessionRecordingsFilter(99                team=self.team,100                data={"events": [{"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"}]},101            )102            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)103            (session_recordings, _) = session_recording_list_instance.run()104            self.assertEqual(len(session_recordings), 1)105            self.assertEqual(session_recordings[0]["session_id"], "1")106            filter = SessionRecordingsFilter(107                team=self.team,108                data={"events": [{"id": "$autocapture", "type": "events", "order": 0, "name": "$autocapture"}]},109            )110            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)111            (session_recordings, _) = session_recording_list_instance.run()112            self.assertEqual(len(session_recordings), 0)113        @test_with_materialized_columns(["$current_url", "$browser"])114        @freeze_time("2021-01-21T20:00:00.000Z")115        def test_event_filter_with_properties(self):116            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})117            self.create_snapshot("user", "1", self.base_time)118            self.create_event("user", self.base_time, properties={"$browser": "Chrome"})119            self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))120            filter = SessionRecordingsFilter(121                team=self.team,122                data={123                    "events": [124                        {125                            "id": "$pageview",126                            "type": "events",127                            "order": 0,128                            "name": "$pageview",129                            "properties": [130                                {"key": "$browser", "value": ["Chrome"], "operator": "exact", "type": "event"}131                            ],132                        },133                    ]134                },135            )136            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)137            (session_recordings, _) = session_recording_list_instance.run()138            self.assertEqual(len(session_recordings), 1)139            self.assertEqual(session_recordings[0]["session_id"], "1")140            filter = SessionRecordingsFilter(141                team=self.team,142                data={143                    "events": [144                        {145                            "id": "$pageview",146                            "type": "events",147                            "order": 0,148                            "name": "$pageview",149                            "properties": [150                                {"key": "$browser", "value": ["Firefox"], "operator": "exact", "type": "event"}151                            ],152                        },153                    ]154                },155            )156            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)157            (session_recordings, _) = session_recording_list_instance.run()158            self.assertEqual(len(session_recordings), 0)159        @freeze_time("2021-01-21T20:00:00.000Z")160        def test_multiple_event_filters(self):161            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})162            self.create_snapshot("user", "1", self.base_time)163            self.create_event("user", self.base_time)164            self.create_event("user", self.base_time, event_name="new-event")165            self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))166            filter = SessionRecordingsFilter(167                team=self.team,168                data={169                    "events": [170                        {"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"},171                        {"id": "new-event", "type": "events", "order": 0, "name": "new-event"},172                    ]173                },174            )175            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)176            (session_recordings, _) = session_recording_list_instance.run()177            self.assertEqual(len(session_recordings), 1)178            self.assertEqual(session_recordings[0]["session_id"], "1")179            filter = SessionRecordingsFilter(180                team=self.team,181                data={182                    "events": [183                        {"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"},184                        {"id": "new-event2", "type": "events", "order": 0, "name": "new-event2"},185                    ]186                },187            )188            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)189            (session_recordings, _) = session_recording_list_instance.run()190            self.assertEqual(len(session_recordings), 0)191        @test_with_materialized_columns(["$current_url", "$browser"])192        @freeze_time("2021-01-21T20:00:00.000Z")193        def test_action_filter(self):194            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})195            action1 = self.create_action("custom-event", properties=[{"key": "$browser", "value": "Firefox"}])196            action2 = self.create_action(name="custom-event")197            self.create_snapshot("user", "1", self.base_time)198            self.create_event("user", self.base_time, event_name="custom-event", properties={"$browser": "Chrome"})199            self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))200            # An action with properties201            filter = SessionRecordingsFilter(202                team=self.team,203                data={"actions": [{"id": action1.id, "type": "actions", "order": 1, "name": "custom-event",}]},204            )205            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)206            (session_recordings, _) = session_recording_list_instance.run()207            self.assertEqual(len(session_recordings), 0)208            # An action without properties209            filter = SessionRecordingsFilter(210                team=self.team,211                data={"actions": [{"id": action2.id, "type": "actions", "order": 1, "name": "custom-event",}]},212            )213            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)214            (session_recordings, _) = session_recording_list_instance.run()215            self.assertEqual(len(session_recordings), 1)216            self.assertEqual(session_recordings[0]["session_id"], "1")217            # Adding properties to an action218            filter = SessionRecordingsFilter(219                team=self.team,220                data={221                    "actions": [222                        {223                            "id": action2.id,224                            "type": "actions",225                            "order": 1,226                            "name": "custom-event",227                            "properties": [228                                {"key": "$browser", "value": ["Firefox"], "operator": "exact", "type": "event"}229                            ],230                        }231                    ]232                },233            )234            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)235            (session_recordings, _) = session_recording_list_instance.run()236            self.assertEqual(len(session_recordings), 0)237        @freeze_time("2021-01-21T20:00:00.000Z")238        def test_all_sessions_recording_object_keys_with_entity_filter(self):239            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})240            self.create_snapshot("user", "1", self.base_time)241            self.create_event("user", self.base_time)242            self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))243            filter = SessionRecordingsFilter(244                team=self.team,245                data={"events": [{"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"}]},246            )247            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)248            (session_recordings, _) = session_recording_list_instance.run()249            self.assertEqual(len(session_recordings), 1)250            self.assertEqual(session_recordings[0]["session_id"], "1")251            self.assertEqual(session_recordings[0]["distinct_id"], "user")252            self.assertEqual(session_recordings[0]["start_time"], self.base_time)253            self.assertEqual(session_recordings[0]["end_time"], self.base_time + relativedelta(seconds=30))254            self.assertEqual(session_recordings[0]["duration"], 30)255        @freeze_time("2021-01-21T20:00:00.000Z")256        def test_duration_filter(self):257            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})258            self.create_snapshot("user", "1", self.base_time)259            self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))260            self.create_snapshot("user", "2", self.base_time)261            self.create_snapshot("user", "2", self.base_time + relativedelta(minutes=4))262            filter = SessionRecordingsFilter(263                team=self.team,264                data={"session_recording_duration": '{"type":"recording","key":"duration","value":60,"operator":"gt"}'},265            )266            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)267            (session_recordings, _) = session_recording_list_instance.run()268            self.assertEqual(len(session_recordings), 1)269            self.assertEqual(session_recordings[0]["session_id"], "2")270            filter = SessionRecordingsFilter(271                team=self.team,272                data={"session_recording_duration": '{"type":"recording","key":"duration","value":60,"operator":"lt"}'},273            )274            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)275            (session_recordings, _) = session_recording_list_instance.run()276            self.assertEqual(len(session_recordings), 1)277            self.assertEqual(session_recordings[0]["session_id"], "1")278        @freeze_time("2021-01-21T20:00:00.000Z")279        def test_date_from_filter(self):280            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})281            self.create_snapshot("user", "1", self.base_time - relativedelta(days=3))282            self.create_snapshot("user", "1", self.base_time - relativedelta(days=3) + relativedelta(seconds=30))283            filter = SessionRecordingsFilter(team=self.team, data={"date_from": self.base_time.strftime("%Y-%m-%d")})284            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)285            (session_recordings, _) = session_recording_list_instance.run()286            self.assertEqual(len(session_recordings), 0)287            filter = SessionRecordingsFilter(288                team=self.team, data={"date_from": (self.base_time - relativedelta(days=4)).strftime("%Y-%m-%d")}289            )290            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)291            (session_recordings, _) = session_recording_list_instance.run()292            self.assertEqual(len(session_recordings), 1)293            self.assertEqual(session_recordings[0]["session_id"], "1")294        @freeze_time("2021-01-21T20:00:00.000Z")295        def test_date_to_filter(self):296            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})297            self.create_snapshot("user", "1", self.base_time - relativedelta(days=3))298            self.create_snapshot("user", "1", self.base_time - relativedelta(days=3) + relativedelta(seconds=30))299            filter = SessionRecordingsFilter(300                team=self.team, data={"date_to": (self.base_time - relativedelta(days=4)).strftime("%Y-%m-%d")}301            )302            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)303            (session_recordings, _) = session_recording_list_instance.run()304            self.assertEqual(len(session_recordings), 0)305            filter = SessionRecordingsFilter(team=self.team, data={"date_to": (self.base_time).strftime("%Y-%m-%d")})306            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)307            (session_recordings, _) = session_recording_list_instance.run()308            self.assertEqual(len(session_recordings), 1)309            self.assertEqual(session_recordings[0]["session_id"], "1")310        @freeze_time("2021-01-21T20:00:00.000Z")311        def test_recording_that_spans_time_bounds(self):312            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})313            day_line = datetime(2021, 11, 5)314            self.create_snapshot("user", "1", day_line - relativedelta(hours=3))315            self.create_snapshot("user", "1", day_line + relativedelta(hours=3))316            filter = SessionRecordingsFilter(317                team=self.team,318                data={319                    "date_to": day_line.strftime("%Y-%m-%d"),320                    "date_from": (day_line - relativedelta(days=10)).strftime("%Y-%m-%d"),321                },322            )323            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)324            (session_recordings, _) = session_recording_list_instance.run()325            self.assertEqual(len(session_recordings), 1)326            self.assertEqual(session_recordings[0]["session_id"], "1")327            self.assertEqual(session_recordings[0]["duration"], 6 * 60 * 60)328        @freeze_time("2021-01-21T20:00:00.000Z")329        def test_person_id_filter(self):330            p = Person.objects.create(team=self.team, distinct_ids=["user", "user2"], properties={"email": "bla"})331            self.create_snapshot("user", "1", self.base_time)332            self.create_snapshot("user2", "2", self.base_time + relativedelta(seconds=10))333            self.create_snapshot("user3", "3", self.base_time + relativedelta(seconds=20))334            filter = SessionRecordingsFilter(team=self.team, data={"person_uuid": str(p.uuid),})335            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)336            (session_recordings, _) = session_recording_list_instance.run()337            self.assertEqual(len(session_recordings), 2)338            self.assertEqual(session_recordings[0]["session_id"], "2")339            self.assertEqual(session_recordings[1]["session_id"], "1")340        @freeze_time("2021-01-21T20:00:00.000Z")341        def test_all_filters_at_once(self):342            p = Person.objects.create(team=self.team, distinct_ids=["user", "user2"], properties={"email": "bla"})343            action2 = self.create_action(name="custom-event")344            self.create_snapshot("user", "1", self.base_time - relativedelta(days=3))345            self.create_event("user", self.base_time - relativedelta(days=3))346            self.create_event(347                "user",348                self.base_time - relativedelta(days=3),349                event_name="custom-event",350                properties={"$browser": "Chrome"},351            )352            self.create_snapshot("user", "1", self.base_time - relativedelta(days=3) + relativedelta(hours=6))353            filter = SessionRecordingsFilter(354                team=self.team,355                data={356                    "person_uuid": str(p.uuid),357                    "date_to": (self.base_time + relativedelta(days=3)).strftime("%Y-%m-%d"),358                    "date_from": (self.base_time - relativedelta(days=10)).strftime("%Y-%m-%d"),359                    "session_recording_duration": '{"type":"recording","key":"duration","value":60,"operator":"gt"}',360                    "events": [{"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"}],361                    "actions": [{"id": action2.id, "type": "actions", "order": 1, "name": "custom-event",}],362                },363            )364            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)365            (session_recordings, _) = session_recording_list_instance.run()366            self.assertEqual(len(session_recordings), 1)367            self.assertEqual(session_recordings[0]["session_id"], "1")368        @freeze_time("2021-01-21T20:00:00.000Z")369        def test_pagination(self):370            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})371            self.create_snapshot("user", "1", self.base_time)372            self.create_snapshot("user", "2", self.base_time + relativedelta(seconds=10))373            self.create_snapshot("user", "3", self.base_time + relativedelta(seconds=20))374            filter = SessionRecordingsFilter(team=self.team, data={"limit": 2,})375            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)376            (session_recordings, more_recordings_available) = session_recording_list_instance.run()377            self.assertEqual(len(session_recordings), 2)378            self.assertEqual(session_recordings[0]["session_id"], "3")379            self.assertEqual(session_recordings[1]["session_id"], "2")380            self.assertEqual(more_recordings_available, True)381            filter = SessionRecordingsFilter(team=self.team, data={"limit": 2, "offset": 0})382            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)383            (session_recordings, more_recordings_available) = session_recording_list_instance.run()384            self.assertEqual(len(session_recordings), 2)385            self.assertEqual(session_recordings[0]["session_id"], "3")386            self.assertEqual(session_recordings[1]["session_id"], "2")387            self.assertEqual(more_recordings_available, True)388            filter = SessionRecordingsFilter(team=self.team, data={"limit": 2, "offset": 1})389            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)390            (session_recordings, more_recordings_available) = session_recording_list_instance.run()391            self.assertEqual(len(session_recordings), 2)392            self.assertEqual(session_recordings[0]["session_id"], "2")393            self.assertEqual(session_recordings[1]["session_id"], "1")394            self.assertEqual(more_recordings_available, False)395            filter = SessionRecordingsFilter(team=self.team, data={"limit": 2, "offset": 2})396            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)397            (session_recordings, more_recordings_available) = session_recording_list_instance.run()398            self.assertEqual(len(session_recordings), 1)399            self.assertEqual(session_recordings[0]["session_id"], "1")400            self.assertEqual(more_recordings_available, False)401        @freeze_time("2021-01-21T20:00:00.000Z")402        def test_recording_without_fullsnapshot_dont_appear(self):403            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})404            self.create_snapshot("user", "1", self.base_time, has_full_snapshot=False)405            filter = SessionRecordingsFilter(team=self.team, data={"no-filter": True})406            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)407            (session_recordings, _) = session_recording_list_instance.run()408            self.assertEqual(len(session_recordings), 0)409        @freeze_time("2021-01-21T20:00:00.000Z")410        def test_teams_dont_leak_event_filter(self):411            Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})412            another_team = Team.objects.create(organization=self.organization)413            self.create_snapshot("user", "1", self.base_time)414            self.create_event(1, self.base_time + relativedelta(seconds=15), team=another_team)415            self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))416            filter = SessionRecordingsFilter(417                team=self.team,418                data={"events": [{"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"}]},419            )420            session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)421            (session_recordings, _) = session_recording_list_instance.run()422            self.assertEqual(len(session_recordings), 0)...test_dirsnapshot.py
Source:test_dirsnapshot.py  
...87    d1 = tmp_path / "dir1"88    d1.mkdir()89    snapshot_file = tmp_path / f"{d1}.snapshot"90    populate_dir(d1)91    create_snapshot(str(d1), str(snapshot_file))92    assert is_snapshot_file(snapshot_file)93    snapshot_file = tmp_path / "not_a_snapshot"94    snapshot_file.touch()95    assert not is_snapshot_file(snapshot_file)96def test_create_snapshot_in_memory(tmp_path: pathlib.Path):97    """Test create_snapshot with in-memory snapshot"""98    d1 = tmp_path / "dir1"99    d1.mkdir()100    populate_dir(d1)101    snapshot = create_snapshot(str(d1), None)102    assert type(snapshot) == DirSnapshot103def test_dirdiff_two_snapshots(tmp_path: pathlib.Path, capsys):104    """Test DirDiff with two snapshots"""105    d1 = tmp_path / "dir1"106    d1.mkdir()107    files = populate_dir(d1)108    # snapshot 1109    snapshot_file1 = tmp_path / "1.snapshot"110    snapshot_1 = create_snapshot(str(d1), str(snapshot_file1), description="snapshot-1")111    assert snapshot_1.description == "snapshot-1"112    # create some changes113    added, removed, modified = modify_files(d1)114    # snapshot 2115    snapshot_file2 = tmp_path / "2.snapshot"116    snapshot_2 = create_snapshot(str(d1), str(snapshot_file2), description="snapshot-2")117    assert snapshot_2.description == "snapshot-2"118    dirdiff = DirDiff(119        str(snapshot_file1),120        str(snapshot_file2),121    )122    diff = dirdiff.diff()123    assert isinstance(diff, DirDiffResults)124    assert sorted(diff.removed) == removed125    assert sorted(diff.added) == added126    assert sorted(diff.modified) == modified127    # remove files we changed/removed from files list for comparing identical128    for f in removed + modified:129        files.remove(f)130    assert sorted(diff.identical) == sorted(files)131    # test json132    json_dict = json.loads(dirdiff.diff().json())133    assert sorted(json_dict["added"]) == sorted(diff.added)134    assert sorted(json_dict["removed"]) == sorted(diff.removed)135    assert sorted(json_dict["modified"]) == sorted(diff.modified)136    assert sorted(json_dict["identical"]) == sorted(diff.identical)137    # test description138    dirdiff.report()139    output = capsys.readouterr().out.strip()140    assert "(snapshot-1)" in output141    assert "(snapshot-2)" in output142def test_dirdiff_two_snapshot_objects(tmp_path: pathlib.Path, capsys):143    """Test DirDiff with two snapshot objects"""144    d1 = tmp_path / "dir1"145    d1.mkdir()146    files = populate_dir(d1)147    # snapshot 1148    snapshot_file1 = tmp_path / "1.snapshot"149    snapshot_1 = create_snapshot(str(d1), str(snapshot_file1), description="snapshot-1")150    assert snapshot_1.description == "snapshot-1"151    added, removed, modified = modify_files(d1)152    # snapshot 2153    snapshot_file2 = tmp_path / "2.snapshot"154    snapshot_2 = create_snapshot(str(d1), str(snapshot_file2), description="snapshot-2")155    assert snapshot_2.description == "snapshot-2"156    dirdiff = DirDiff(snapshot_1, snapshot_2)157    diff = dirdiff.diff()158    assert isinstance(diff, DirDiffResults)159    assert sorted(diff.removed) == removed160    assert sorted(diff.added) == added161    assert sorted(diff.modified) == modified162    # remove files we changed/removed from files list for comparing identical163    for f in removed + modified:164        files.remove(f)165    assert sorted(diff.identical) == sorted(files)166    # test json167    json_dict = json.loads(dirdiff.diff().json())168    assert sorted(json_dict["added"]) == sorted(diff.added)169    assert sorted(json_dict["removed"]) == sorted(diff.removed)170    assert sorted(json_dict["modified"]) == sorted(diff.modified)171    assert sorted(json_dict["identical"]) == sorted(diff.identical)172    # test description173    dirdiff.report()174    output = capsys.readouterr().out.strip()175    assert "(snapshot-1)" in output176    assert "(snapshot-2)" in output177def test_dirdiff_snapshot_dir(tmp_path: pathlib.Path):178    """Test DirDiff with snapshot and directory"""179    d1 = tmp_path / "dir1"180    d1.mkdir()181    files = populate_dir(d1)182    # snapshot 1183    snapshot_file1 = tmp_path / "1.snapshot"184    create_snapshot(str(d1), str(snapshot_file1), description="snapshot-1")185    added, removed, modified = modify_files(d1)186    dirdiff = DirDiff(187        str(snapshot_file1),188        str(d1),189    )190    diff = dirdiff.diff()191    assert isinstance(diff, DirDiffResults)192    assert sorted(diff.removed) == removed193    assert sorted(diff.added) == added194    assert sorted(diff.modified) == modified195    # remove files we changed/removed from files list for comparing identical196    for f in removed + modified:197        files.remove(f)198    assert sorted(diff.identical) == sorted(files)199def test_dirdiff_diff_dirs(tmp_path: pathlib.Path):200    """Test DirDiff with dirs=False"""201    d1 = tmp_path / "dir1"202    d1.mkdir()203    files = populate_dir(d1)204    # snapshot 1205    snapshot_file1 = tmp_path / "1.snapshot"206    create_snapshot(str(d1), str(snapshot_file1), description="snapshot-1")207    added, removed, modified = modify_files(d1)208    dirdiff = DirDiff(209        str(snapshot_file1),210        str(d1),211    )212    diff = dirdiff.diff(dirs=False)213    assert sorted(diff.removed) == removed214    assert sorted(diff.added) == added215    # filter out directories216    modified = [f for f in modified if pathlib.Path(f).name not in ["dir_0", "dir_1"]]217    assert sorted(diff.modified) == modified218    # remove files we changed/removed from files list for comparing identical219    for f in removed + modified:220        files.remove(f)221    # filter out directories222    files = [f for f in files if pathlib.Path(f).name not in ["dir_0", "dir_1"]]223    assert sorted(diff.identical) == sorted(files)224def test_dirdiff_filter_function_two_snapshots(tmp_path: pathlib.Path):225    """Test DirDiff with filter function and two snapshots"""226    d1 = tmp_path / "dir1"227    d1.mkdir()228    files = populate_dir(d1)229    # snapshot 1230    snapshot_file1 = tmp_path / "1.snapshot"231    create_snapshot(str(d1), str(snapshot_file1))232    added, removed, modified = modify_files(d1)233    # snapshot 2234    snapshot_file2 = tmp_path / "2.snapshot"235    create_snapshot(str(d1), str(snapshot_file2))236    def filter_function(path):237        if path.name == "file_1":238            return False239        return True240    # remove file_1 as the filter function will ignore it241    modified.remove(str(d1 / "file_1"))242    files.remove(str(d1 / "file_1"))243    dirdiff = DirDiff(244        str(snapshot_file1),245        str(snapshot_file2),246        filter_function=filter_function,247    )248    diff = dirdiff.diff()249    assert isinstance(diff, DirDiffResults)250    assert sorted(diff.removed) == removed251    assert sorted(diff.added) == added252    assert sorted(diff.modified) == modified253    # remove files we changed/removed from files list for comparing identical254    for f in removed + modified:255        files.remove(f)256    assert sorted(diff.identical) == sorted(files)257def test_dirdiff_filter_function_snapshot_dir(tmp_path: pathlib.Path):258    """Test DirDiff with filter function and snapshot and directory"""259    d1 = tmp_path / "dir1"260    d1.mkdir()261    files = populate_dir(d1)262    # snapshot 1263    snapshot_file1 = tmp_path / "1.snapshot"264    create_snapshot(str(d1), str(snapshot_file1))265    added, removed, modified = modify_files(d1)266    def filter_function(path):267        if path.name == "file_1":268            return False269        return True270    # remove file_1 as the filter function will ignore it271    modified.remove(str(d1 / "file_1"))272    files.remove(str(d1 / "file_1"))273    dirdiff = DirDiff(274        str(snapshot_file1),275        str(d1),276        filter_function=filter_function,277    )278    diff = dirdiff.diff()279    assert isinstance(diff, DirDiffResults)280    assert sorted(diff.removed) == removed281    assert sorted(diff.added) == added282    assert sorted(diff.modified) == modified283    # remove files we changed/removed from files list for comparing identical284    for f in removed + modified:285        files.remove(f)286    assert sorted(diff.identical) == sorted(files)287def test_snapshot_filter_function(tmp_path: pathlib.Path):288    """Test snapshot with filter function"""289    d1 = tmp_path / "dir1"290    d1.mkdir()291    files = populate_dir(d1)292    # snapshot 1293    snapshot_file1 = tmp_path / "1.snapshot"294    create_snapshot(295        str(d1), str(snapshot_file1), filter_function=lambda p: p.name != "file_1"296    )297    added, removed, modified = modify_files(d1)298    # snapshot 2299    snapshot_file2 = tmp_path / "2.snapshot"300    create_snapshot(301        str(d1), str(snapshot_file2), filter_function=lambda p: p.name != "file_1"302    )303    # remove file_1 as the filter function will ignore it304    modified.remove(str(d1 / "file_1"))305    files.remove(str(d1 / "file_1"))306    dirdiff = DirDiff(307        str(snapshot_file1),308        str(snapshot_file2),309    )310    diff = dirdiff.diff()311    assert isinstance(diff, DirDiffResults)312    assert sorted(diff.removed) == removed313    assert sorted(diff.added) == added314    assert sorted(diff.modified) == modified315    # remove files we changed/removed from files list for comparing identical316    for f in removed + modified:317        files.remove(f)318    assert sorted(diff.identical) == sorted(files)319def test_snapshot_no_walk(tmp_path: pathlib.Path):320    """Test snapshot with walk=False"""321    d1 = tmp_path / "dir1"322    d1.mkdir()323    files = populate_dir(d1)324    # snapshot 1325    snapshot_file1 = tmp_path / "1.snapshot"326    create_snapshot(str(d1), str(snapshot_file1), walk=False)327    added, removed, modified = modify_files(d1)328    # filter out files in subdirs since walk=False will filter these out329    added = [330        f for f in added if pathlib.Path(f).parent not in [d1 / "dir_0", d1 / "dir_1"]331    ]332    removed = [333        f for f in removed if pathlib.Path(f).parent not in [d1 / "dir_0", d1 / "dir_1"]334    ]335    modified = [336        f337        for f in modified338        if pathlib.Path(f).parent not in [d1 / "dir_0", d1 / "dir_1"]339    ]340    # snapshot 2341    snapshot_file2 = tmp_path / "2.snapshot"342    create_snapshot(str(d1), str(snapshot_file2), walk=False)343    dirdiff = DirDiff(344        str(snapshot_file1),345        str(snapshot_file2),346    )347    diff = dirdiff.diff()348    assert isinstance(diff, DirDiffResults)349    assert sorted(diff.removed) == removed350    assert sorted(diff.added) == added351    assert sorted(diff.modified) == modified352    # remove files we changed/removed from files list for comparing identical353    for f in removed + modified:354        files.remove(f)355    files = [356        f for f in files if pathlib.Path(f).parent not in [d1 / "dir_0", d1 / "dir_1"]357    ]358    assert sorted(diff.identical) == sorted(files)359def test_dirdiff_compare_function(tmp_path: pathlib.Path, capsys):360    """Test DirDiff with custom compare function"""361    d1 = tmp_path / "dir1"362    d1.mkdir()363    files = populate_dir(d1)364    # snapshot 1365    snapshot_file1 = tmp_path / "1.snapshot"366    snapshot_1 = create_snapshot(str(d1), str(snapshot_file1), description="snapshot-1")367    assert snapshot_1.description == "snapshot-1"368    added, removed, _ = modify_files(d1)369    # snapshot 2370    snapshot_file2 = tmp_path / "2.snapshot"371    snapshot_2 = create_snapshot(str(d1), str(snapshot_file2), description="snapshot-2")372    assert snapshot_2.description == "snapshot-2"373    def compare_function(record_1: SnapshotRecord, record_2: SnapshotRecord):374        # files are equal if size is the same375        return record_1.size == record_2.size376    dirdiff = DirDiff(snapshot_1, snapshot_2)377    diff = dirdiff.diff(compare_function=compare_function)378    assert isinstance(diff, DirDiffResults)379    assert sorted(diff.removed) == removed380    assert sorted(diff.added) == added381    # only list files that are not equal size382    modified = sorted(383        [384            str(f)385            for f in [386                d1 / "file_1",387                d1 / "dir_0" / "file_0_0",388                d1 / "dir_0",389                d1 / "dir_1",390            ]391        ]392    )393    assert sorted(diff.modified) == modified394    # remove files we changed/removed from files list for comparing identical395    for f in removed + modified:396        files.remove(f)397    assert sorted(diff.identical) == sorted(files)398    # test json399    json_dict = json.loads(dirdiff.diff(compare_function=compare_function).json())400    assert sorted(json_dict["added"]) == sorted(diff.added)401    assert sorted(json_dict["removed"]) == sorted(diff.removed)402    assert sorted(json_dict["modified"]) == sorted(diff.modified)403    assert sorted(json_dict["identical"]) == sorted(diff.identical)404def test_dirdiff_valuerror(tmp_path: pathlib.Path):405    """Test DirDiff with invalid arguments"""406    d1 = tmp_path / "dir1"407    d1.mkdir()408    files = populate_dir(d1)409    # snapshot 1410    snapshot_file1 = tmp_path / "1.snapshot"411    create_snapshot(str(d1), str(snapshot_file1), walk=False)412    # snapshot 2413    snapshot_file2 = tmp_path / "2.snapshot"414    create_snapshot(str(d1), str(snapshot_file2), walk=False)415    with pytest.raises(ValueError):416        dirdiff = DirDiff(417            str(d1),418            str(snapshot_file2),419        )420    with pytest.raises(ValueError):421        dirdiff = DirDiff(str(snapshot_file1), str(d1 / "file_0"))422def test_snapshot_error(tmp_path: pathlib.Path):423    """Test snapshot with with existing file"""424    d1 = tmp_path / "dir1"425    d1.mkdir()426    files = populate_dir(d1)427    # snapshot 1428    snapshot_file1 = tmp_path / "1.snapshot"429    create_snapshot(str(d1), str(snapshot_file1), walk=False)430    # snapshot 2431    with pytest.raises(ValueError):432        create_snapshot(str(d1), str(snapshot_file1), walk=False)433def test_dirsnapshot(tmp_path: pathlib.Path):434    """Test DirSnapshot methods"""435    d1 = tmp_path / "dir1"436    d1.mkdir()437    files = populate_dir(d1)438    snapshot_db = tmp_path / "snapshot.db"439    snapshot = create_snapshot(440        str(d1), snapshot_db, walk=True, description="Test Snapshot"441    )442    assert is_snapshot_file(snapshot_db)443    assert snapshot.description == "Test Snapshot"444    assert snapshot.directory == str(d1)445    assert sorted(list(snapshot.files())) == sorted(files)446    assert type(snapshot.datetime) == datetime.datetime447    info = snapshot.info448    assert type(info) == SnapshotInfo449    assert info.description == "Test Snapshot"450    assert info.directory == str(d1)451    record = snapshot.record(str(d1 / "file_0"))452    assert type(record) == SnapshotRecord453    assert record.path == str(d1 / "file_0")454    assert record.is_file455    assert not record.is_dir456    record_dict = record.asdict()457    assert record_dict["path"] == str(d1 / "file_0")458    records = list(snapshot.records())459    assert len(records) == len(files)460def test_dirsnapshot_load(tmp_path: pathlib.Path):461    """Test DirSnapshot methods when loaded from disk"""462    d1 = tmp_path / "dir1"463    d1.mkdir()464    files = populate_dir(d1)465    snapshot_db = tmp_path / "snapshot.db"466    snapshot_ = create_snapshot(467        str(d1), snapshot_db, walk=True, description="Test Snapshot"468    )469    assert is_snapshot_file(snapshot_db)470    snapshot = load_snapshot(snapshot_db)471    assert snapshot.description == "Test Snapshot"472    assert snapshot.directory == str(d1)473    assert sorted(list(snapshot.files())) == sorted(files)474    assert type(snapshot.datetime) == datetime.datetime475    info = snapshot.info476    assert type(info) == SnapshotInfo477    assert info.description == "Test Snapshot"478    assert info.directory == str(d1)479    record = snapshot.record(str(d1 / "file_0"))480    assert type(record) == SnapshotRecord481    assert record.path == str(d1 / "file_0")482    assert record.is_file483    assert not record.is_dir484    record_dict = record.asdict()485    assert record_dict["path"] == str(d1 / "file_0")486    records = list(snapshot.records())487    assert len(records) == len(files)488def test_dirsnapshot_len(tmp_path: pathlib.Path):489    """Test DirSnapshot length"""490    d1 = tmp_path / "dir1"491    d1.mkdir()492    files = populate_dir(d1)493    snapshot_db = tmp_path / "snapshot.db"494    snapshot = create_snapshot(495        str(d1), snapshot_db, walk=True, description="Test Snapshot"496    )...test_share_snapshots.py
Source:test_share_snapshots.py  
...14class ShareSnapshotCLITest(base.OSCClientTestBase):15    """Functional tests for share snpshot."""16    def test_share_snapshot_create(self):17        share = self.create_share()18        snapshot = self.create_snapshot(19            share=share['id'],20            name='Snap',21            description='Description')22        self.assertEqual(share["id"], snapshot["share_id"])23        self.assertEqual('Snap', snapshot["name"])24        self.assertEqual('Description', snapshot["description"])25        self.assertEqual('available', snapshot["status"])26        snapshots_list = self.listing_result('share snapshot', 'list')27        self.assertIn(snapshot['id'],28                      [item['ID'] for item in snapshots_list])29    def test_share_snapshot_delete(self):30        share = self.create_share()31        snapshot_1 = self.create_snapshot(32            share=share['id'], add_cleanup=False)33        snapshot_2 = self.create_snapshot(34            share=share['id'], add_cleanup=False)35        self.openstack(36            f'share snapshot delete {snapshot_1["id"]} {snapshot_2["id"]} '37            '--wait')38        self.check_object_deleted('share snapshot', snapshot_1["id"])39        self.check_object_deleted('share snapshot', snapshot_2["id"])40        snapshot_3 = self.create_snapshot(41            share=share['id'], add_cleanup=False)42        self.openstack(43            f'share snapshot set {snapshot_3["id"]} '44            '--status creating')45        self.openstack(46            f'share snapshot delete {snapshot_3["id"]} --wait --force')47        self.check_object_deleted('share snapshot', snapshot_3["id"])48    def test_share_snapshot_show(self):49        share = self.create_share()50        snapshot = self.create_snapshot(51            share=share['id'],52            name='Snap',53            description='Description')54        show_result = self.dict_result(55            'share snapshot', f'show {snapshot["id"]}')56        self.assertEqual(snapshot["id"], show_result["id"])57        self.assertEqual('Snap', show_result["name"])58        self.assertEqual('Description', show_result["description"])59    def test_share_snapshot_set(self):60        share = self.create_share()61        snapshot = self.create_snapshot(share=share['id'])62        self.openstack(63            f'share snapshot set {snapshot["id"]} '64            f'--name Snap --description Description')65        show_result = self.dict_result(66            'share snapshot ', f'show {snapshot["id"]}')67        self.assertEqual(snapshot['id'], show_result["id"])68        self.assertEqual('Snap', show_result["name"])69        self.assertEqual('Description', show_result["description"])70    def test_share_snapshot_unset(self):71        share = self.create_share()72        snapshot = self.create_snapshot(73            share=share['id'],74            name='Snap',75            description='Description')76        self.openstack(77            f'share snapshot unset {snapshot["id"]} --name --description')78        show_result = json.loads(self.openstack(79            f'share snapshot show -f json {snapshot["id"]}'))80        self.assertEqual(snapshot['id'], show_result["id"])81        self.assertIsNone(show_result["name"])82        self.assertIsNone(show_result["description"])83    def test_share_snapshot_list(self):84        share = self.create_share()85        snapshot_1 = self.create_snapshot(share=share['id'])86        snapshot_2 = self.create_snapshot(87            share=share['id'],88            description='Description')89        snapshots_list = self.listing_result(90            'share snapshot', f'list --name {snapshot_2["name"]} '91            f'--description {snapshot_2["description"]} --all-projects'92        )93        self.assertTableStruct(snapshots_list, [94            'ID',95            'Name',96            'Project ID'97        ])98        self.assertIn(snapshot_2["name"],99                      [snap["Name"] for snap in snapshots_list])100        self.assertEqual(1, len(snapshots_list))101        snapshots_list = self.listing_result(102            'share snapshot', f'list --share {share["name"]}')103        self.assertTableStruct(snapshots_list, [104            'ID',105            'Name'106        ])107        id_list = [snap["ID"] for snap in snapshots_list]108        self.assertIn(snapshot_1["id"], id_list)109        self.assertIn(snapshot_2["id"], id_list)110        snapshots_list = self.listing_result(111            'share snapshot',112            f'list --name~ {snapshot_2["name"][-3:]} '113            '--description~ Des --detail')114        self.assertTableStruct(snapshots_list, [115            'ID',116            'Name',117            'Status',118            'Description',119            'Created At',120            'Size',121            'Share ID',122            'Share Proto',123            'Share Size',124            'User ID'125        ])126        self.assertIn(snapshot_2["name"],127                      [snap["Name"] for snap in snapshots_list])128        self.assertEqual(129            snapshot_2["description"], snapshots_list[0]['Description'])130        self.assertEqual(1, len(snapshots_list))131    def test_share_snapshot_export_location_list(self):132        share = self.create_share()133        snapshot = self.create_snapshot(share=share['id'])134        export_location_list = self.listing_result(135            'share snapshot export location', f' list {snapshot["id"]}')136        self.assertTableStruct(export_location_list, [137            'ID',138            'Path'139        ])140    def test_share_snapshot_export_location_show(self):141        share = self.create_share()142        snapshot = self.create_snapshot(share=share['id'])143        export_location_list = self.listing_result(144            'share snapshot export location', f'list {snapshot["id"]}')145        export_location = self.dict_result(146            'share snapshot export location',147            f'show {snapshot["id"]} {export_location_list[0]["ID"]}')148        self.assertIn('id', export_location)149        self.assertIn('created_at', export_location)150        self.assertIn('is_admin_only', export_location)151        self.assertIn('path', export_location)152        self.assertIn('share_snapshot_instance_id', export_location)153        self.assertIn('updated_at', export_location)154    def test_share_snapshot_abandon_adopt(self):155        share = self.create_share()156        snapshot = self.create_snapshot(157            share=share['id'], add_cleanup=False)158        self.openstack(159            f'share snapshot abandon {snapshot["id"]} --wait')160        snapshots_list = self.listing_result('share snapshot', 'list')161        self.assertNotIn(snapshot['id'],162                         [item['ID'] for item in snapshots_list])163        snapshot = self.dict_result(164            'share snapshot',165            f'adopt {share["id"]} 10.0.0.1:/foo/path '166            f'--name Snap --description Zorilla --wait')167        snapshots_list = self.listing_result('share snapshot', 'list')168        self.assertIn(snapshot['id'],169                      [item['ID'] for item in snapshots_list])170        self.addCleanup(...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
