Best Python code snippet using fMBT_python
test_to_csv.py
Source: test_to_csv.py
...20MIXED_FLOAT_DTYPES = ['float16', 'float32', 'float64']21MIXED_INT_DTYPES = ['uint8', 'uint16', 'uint32', 'uint64', 'int8', 'int16',22                    'int32', 'int64']23class TestDataFrameToCSV(TestData):24    def read_csv(self, path, **kwargs):25        params = dict(index_col=0, parse_dates=True)26        params.update(**kwargs)27        return pd.read_csv(path, **params)28    def test_from_csv_deprecation(self):29        # see gh-1781230        with ensure_clean('__tmp_from_csv_deprecation__') as path:31            self.tsframe.to_csv(path)32            with tm.assert_produces_warning(FutureWarning):33                depr_recons = DataFrame.from_csv(path)34                assert_frame_equal(self.tsframe, depr_recons)35    def test_to_csv_from_csv1(self):36        with ensure_clean('__tmp_to_csv_from_csv1__') as path:37            self.frame['A'][:5] = np.nan38            self.frame.to_csv(path)39            self.frame.to_csv(path, columns=['A', 'B'])40            self.frame.to_csv(path, header=False)41            self.frame.to_csv(path, index=False)42            # test roundtrip43            self.tsframe.to_csv(path)44            recons = self.read_csv(path)45            assert_frame_equal(self.tsframe, recons)46            self.tsframe.to_csv(path, index_label='index')47            recons = self.read_csv(path, index_col=None)48            assert(len(recons.columns) == len(self.tsframe.columns) + 1)49            # no index50            self.tsframe.to_csv(path, index=False)51            recons = self.read_csv(path, index_col=None)52            assert_almost_equal(self.tsframe.values, recons.values)53            # corner case54            dm = DataFrame({'s1': Series(lrange(3), lrange(3)),55                            's2': Series(lrange(2), lrange(2))})56            dm.to_csv(path)57            recons = self.read_csv(path)58            assert_frame_equal(dm, recons)59    def test_to_csv_from_csv2(self):60        with 
ensure_clean('__tmp_to_csv_from_csv2__') as path:61            # duplicate index62            df = DataFrame(np.random.randn(3, 3), index=['a', 'a', 'b'],63                           columns=['x', 'y', 'z'])64            df.to_csv(path)65            result = self.read_csv(path)66            assert_frame_equal(result, df)67            midx = MultiIndex.from_tuples(68                [('A', 1, 2), ('A', 1, 2), ('B', 1, 2)])69            df = DataFrame(np.random.randn(3, 3), index=midx,70                           columns=['x', 'y', 'z'])71            df.to_csv(path)72            result = self.read_csv(path, index_col=[0, 1, 2],73                                   parse_dates=False)74            assert_frame_equal(result, df, check_names=False)75            # column aliases76            col_aliases = Index(['AA', 'X', 'Y', 'Z'])77            self.frame2.to_csv(path, header=col_aliases)78            rs = self.read_csv(path)79            xp = self.frame2.copy()80            xp.columns = col_aliases81            assert_frame_equal(xp, rs)82            pytest.raises(ValueError, self.frame2.to_csv, path,83                          header=['AA', 'X'])84    def test_to_csv_from_csv3(self):85        with ensure_clean('__tmp_to_csv_from_csv3__') as path:86            df1 = DataFrame(np.random.randn(3, 1))87            df2 = DataFrame(np.random.randn(3, 1))88            df1.to_csv(path)89            df2.to_csv(path, mode='a', header=False)90            xp = pd.concat([df1, df2])91            rs = pd.read_csv(path, index_col=0)92            rs.columns = lmap(int, rs.columns)93            xp.columns = lmap(int, xp.columns)94            assert_frame_equal(xp, rs)95    def test_to_csv_from_csv4(self):96        with ensure_clean('__tmp_to_csv_from_csv4__') as path:97            # GH 10833 (TimedeltaIndex formatting)98            dt = pd.Timedelta(seconds=1)99            df = pd.DataFrame({'dt_data': [i * dt for i in range(3)]},100                              index=pd.Index([i * dt 
for i in range(3)],101                                             name='dt_index'))102            df.to_csv(path)103            result = pd.read_csv(path, index_col='dt_index')104            result.index = pd.to_timedelta(result.index)105            # TODO: remove renaming when GH 10875 is solved106            result.index = result.index.rename('dt_index')107            result['dt_data'] = pd.to_timedelta(result['dt_data'])108            assert_frame_equal(df, result, check_index_type=True)109    def test_to_csv_from_csv5(self):110        # tz, 8260111        with ensure_clean('__tmp_to_csv_from_csv5__') as path:112            self.tzframe.to_csv(path)113            result = pd.read_csv(path, index_col=0, parse_dates=['A'])114            converter = lambda c: to_datetime(result[c]).dt.tz_convert(115                'UTC').dt.tz_convert(self.tzframe[c].dt.tz)116            result['B'] = converter('B')117            result['C'] = converter('C')118            assert_frame_equal(result, self.tzframe)119    def test_to_csv_cols_reordering(self):120        # GH3454121        import pandas as pd122        chunksize = 5123        N = int(chunksize * 2.5)124        df = mkdf(N, 3)125        cs = df.columns126        cols = [cs[2], cs[0]]127        with ensure_clean() as path:128            df.to_csv(path, columns=cols, chunksize=chunksize)129            rs_c = pd.read_csv(path, index_col=0)130        assert_frame_equal(df[cols], rs_c, check_names=False)131    def test_to_csv_new_dupe_cols(self):132        import pandas as pd133        def _check_df(df, cols=None):134            with ensure_clean() as path:135                df.to_csv(path, columns=cols, chunksize=chunksize)136                rs_c = pd.read_csv(path, index_col=0)137                # we wrote them in a different order138                # so compare them in that order139                if cols is not None:140                    if df.columns.is_unique:141                        rs_c.columns = cols142           
         else:143                        indexer, missing = df.columns.get_indexer_non_unique(144                            cols)145                        rs_c.columns = df.columns.take(indexer)146                    for c in cols:147                        obj_df = df[c]148                        obj_rs = rs_c[c]149                        if isinstance(obj_df, Series):150                            assert_series_equal(obj_df, obj_rs)151                        else:152                            assert_frame_equal(153                                obj_df, obj_rs, check_names=False)154                # wrote in the same order155                else:156                    rs_c.columns = df.columns157                    assert_frame_equal(df, rs_c, check_names=False)158        chunksize = 5159        N = int(chunksize * 2.5)160        # dupe cols161        df = mkdf(N, 3)162        df.columns = ['a', 'a', 'b']163        _check_df(df, None)164        # dupe cols with selection165        cols = ['b', 'a']166        _check_df(df, cols)167    @pytest.mark.slow168    def test_to_csv_dtnat(self):169        # GH3437170        from pandas import NaT171        def make_dtnat_arr(n, nnat=None):172            if nnat is None:173                nnat = int(n * 0.1)  # 10%174            s = list(date_range('2000', freq='5min', periods=n))175            if nnat:176                for i in np.random.randint(0, len(s), nnat):177                    s[i] = NaT178                i = np.random.randint(100)179                s[-i] = NaT180                s[i] = NaT181            return s182        chunksize = 1000183        # N=35000184        s1 = make_dtnat_arr(chunksize + 5)185        s2 = make_dtnat_arr(chunksize + 5, 0)186        # s3=make_dtnjat_arr(chunksize+5,0)187        with ensure_clean('1.csv') as pth:188            df = DataFrame(dict(a=s1, b=s2))189            df.to_csv(pth, chunksize=chunksize)190            recons = self.read_csv(pth)._convert(datetime=True,191           
                                      coerce=True)192            assert_frame_equal(df, recons, check_names=False,193                               check_less_precise=True)194    @pytest.mark.slow195    def test_to_csv_moar(self):196        def _do_test(df, r_dtype=None, c_dtype=None,197                     rnlvl=None, cnlvl=None, dupe_col=False):198            kwargs = dict(parse_dates=False)199            if cnlvl:200                if rnlvl is not None:201                    kwargs['index_col'] = lrange(rnlvl)202                kwargs['header'] = lrange(cnlvl)203                with ensure_clean('__tmp_to_csv_moar__') as path:204                    df.to_csv(path, encoding='utf8',205                              chunksize=chunksize)206                    recons = self.read_csv(path, **kwargs)207            else:208                kwargs['header'] = 0209                with ensure_clean('__tmp_to_csv_moar__') as path:210                    df.to_csv(path, encoding='utf8', chunksize=chunksize)211                    recons = self.read_csv(path, **kwargs)212            def _to_uni(x):213                if not isinstance(x, compat.text_type):214                    return x.decode('utf8')215                return x216            if dupe_col:217                # read_Csv disambiguates the columns by218                # labeling them dupe.1,dupe.2, etc'. 
monkey patch columns219                recons.columns = df.columns220            if rnlvl and not cnlvl:221                delta_lvl = [recons.iloc[222                    :, i].values for i in range(rnlvl - 1)]223                ix = MultiIndex.from_arrays([list(recons.index)] + delta_lvl)224                recons.index = ix225                recons = recons.iloc[:, rnlvl - 1:]226            type_map = dict(i='i', f='f', s='O', u='O', dt='O', p='O')227            if r_dtype:228                if r_dtype == 'u':  # unicode229                    r_dtype = 'O'230                    recons.index = np.array(lmap(_to_uni, recons.index),231                                            dtype=r_dtype)232                    df.index = np.array(lmap(_to_uni, df.index), dtype=r_dtype)233                elif r_dtype == 'dt':  # unicode234                    r_dtype = 'O'235                    recons.index = np.array(lmap(Timestamp, recons.index),236                                            dtype=r_dtype)237                    df.index = np.array(238                        lmap(Timestamp, df.index), dtype=r_dtype)239                elif r_dtype == 'p':240                    r_dtype = 'O'241                    recons.index = np.array(242                        list(map(Timestamp, to_datetime(recons.index))),243                        dtype=r_dtype)244                    df.index = np.array(245                        list(map(Timestamp, df.index.to_timestamp())),246                        dtype=r_dtype)247                else:248                    r_dtype = type_map.get(r_dtype)249                    recons.index = np.array(recons.index, dtype=r_dtype)250                    df.index = np.array(df.index, dtype=r_dtype)251            if c_dtype:252                if c_dtype == 'u':253                    c_dtype = 'O'254                    recons.columns = np.array(lmap(_to_uni, recons.columns),255                                              dtype=c_dtype)256                    
df.columns = np.array(257                        lmap(_to_uni, df.columns), dtype=c_dtype)258                elif c_dtype == 'dt':259                    c_dtype = 'O'260                    recons.columns = np.array(lmap(Timestamp, recons.columns),261                                              dtype=c_dtype)262                    df.columns = np.array(263                        lmap(Timestamp, df.columns), dtype=c_dtype)264                elif c_dtype == 'p':265                    c_dtype = 'O'266                    recons.columns = np.array(267                        lmap(Timestamp, to_datetime(recons.columns)),268                        dtype=c_dtype)269                    df.columns = np.array(270                        lmap(Timestamp, df.columns.to_timestamp()),271                        dtype=c_dtype)272                else:273                    c_dtype = type_map.get(c_dtype)274                    recons.columns = np.array(recons.columns, dtype=c_dtype)275                    df.columns = np.array(df.columns, dtype=c_dtype)276            assert_frame_equal(df, recons, check_names=False,277                               check_less_precise=True)278        N = 100279        chunksize = 1000280        for ncols in [4]:281            base = int((chunksize // ncols or 1) or 1)282            for nrows in [2, 10, N - 1, N, N + 1, N + 2, 2 * N - 2,283                          2 * N - 1, 2 * N, 2 * N + 1, 2 * N + 2,284                          base - 1, base, base + 1]:285                _do_test(mkdf(nrows, ncols, r_idx_type='dt',286                              c_idx_type='s'), 'dt', 's')287        for ncols in [4]:288            base = int((chunksize // ncols or 1) or 1)289            for nrows in [2, 10, N - 1, N, N + 1, N + 2, 2 * N - 2,290                          2 * N - 1, 2 * N, 2 * N + 1, 2 * N + 2,291                          base - 1, base, base + 1]:292                _do_test(mkdf(nrows, ncols, r_idx_type='dt',293                              
c_idx_type='s'), 'dt', 's')294                pass295        for r_idx_type, c_idx_type in [('i', 'i'), ('s', 's'), ('u', 'dt'),296                                       ('p', 'p')]:297            for ncols in [1, 2, 3, 4]:298                base = int((chunksize // ncols or 1) or 1)299                for nrows in [2, 10, N - 1, N, N + 1, N + 2, 2 * N - 2,300                              2 * N - 1, 2 * N, 2 * N + 1, 2 * N + 2,301                              base - 1, base, base + 1]:302                    _do_test(mkdf(nrows, ncols, r_idx_type=r_idx_type,303                                  c_idx_type=c_idx_type),304                             r_idx_type, c_idx_type)305        for ncols in [1, 2, 3, 4]:306            base = int((chunksize // ncols or 1) or 1)307            for nrows in [10, N - 2, N - 1, N, N + 1, N + 2, 2 * N - 2,308                          2 * N - 1, 2 * N, 2 * N + 1, 2 * N + 2,309                          base - 1, base, base + 1]:310                _do_test(mkdf(nrows, ncols))311        for nrows in [10, N - 2, N - 1, N, N + 1, N + 2]:312            df = mkdf(nrows, 3)313            cols = list(df.columns)314            cols[:2] = ["dupe", "dupe"]315            cols[-2:] = ["dupe", "dupe"]316            ix = list(df.index)317            ix[:2] = ["rdupe", "rdupe"]318            ix[-2:] = ["rdupe", "rdupe"]319            df.index = ix320            df.columns = cols321            _do_test(df, dupe_col=True)322        _do_test(DataFrame(index=lrange(10)))323        _do_test(mkdf(chunksize // 2 + 1, 2, r_idx_nlevels=2), rnlvl=2)324        for ncols in [2, 3, 4]:325            base = int(chunksize // ncols)326            for nrows in [10, N - 2, N - 1, N, N + 1, N + 2, 2 * N - 2,327                          2 * N - 1, 2 * N, 2 * N + 1, 2 * N + 2,328                          base - 1, base, base + 1]:329                _do_test(mkdf(nrows, ncols, r_idx_nlevels=2), rnlvl=2)330                _do_test(mkdf(nrows, ncols, c_idx_nlevels=2), 
cnlvl=2)331                _do_test(mkdf(nrows, ncols, r_idx_nlevels=2, c_idx_nlevels=2),332                         rnlvl=2, cnlvl=2)333    def test_to_csv_from_csv_w_some_infs(self):334        # test roundtrip with inf, -inf, nan, as full columns and mix335        self.frame['G'] = np.nan336        f = lambda x: [np.inf, np.nan][np.random.rand() < .5]337        self.frame['H'] = self.frame.index.map(f)338        with ensure_clean() as path:339            self.frame.to_csv(path)340            recons = self.read_csv(path)341            # TODO to_csv drops column name342            assert_frame_equal(self.frame, recons, check_names=False)343            assert_frame_equal(np.isinf(self.frame),344                               np.isinf(recons), check_names=False)345    def test_to_csv_from_csv_w_all_infs(self):346        # test roundtrip with inf, -inf, nan, as full columns and mix347        self.frame['E'] = np.inf348        self.frame['F'] = -np.inf349        with ensure_clean() as path:350            self.frame.to_csv(path)351            recons = self.read_csv(path)352            # TODO to_csv drops column name353            assert_frame_equal(self.frame, recons, check_names=False)354            assert_frame_equal(np.isinf(self.frame),355                               np.isinf(recons), check_names=False)356    def test_to_csv_no_index(self):357        # GH 3624, after appending columns, to_csv fails358        with ensure_clean('__tmp_to_csv_no_index__') as path:359            df = DataFrame({'c1': [1, 2, 3], 'c2': [4, 5, 6]})360            df.to_csv(path, index=False)361            result = read_csv(path)362            assert_frame_equal(df, result)363            df['c3'] = Series([7, 8, 9], dtype='int64')364            df.to_csv(path, index=False)365            result = read_csv(path)366            assert_frame_equal(df, result)367    def test_to_csv_with_mix_columns(self):368        # gh-11637: incorrect output when a mix of integer and string column369        # 
names passed as columns parameter in to_csv370        df = DataFrame({0: ['a', 'b', 'c'],371                        1: ['aa', 'bb', 'cc']})372        df['test'] = 'txt'373        assert df.to_csv() == df.to_csv(columns=[0, 1, 'test'])374    def test_to_csv_headers(self):375        # GH6186, the presence or absence of `index` incorrectly376        # causes to_csv to have different header semantics.377        from_df = DataFrame([[1, 2], [3, 4]], columns=['A', 'B'])378        to_df = DataFrame([[1, 2], [3, 4]], columns=['X', 'Y'])379        with ensure_clean('__tmp_to_csv_headers__') as path:380            from_df.to_csv(path, header=['X', 'Y'])381            recons = self.read_csv(path)382            assert_frame_equal(to_df, recons)383            from_df.to_csv(path, index=False, header=['X', 'Y'])384            recons = self.read_csv(path)385            recons.reset_index(inplace=True)386            assert_frame_equal(to_df, recons)387    def test_to_csv_multiindex(self):388        frame = self.frame389        old_index = frame.index390        arrays = np.arange(len(old_index) * 2).reshape(2, -1)391        new_index = MultiIndex.from_arrays(arrays, names=['first', 'second'])392        frame.index = new_index393        with ensure_clean('__tmp_to_csv_multiindex__') as path:394            frame.to_csv(path, header=False)395            frame.to_csv(path, columns=['A', 'B'])396            # round trip397            frame.to_csv(path)398            df = self.read_csv(path, index_col=[0, 1],399                               parse_dates=False)400            # TODO to_csv drops column name401            assert_frame_equal(frame, df, check_names=False)402            assert frame.index.names == df.index.names403            # needed if setUp becomes a class method404            self.frame.index = old_index405            # try multiindex with dates406            tsframe = self.tsframe407            old_index = tsframe.index408            new_index = [old_index, 
np.arange(len(old_index))]409            tsframe.index = MultiIndex.from_arrays(new_index)410            tsframe.to_csv(path, index_label=['time', 'foo'])411            recons = self.read_csv(path, index_col=[0, 1])412            # TODO to_csv drops column name413            assert_frame_equal(tsframe, recons, check_names=False)414            # do not load index415            tsframe.to_csv(path)416            recons = self.read_csv(path, index_col=None)417            assert len(recons.columns) == len(tsframe.columns) + 2418            # no index419            tsframe.to_csv(path, index=False)420            recons = self.read_csv(path, index_col=None)421            assert_almost_equal(recons.values, self.tsframe.values)422            # needed if setUp becomes class method423            self.tsframe.index = old_index424        with ensure_clean('__tmp_to_csv_multiindex__') as path:425            # GH3571, GH1651, GH3141426            def _make_frame(names=None):427                if names is True:428                    names = ['first', 'second']429                return DataFrame(np.random.randint(0, 10, size=(3, 3)),430                                 columns=MultiIndex.from_tuples(431                                     [('bah', 'foo'),432                                      ('bah', 'bar'),433                                      ('ban', 'baz')], names=names),434                                 dtype='int64')435            # column & index are multi-index436            df = mkdf(5, 3, r_idx_nlevels=2, c_idx_nlevels=4)437            df.to_csv(path)438            result = read_csv(path, header=[0, 1, 2, 3],439                              index_col=[0, 1])440            assert_frame_equal(df, result)441            # column is mi442            df = mkdf(5, 3, r_idx_nlevels=1, c_idx_nlevels=4)443            df.to_csv(path)444            result = read_csv(445                path, header=[0, 1, 2, 3], index_col=0)446            assert_frame_equal(df, result)447        
    # dup column names?448            df = mkdf(5, 3, r_idx_nlevels=3, c_idx_nlevels=4)449            df.to_csv(path)450            result = read_csv(path, header=[0, 1, 2, 3],451                              index_col=[0, 1, 2])452            assert_frame_equal(df, result)453            # writing with no index454            df = _make_frame()455            df.to_csv(path, index=False)456            result = read_csv(path, header=[0, 1])457            assert_frame_equal(df, result)458            # we lose the names here459            df = _make_frame(True)460            df.to_csv(path, index=False)461            result = read_csv(path, header=[0, 1])462            assert com._all_none(*result.columns.names)463            result.columns.names = df.columns.names464            assert_frame_equal(df, result)465            # tupleize_cols=True and index=False466            df = _make_frame(True)467            with tm.assert_produces_warning(FutureWarning):468                df.to_csv(path, tupleize_cols=True, index=False)469            with tm.assert_produces_warning(FutureWarning,470                                            check_stacklevel=False):471                result = read_csv(path, header=0,472                                  tupleize_cols=True,473                                  index_col=None)474            result.columns = df.columns475            assert_frame_equal(df, result)476            # whatsnew example477            df = _make_frame()478            df.to_csv(path)479            result = read_csv(path, header=[0, 1],480                              index_col=[0])481            assert_frame_equal(df, result)482            df = _make_frame(True)483            df.to_csv(path)484            result = read_csv(path, header=[0, 1],485                              index_col=[0])486            assert_frame_equal(df, result)487            # column & index are multi-index (compatibility)488            df = mkdf(5, 3, r_idx_nlevels=2, c_idx_nlevels=4)489      
      with tm.assert_produces_warning(FutureWarning):490                df.to_csv(path, tupleize_cols=True)491            with tm.assert_produces_warning(FutureWarning,492                                            check_stacklevel=False):493                result = read_csv(path, header=0, index_col=[0, 1],494                                  tupleize_cols=True)495            result.columns = df.columns496            assert_frame_equal(df, result)497            # invalid options498            df = _make_frame(True)499            df.to_csv(path)500            for i in [6, 7]:501                msg = 'len of {i}, but only 5 lines in file'.format(i=i)502                with pytest.raises(ParserError, match=msg):503                    read_csv(path, header=lrange(i), index_col=0)504            # write with cols505            msg = 'cannot specify cols with a MultiIndex'506            with pytest.raises(TypeError, match=msg):507                df.to_csv(path, columns=['foo', 'bar'])508        with ensure_clean('__tmp_to_csv_multiindex__') as path:509            # empty510            tsframe[:0].to_csv(path)511            recons = self.read_csv(path)512            exp = tsframe[:0]513            exp.index = []514            tm.assert_index_equal(recons.columns, exp.columns)515            assert len(recons) == 0516    def test_to_csv_float32_nanrep(self):517        df = DataFrame(np.random.randn(1, 4).astype(np.float32))518        df[1] = np.nan519        with ensure_clean('__tmp_to_csv_float32_nanrep__.csv') as path:520            df.to_csv(path, na_rep=999)521            with open(path) as f:522                lines = f.readlines()523                assert lines[1].split(',')[2] == '999'524    def test_to_csv_withcommas(self):525        # Commas inside fields should be correctly escaped when saving as CSV.526        df = DataFrame({'A': [1, 2, 3], 'B': ['5,6', '7,8', '9,0']})527        with ensure_clean('__tmp_to_csv_withcommas__.csv') as path:528            
df.to_csv(path)529            df2 = self.read_csv(path)530            assert_frame_equal(df2, df)531    def test_to_csv_mixed(self):532        def create_cols(name):533            return ["%s%03d" % (name, i) for i in range(5)]534        df_float = DataFrame(np.random.randn(535            100, 5), dtype='float64', columns=create_cols('float'))536        df_int = DataFrame(np.random.randn(100, 5),537                           dtype='int64', columns=create_cols('int'))538        df_bool = DataFrame(True, index=df_float.index,539                            columns=create_cols('bool'))540        df_object = DataFrame('foo', index=df_float.index,541                              columns=create_cols('object'))542        df_dt = DataFrame(Timestamp('20010101'),543                          index=df_float.index, columns=create_cols('date'))544        # add in some nans545        df_float.loc[30:50, 1:3] = np.nan546        # ## this is a bug in read_csv right now ####547        # df_dt.loc[30:50,1:3] = np.nan548        df = pd.concat([df_float, df_int, df_bool, df_object, df_dt], axis=1)549        # dtype550        dtypes = dict()551        for n, dtype in [('float', np.float64), ('int', np.int64),552                         ('bool', np.bool), ('object', np.object)]:553            for c in create_cols(n):554                dtypes[c] = dtype555        with ensure_clean() as filename:556            df.to_csv(filename)557            rs = read_csv(filename, index_col=0, dtype=dtypes,558                          parse_dates=create_cols('date'))559            assert_frame_equal(rs, df)560    def test_to_csv_dups_cols(self):561        df = DataFrame(np.random.randn(1000, 30), columns=lrange(562            15) + lrange(15), dtype='float64')563        with ensure_clean() as filename:564            df.to_csv(filename)  # single dtype, fine565            result = read_csv(filename, index_col=0)566            result.columns = df.columns567            assert_frame_equal(result, df)568     
   df_float = DataFrame(np.random.randn(1000, 3), dtype='float64')569        df_int = DataFrame(np.random.randn(1000, 3), dtype='int64')570        df_bool = DataFrame(True, index=df_float.index, columns=lrange(3))571        df_object = DataFrame('foo', index=df_float.index, columns=lrange(3))572        df_dt = DataFrame(Timestamp('20010101'),573                          index=df_float.index, columns=lrange(3))574        df = pd.concat([df_float, df_int, df_bool, df_object,575                        df_dt], axis=1, ignore_index=True)576        cols = []577        for i in range(5):578            cols.extend([0, 1, 2])579        df.columns = cols580        with ensure_clean() as filename:581            df.to_csv(filename)582            result = read_csv(filename, index_col=0)583            # date cols584            for i in ['0.4', '1.4', '2.4']:585                result[i] = to_datetime(result[i])586            result.columns = df.columns587            assert_frame_equal(result, df)588        # GH3457589        from pandas.util.testing import makeCustomDataframe as mkdf590        N = 10591        df = mkdf(N, 3)592        df.columns = ['a', 'a', 'b']593        with ensure_clean() as filename:594            df.to_csv(filename)595            # read_csv will rename the dups columns596            result = read_csv(filename, index_col=0)597            result = result.rename(columns={'a.1': 'a'})598            assert_frame_equal(result, df)599    def test_to_csv_chunking(self):600        aa = DataFrame({'A': lrange(100000)})601        aa['B'] = aa.A + 1.0602        aa['C'] = aa.A + 2.0603        aa['D'] = aa.A + 3.0604        for chunksize in [10000, 50000, 100000]:605            with ensure_clean() as filename:606                aa.to_csv(filename, chunksize=chunksize)607                rs = read_csv(filename, index_col=0)608                assert_frame_equal(rs, aa)609    @pytest.mark.slow610    def test_to_csv_wide_frame_formatting(self):611        # Issue #8621612     
   df = DataFrame(np.random.randn(1, 100010), columns=None, index=None)613        with ensure_clean() as filename:614            df.to_csv(filename, header=False, index=False)615            rs = read_csv(filename, header=None)616            assert_frame_equal(rs, df)617    def test_to_csv_bug(self):618        f1 = StringIO('a,1.0\nb,2.0')619        df = self.read_csv(f1, header=None)620        newdf = DataFrame({'t': df[df.columns[0]]})621        with ensure_clean() as path:622            newdf.to_csv(path)623            recons = read_csv(path, index_col=0)624            # don't check_names as t != 1625            assert_frame_equal(recons, newdf, check_names=False)626    def test_to_csv_unicode(self):627        df = DataFrame({u('c/\u03c3'): [1, 2, 3]})628        with ensure_clean() as path:629            df.to_csv(path, encoding='UTF-8')630            df2 = read_csv(path, index_col=0, encoding='UTF-8')631            assert_frame_equal(df, df2)632            df.to_csv(path, encoding='UTF-8', index=False)633            df2 = read_csv(path, index_col=None, encoding='UTF-8')634            assert_frame_equal(df, df2)635    def test_to_csv_unicode_index_col(self):636        buf = StringIO('')637        df = DataFrame(638            [[u("\u05d0"), "d2", "d3", "d4"], ["a1", "a2", "a3", "a4"]],639            columns=[u("\u05d0"),640                     u("\u05d1"), u("\u05d2"), u("\u05d3")],641            index=[u("\u05d0"), u("\u05d1")])642        df.to_csv(buf, encoding='UTF-8')643        buf.seek(0)644        df2 = read_csv(buf, index_col=0, encoding='UTF-8')645        assert_frame_equal(df, df2)646    def test_to_csv_stringio(self):647        buf = StringIO()648        self.frame.to_csv(buf)649        buf.seek(0)650        recons = read_csv(buf, index_col=0)651        # TODO to_csv drops column name652        assert_frame_equal(recons, self.frame, check_names=False)653    def test_to_csv_float_format(self):654        df = DataFrame([[0.123456, 0.234567, 0.567567],655  
                      [12.32112, 123123.2, 321321.2]],656                       index=['A', 'B'], columns=['X', 'Y', 'Z'])657        with ensure_clean() as filename:658            df.to_csv(filename, float_format='%.2f')659            rs = read_csv(filename, index_col=0)660            xp = DataFrame([[0.12, 0.23, 0.57],661                            [12.32, 123123.20, 321321.20]],662                           index=['A', 'B'], columns=['X', 'Y', 'Z'])663            assert_frame_equal(rs, xp)664    def test_to_csv_unicodewriter_quoting(self):665        df = DataFrame({'A': [1, 2, 3], 'B': ['foo', 'bar', 'baz']})666        buf = StringIO()667        df.to_csv(buf, index=False, quoting=csv.QUOTE_NONNUMERIC,668                  encoding='utf-8')669        result = buf.getvalue()670        expected_rows = ['"A","B"',671                         '1,"foo"',672                         '2,"bar"',673                         '3,"baz"']674        expected = tm.convert_rows_list_to_csv_str(expected_rows)675        assert result == expected676    def test_to_csv_quote_none(self):677        # GH4328678        df = DataFrame({'A': ['hello', '{"hello"}']})679        for encoding in (None, 'utf-8'):680            buf = StringIO()681            df.to_csv(buf, quoting=csv.QUOTE_NONE,682                      encoding=encoding, index=False)683            result = buf.getvalue()684            expected_rows = ['A',685                             'hello',686                             '{"hello"}']687            expected = tm.convert_rows_list_to_csv_str(expected_rows)688            assert result == expected689    def test_to_csv_index_no_leading_comma(self):690        df = DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]},691                       index=['one', 'two', 'three'])692        buf = StringIO()693        df.to_csv(buf, index_label=False)694        expected_rows = ['A,B',695                         'one,1,4',696                         'two,2,5',697                         
'three,3,6']698        expected = tm.convert_rows_list_to_csv_str(expected_rows)699        assert buf.getvalue() == expected700    def test_to_csv_line_terminators(self):701        # see gh-20353702        df = DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]},703                       index=['one', 'two', 'three'])704        with ensure_clean() as path:705            # case 1: CRLF as line terminator706            df.to_csv(path, line_terminator='\r\n')707            expected = b',A,B\r\none,1,4\r\ntwo,2,5\r\nthree,3,6\r\n'708            with open(path, mode='rb') as f:709                assert f.read() == expected710        with ensure_clean() as path:711            # case 2: LF as line terminator712            df.to_csv(path, line_terminator='\n')713            expected = b',A,B\none,1,4\ntwo,2,5\nthree,3,6\n'714            with open(path, mode='rb') as f:715                assert f.read() == expected716        with ensure_clean() as path:717            # case 3: The default line terminator(=os.linesep)(gh-21406)718            df.to_csv(path)719            os_linesep = os.linesep.encode('utf-8')720            expected = (b',A,B' + os_linesep + b'one,1,4' + os_linesep +721                        b'two,2,5' + os_linesep + b'three,3,6' + os_linesep)722            with open(path, mode='rb') as f:723                assert f.read() == expected724    def test_to_csv_from_csv_categorical(self):725        # CSV with categoricals should result in the same output726        # as when one would add a "normal" Series/DataFrame.727        s = Series(pd.Categorical(["a", "b", "b", "a", "a", "c", "c", "c"]))728        s2 = Series(["a", "b", "b", "a", "a", "c", "c", "c"])729        res = StringIO()730        s.to_csv(res, header=False)731        exp = StringIO()732        s2.to_csv(exp, header=False)733        assert res.getvalue() == exp.getvalue()734        df = DataFrame({"s": s})735        df2 = DataFrame({"s": s2})736        res = StringIO()737        df.to_csv(res)738        exp = 
StringIO()739        df2.to_csv(exp)740        assert res.getvalue() == exp.getvalue()741    def test_to_csv_path_is_none(self):742        # GH 8215743        # Make sure we return string for consistency with744        # Series.to_csv()745        csv_str = self.frame.to_csv(path_or_buf=None)746        assert isinstance(csv_str, str)747        recons = pd.read_csv(StringIO(csv_str), index_col=0)748        assert_frame_equal(self.frame, recons)749    @pytest.mark.parametrize('df,encoding', [750        (DataFrame([[0.123456, 0.234567, 0.567567],751                    [12.32112, 123123.2, 321321.2]],752                   index=['A', 'B'], columns=['X', 'Y', 'Z']), None),753        # GH 21241, 21118754        (DataFrame([['abc', 'def', 'ghi']], columns=['X', 'Y', 'Z']), 'ascii'),755        (DataFrame(5 * [[123, u"你好", u"ä¸ç"]],756                   columns=['X', 'Y', 'Z']), 'gb2312'),757        (DataFrame(5 * [[123, u"Îειά ÏοÏ
", u"ÎÏÏμε"]],758                   columns=['X', 'Y', 'Z']), 'cp737')759    ])760    def test_to_csv_compression(self, df, encoding, compression):761        with ensure_clean() as filename:762            df.to_csv(filename, compression=compression, encoding=encoding)763            # test the round trip - to_csv -> read_csv764            result = read_csv(filename, compression=compression,765                              index_col=0, encoding=encoding)766            assert_frame_equal(df, result)767            # test the round trip using file handle - to_csv -> read_csv768            f, _handles = _get_handle(filename, 'w', compression=compression,769                                      encoding=encoding)770            with f:771                df.to_csv(f, encoding=encoding)772            result = pd.read_csv(filename, compression=compression,773                                 encoding=encoding, index_col=0, squeeze=True)774            assert_frame_equal(df, result)775            # explicitly make sure file is compressed776            with tm.decompress_file(filename, compression) as fh:777                text = fh.read().decode(encoding or 'utf8')778                for col in df.columns:779                    assert col in text780            with tm.decompress_file(filename, compression) as fh:781                assert_frame_equal(df, read_csv(fh,782                                                index_col=0,783                                                encoding=encoding))784    def test_to_csv_date_format(self):785        with ensure_clean('__tmp_to_csv_date_format__') as path:786            dt_index = self.tsframe.index787            datetime_frame = DataFrame(788                {'A': dt_index, 'B': dt_index.shift(1)}, index=dt_index)789            datetime_frame.to_csv(path, date_format='%Y%m%d')790            # Check that the data was put in the specified format791            test = read_csv(path, index_col=0)792            datetime_frame_int = 
datetime_frame.applymap(793                lambda x: int(x.strftime('%Y%m%d')))794            datetime_frame_int.index = datetime_frame_int.index.map(795                lambda x: int(x.strftime('%Y%m%d')))796            assert_frame_equal(test, datetime_frame_int)797            datetime_frame.to_csv(path, date_format='%Y-%m-%d')798            # Check that the data was put in the specified format799            test = read_csv(path, index_col=0)800            datetime_frame_str = datetime_frame.applymap(801                lambda x: x.strftime('%Y-%m-%d'))802            datetime_frame_str.index = datetime_frame_str.index.map(803                lambda x: x.strftime('%Y-%m-%d'))804            assert_frame_equal(test, datetime_frame_str)805            # Check that columns get converted806            datetime_frame_columns = datetime_frame.T807            datetime_frame_columns.to_csv(path, date_format='%Y%m%d')808            test = read_csv(path, index_col=0)809            datetime_frame_columns = datetime_frame_columns.applymap(810                lambda x: int(x.strftime('%Y%m%d')))811            # Columns don't get converted to ints by read_csv812            datetime_frame_columns.columns = (813                datetime_frame_columns.columns814                .map(lambda x: x.strftime('%Y%m%d')))815            assert_frame_equal(test, datetime_frame_columns)816            # test NaTs817            nat_index = to_datetime(818                ['NaT'] * 10 + ['2000-01-01', '1/1/2000', '1-1-2000'])819            nat_frame = DataFrame({'A': nat_index}, index=nat_index)820            nat_frame.to_csv(path, date_format='%Y-%m-%d')821            test = read_csv(path, parse_dates=[0, 1], index_col=0)822            assert_frame_equal(test, nat_frame)823    def test_to_csv_with_dst_transitions(self):824        with ensure_clean('csv_date_format_with_dst') as path:825            # make sure we are not failing on transitions826            times = pd.date_range("2013-10-26 23:00", 
"2013-10-27 01:00",827                                  tz="Europe/London",828                                  freq="H",829                                  ambiguous='infer')830            for i in [times, times + pd.Timedelta('10s')]:831                time_range = np.array(range(len(i)), dtype='int64')832                df = DataFrame({'A': time_range}, index=i)833                df.to_csv(path, index=True)834                # we have to reconvert the index as we835                # don't parse the tz's836                result = read_csv(path, index_col=0)837                result.index = to_datetime(result.index, utc=True).tz_convert(838                    'Europe/London')839                assert_frame_equal(result, df)840        # GH11619841        idx = pd.date_range('2015-01-01', '2015-12-31',842                            freq='H', tz='Europe/Paris')843        df = DataFrame({'values': 1, 'idx': idx},844                       index=idx)845        with ensure_clean('csv_date_format_with_dst') as path:846            df.to_csv(path, index=True)847            result = read_csv(path, index_col=0)848            result.index = to_datetime(result.index, utc=True).tz_convert(849                'Europe/Paris')850            result['idx'] = to_datetime(result['idx'], utc=True).astype(851                'datetime64[ns, Europe/Paris]')852            assert_frame_equal(result, df)853        # assert working854        df.astype(str)855        with ensure_clean('csv_date_format_with_dst') as path:856            df.to_pickle(path)857            result = pd.read_pickle(path)858            assert_frame_equal(result, df)859    def test_to_csv_quoting(self):860        df = DataFrame({861            'c_bool': [True, False],862            'c_float': [1.0, 3.2],863            'c_int': [42, np.nan],864            'c_string': ['a', 'b,c'],865        })866        expected_rows = [',c_bool,c_float,c_int,c_string',867                         '0,True,1.0,42.0,a',868                   
      '1,False,3.2,,"b,c"']869        expected = tm.convert_rows_list_to_csv_str(expected_rows)870        result = df.to_csv()871        assert result == expected872        result = df.to_csv(quoting=None)873        assert result == expected874        expected_rows = [',c_bool,c_float,c_int,c_string',875                         '0,True,1.0,42.0,a',876                         '1,False,3.2,,"b,c"']877        expected = tm.convert_rows_list_to_csv_str(expected_rows)878        result = df.to_csv(quoting=csv.QUOTE_MINIMAL)879        assert result == expected880        expected_rows = ['"","c_bool","c_float","c_int","c_string"',881                         '"0","True","1.0","42.0","a"',882                         '"1","False","3.2","","b,c"']883        expected = tm.convert_rows_list_to_csv_str(expected_rows)884        result = df.to_csv(quoting=csv.QUOTE_ALL)885        assert result == expected886        # see gh-12922, gh-13259: make sure changes to887        # the formatters do not break this behaviour888        expected_rows = ['"","c_bool","c_float","c_int","c_string"',889                         '0,True,1.0,42.0,"a"',890                         '1,False,3.2,"","b,c"']891        expected = tm.convert_rows_list_to_csv_str(expected_rows)892        result = df.to_csv(quoting=csv.QUOTE_NONNUMERIC)893        assert result == expected894        msg = "need to escape, but no escapechar set"895        with pytest.raises(csv.Error, match=msg):896            df.to_csv(quoting=csv.QUOTE_NONE)897        with pytest.raises(csv.Error, match=msg):898            df.to_csv(quoting=csv.QUOTE_NONE, escapechar=None)899        expected_rows = [',c_bool,c_float,c_int,c_string',900                         '0,True,1.0,42.0,a',901                         '1,False,3.2,,b!,c']902        expected = tm.convert_rows_list_to_csv_str(expected_rows)903        result = df.to_csv(quoting=csv.QUOTE_NONE,904                           escapechar='!')905        assert result == expected906        
expected_rows = [',c_bool,c_ffloat,c_int,c_string',907                         '0,True,1.0,42.0,a',908                         '1,False,3.2,,bf,c']909        expected = tm.convert_rows_list_to_csv_str(expected_rows)910        result = df.to_csv(quoting=csv.QUOTE_NONE,911                           escapechar='f')912        assert result == expected913        # see gh-3503: quoting Windows line terminators914        # presents with encoding?915        text_rows = ['a,b,c',916                     '1,"test \r\n",3']917        text = tm.convert_rows_list_to_csv_str(text_rows)918        df = pd.read_csv(StringIO(text))919        buf = StringIO()920        df.to_csv(buf, encoding='utf-8', index=False)921        assert buf.getvalue() == text922        # xref gh-7791: make sure the quoting parameter is passed through923        # with multi-indexes924        df = pd.DataFrame({'a': [1, 2], 'b': [3, 4], 'c': [5, 6]})925        df = df.set_index(['a', 'b'])926        expected_rows = ['"a","b","c"',927                         '"1","3","5"',928                         '"2","4","6"']929        expected = tm.convert_rows_list_to_csv_str(expected_rows)930        assert df.to_csv(quoting=csv.QUOTE_ALL) == expected931    def test_period_index_date_overflow(self):932        # see gh-15982933        dates = ["1990-01-01", "2000-01-01", "3005-01-01"]934        index = pd.PeriodIndex(dates, freq="D")935        df = pd.DataFrame([4, 5, 6], index=index)936        result = df.to_csv()937        expected_rows = [',0',938                         '1990-01-01,4',939                         '2000-01-01,5',940                         '3005-01-01,6']941        expected = tm.convert_rows_list_to_csv_str(expected_rows)942        assert result == expected943        date_format = "%m-%d-%Y"944        result = df.to_csv(date_format=date_format)945        expected_rows = [',0',946                         '01-01-1990,4',947                         '01-01-2000,5',948                         
'01-01-3005,6']949        expected = tm.convert_rows_list_to_csv_str(expected_rows)950        assert result == expected951        # Overflow with pd.NaT952        dates = ["1990-01-01", pd.NaT, "3005-01-01"]953        index = pd.PeriodIndex(dates, freq="D")954        df = pd.DataFrame([4, 5, 6], index=index)955        result = df.to_csv()956        expected_rows = [',0',957                         '1990-01-01,4',958                         ',5',959                         '3005-01-01,6']960        expected = tm.convert_rows_list_to_csv_str(expected_rows)961        assert result == expected962    def test_multi_index_header(self):963        # see gh-5539964        columns = pd.MultiIndex.from_tuples([("a", 1), ("a", 2),965                                             ("b", 1), ("b", 2)])966        df = pd.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]])967        df.columns = columns968        header = ["a", "b", "c", "d"]969        result = df.to_csv(header=header)970        expected_rows = [',a,b,c,d',971                         '0,1,2,3,4',972                         '1,5,6,7,8']973        expected = tm.convert_rows_list_to_csv_str(expected_rows)974        assert result == expected975    def test_gz_lineend(self):976        # GH 25311977        df = pd.DataFrame({'a': [1, 2]})978        expected_rows = ['a', '1', '2']979        expected = tm.convert_rows_list_to_csv_str(expected_rows)980        with ensure_clean('__test_gz_lineend.csv.gz') as path:981            df.to_csv(path, index=False)982            with tm.decompress_file(path, compression='gzip') as f:983                result = f.read().decode('utf-8')...read_csv_file.py
Source:read_csv_file.py  
## Import libraries
import csv
import sys


def _parse_csv_rows(input_csv_file, output_format):
    """Open the CSV file and parse every row while the handle is still open."""
    # newline='' is the mode the csv module expects, so that line breaks
    # embedded in quoted cells reach the parser untouched
    with open(input_csv_file, 'r', encoding='UTF8', newline='') as input_file:
        if output_format == 'dictionary':
            return [dict(row) for row in csv.DictReader(input_file)]
        # Materialise the rows here: a lazy csv.reader cannot be iterated
        # once the file has been closed on leaving this block
        return list(csv.reader(input_file))


def _raise_csv_field_size_limit():
    """Set the csv field size limit to the largest value the platform accepts."""
    field_size_limit = sys.maxsize
    while True:
        try:
            csv.field_size_limit(field_size_limit)
            break
        except OverflowError:
            # The value does not fit the platform's C long: shrink and retry
            field_size_limit //= 10


## Read input csv file (returns a list of rows)
def read_csv_file(input_csv_file: str, output_format='list'):
    """Read a UTF-8 CSV file and return its rows.

    Args:
        input_csv_file: Path of the CSV file to read.
        output_format: 'list' returns a list of rows (each a list of cells),
            where rows shorter than the first row are padded with None;
            'dictionary' returns a list of dicts keyed by the header row;
            any other value returns the rows as parsed, without padding.

    Returns:
        The parsed rows, or an empty list for an empty file.

    Raises:
        csv.Error: If the file still cannot be parsed after the field size
            limit has been raised.

    Side effects:
        When parsing fails with csv.Error, a notice is printed and the
        process-wide csv field size limit is raised before parsing again.
    """
    try:
        input_csv_file_lines = _parse_csv_rows(input_csv_file, output_format)
    except csv.Error:
        # Prevent possible errors due to large columns (beyond 131072 characters)
        print("Presence of too large cells!!!")
        _raise_csv_field_size_limit()
        # Parse again from a freshly opened file, so that the rows consumed
        # before the failure are not lost
        input_csv_file_lines = _parse_csv_rows(input_csv_file, output_format)
    ## Bring the row lengths on par
    if output_format == 'list' and input_csv_file_lines:
        csv_column_number = len(input_csv_file_lines[0])
        for row in input_csv_file_lines:
            missing_cells = csv_column_number - len(row)
            if missing_cells > 0:
                row.extend([None] * missing_cells)
    # return
    return input_csv_file_lines
# csv_value_replacer.py
Source:csv_value_replacer.py  
# Import libraries
from functions.libraries import *
from functions.create_replacing_map import *
from functions.replace_csv_values import *
from functions.read_csv_file import *
from functions.write_csv_file import *
import sys

# Input map CSV file (with replacing map --> "Old value" ; "New value")
Tk().withdraw()
messagebox.showinfo(title='Select map CSV file', message='Select the CSV file with the "old"-"new" map for value replacement')
Tk().withdraw()
input_csv_file_with_map = filedialog.askopenfilename(filetypes=[('CSV files', '.csv')])
Tk().withdraw()
messagebox.showinfo(title='CSV file selected', message="The CSV file selected is '%s'" % (input_csv_file_with_map))

# Try to read map only if a file is selected
# (a cancelled dialog yields an empty value, which is falsy)
if input_csv_file_with_map:
    # Open CSV with mapping
    input_csv_file_with_map_lines = read_csv_file(input_csv_file_with_map)
    # Create the map
    mapping_dictionary_array = create_replacing_map(input_csv_file_with_map_lines)
else:
    # No map selected: continue with an empty map
    input_csv_file_with_map_lines = []
    mapping_dictionary_array = []

# Input CSV file (to be replaced)
Tk().withdraw()
messagebox.showinfo(title='Select CSV file', message='Select the CSV file with values to be replaced')
Tk().withdraw()
input_csv_file = filedialog.askopenfilename(filetypes=[('CSV files', '.csv')])
Tk().withdraw()
messagebox.showinfo(title='CSV file selected', message="The CSV file selected is '%s'" % (input_csv_file))

# Run only if a file is selected
# (the guard must test the file to be processed, not the map file: otherwise
# cancelling this dialog still tries to read an empty path, and cancelling
# only the map dialog skips the processing altogether)
if input_csv_file:
    # Read the input CSV
    input_csv_file_lines = read_csv_file(input_csv_file)
    # Generate the output
    output_csv_file_lines = replace_csv_values(input_csv_file_lines, mapping_dictionary_array, add_new_column_if_match_is_missing=True)
    # Write the output file onto the input file (the selected file is overwritten)
    write_csv_file(output_csv_file_lines, input_csv_file)
    # Success
    Tk().withdraw()
    # ... (the rest of the script is not shown in this excerpt)
# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
