# Best Python code snippet using autotest_python
# test_postgres.py
# Source: test_postgres.py
...19        table_name))20    columns = cursor.fetchall()21    assert (not columns and not expected_column_tuples) \22           or set(columns) == expected_column_tuples23def get_count_sql(table_name):24    return 'SELECT count(*) FROM "public"."{}"'.format(table_name)25def get_pk_key(pks, obj, subrecord=False):26    pk_parts = []27    for pk in pks:28        pk_parts.append(str(obj[pk]))29    if subrecord:30        for key, value in obj.items():31            if key[:11] == '_sdc_level_':32                pk_parts.append(str(value))33    return ':'.join(pk_parts)34def flatten_record(old_obj, subtables, subpks, new_obj=None, current_path=None, level=0):35    if not new_obj:36        new_obj = {}37    for prop, value in old_obj.items():38        if current_path:39            next_path = current_path + '__' + prop40        else:41            next_path = prop42        if isinstance(value, dict):43            flatten_record(value, subtables, subpks, new_obj=new_obj, current_path=next_path, level=level)44        elif isinstance(value, list):45            if next_path not in subtables:46                subtables[next_path] = []47            row_index = 048            for item in value:49                new_subobj = {}50                for key, value in subpks.items():51                    new_subobj[key] = value52                new_subpks = subpks.copy()53                new_subobj[singer_stream.singer.LEVEL_FMT.format(level)] = row_index54                new_subpks[singer_stream.singer.LEVEL_FMT.format(level)] = row_index55                subtables[next_path].append(flatten_record(item,56                                                           subtables,57                                                           new_subpks,58                                                           new_obj=new_subobj,59                                                           level=level + 1))60                row_index += 161        else:62            new_obj[next_path] = 
value63    return new_obj64def assert_record(a, b, subtables, subpks):65    a_flat = flatten_record(a, subtables, subpks)66    for prop, value in a_flat.items():67        if value is None:68            if prop in b:69                assert b[prop] == None70        elif isinstance(b[prop], datetime):71            assert value == b[prop].isoformat()[:19]72        else:73            assert value == b[prop]74def assert_records(conn, records, table_name, pks, match_pks=False):75    if not isinstance(pks, list):76        pks = [pks]77    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:78        cur.execute("set timezone='UTC';")79        cur.execute('SELECT * FROM {}'.format(table_name))80        persisted_records_raw = cur.fetchall()81        persisted_records = {}82        for persisted_record in persisted_records_raw:83            pk = get_pk_key(pks, persisted_record)84            persisted_records[pk] = persisted_record85        subtables = {}86        records_pks = []87        for record in records:88            pk = get_pk_key(pks, record)89            records_pks.append(pk)90            persisted_record = persisted_records[pk]91            subpks = {}92            for pk in pks:93                subpks[singer_stream.singer.SOURCE_PK_PREFIX + pk] = persisted_record[pk]94            assert_record(record, persisted_record, subtables, subpks)95        if match_pks:96            assert sorted(list(persisted_records.keys())) == sorted(records_pks)97        sub_pks = list(map(lambda pk: singer_stream.singer.SOURCE_PK_PREFIX + pk, pks))98        for subtable_name, items in subtables.items():99            cur.execute('SELECT * FROM {}'.format(100                table_name + '__' + subtable_name))101            persisted_records_raw = cur.fetchall()102            persisted_records = {}103            for persisted_record in persisted_records_raw:104                pk = get_pk_key(sub_pks, persisted_record, subrecord=True)105                
persisted_records[pk] = persisted_record106            subtables = {}107            records_pks = []108            for record in items:109                pk = get_pk_key(sub_pks, record, subrecord=True)110                records_pks.append(pk)111                persisted_record = persisted_records[pk]112                assert_record(record, persisted_record, subtables, subpks)113            assert len(subtables.values()) == 0114            if match_pks:115                assert sorted(list(persisted_records.keys())) == sorted(records_pks)116def assert_column_indexed(conn, table_name, column_name):117    with conn.cursor() as cur:118        cur.execute('''119            SELECT t.relname AS table_name, i.relname AS index_name, a.attname AS column_name120            FROM pg_class t, pg_class i, pg_index ix, pg_attribute a121            WHERE t.oid = ix.indrelid AND i.oid = ix.indexrelid AND a.attrelid = t.oid122            AND a.attnum = ANY(ix.indkey) AND t.relkind = 'r'123            AND t.relname = '{table_name}' AND a.attname = '{column_name}'124        '''.format(125            table_name=table_name,126            column_name=column_name))127        assert len(cur.fetchall()) > 0128def test_loading__invalid__configuration__schema(db_cleanup):129    stream = CatStream(1)130    stream.schema = deepcopy(stream.schema)131    stream.schema['schema']['type'] = 'invalid type for a JSON Schema'132    with pytest.raises(TargetError, match=r'.*invalid JSON Schema instance.*'):133        main(CONFIG, input_stream=stream)134def test_loading__invalid__default_null_value__non_nullable_column(db_cleanup):135    class NullDefaultCatStream(CatStream):136        def generate_record(self):137            record = CatStream.generate_record(self)138            record['name'] = postgres.RESERVED_NULL_DEFAULT139            return record140    with pytest.raises(postgres.PostgresError, match=r'.*NotNullViolation.*'):141        main(CONFIG, input_stream=NullDefaultCatStream(20))142def 
test_loading__schema_version_0_gets_migrated_to_2(db_cleanup):143    main(CONFIG, input_stream=CatStream(100))144    with psycopg2.connect(**TEST_DB) as conn:145        with conn.cursor() as cur:146            target = postgres.PostgresTarget(conn)147            metadata = target._get_table_metadata(cur, 'cats')148            metadata.pop('schema_version')149            metadata.pop('path')150            metadata['table_mappings'] = [{'from': ['cats', 'adoption', 'immunizations'],151                                           'to': 'cats__adoption__immunizations'}]152            target._set_table_metadata(cur, 'cats', metadata)153            metadata = target._get_table_metadata(cur, 'cats__adoption__immunizations')154            metadata.pop('schema_version')155            metadata.pop('path')156            target._set_table_metadata(cur, 'cats__adoption__immunizations', metadata)157    main(CONFIG, input_stream=CatStream(100))158    with psycopg2.connect(**TEST_DB) as conn:159        with conn.cursor() as cur:160            target = postgres.PostgresTarget(conn)161            metadata = target._get_table_metadata(cur, 'cats')162            assert metadata['schema_version'] == 2163            assert metadata['path']164            assert not metadata.get('table_mappings')165            metadata = target._get_table_metadata(cur, 'cats__adoption__immunizations')166            assert metadata['schema_version'] == 2167            assert metadata['path']168            assert not metadata.get('table_mappings')169def test_loading__simple(db_cleanup):170    stream = CatStream(100)171    main(CONFIG, input_stream=stream)172    with psycopg2.connect(**TEST_DB) as conn:173        with conn.cursor() as cur:174            assert_columns_equal(cur,175                                 'cats',176                                 {177                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),178                                     ('_sdc_received_at', 
'timestamp with time zone', 'YES'),179                                     ('_sdc_sequence', 'bigint', 'YES'),180                                     ('_sdc_table_version', 'bigint', 'YES'),181                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),182                                     ('adoption__was_foster', 'boolean', 'YES'),183                                     ('age', 'bigint', 'YES'),184                                     ('id', 'bigint', 'NO'),185                                     ('name', 'text', 'NO'),186                                     ('bio', 'text', 'NO'),187                                     ('paw_size', 'bigint', 'NO'),188                                     ('paw_colour', 'text', 'NO'),189                                     ('flea_check_complete', 'boolean', 'NO'),190                                     ('pattern', 'text', 'YES')191                                 })192            assert_columns_equal(cur,193                                 'cats__adoption__immunizations',194                                 {195                                     ('_sdc_level_0_id', 'bigint', 'NO'),196                                     ('_sdc_sequence', 'bigint', 'YES'),197                                     ('_sdc_source_key_id', 'bigint', 'NO'),198                                     ('date_administered', 'timestamp with time zone', 'YES'),199                                     ('type', 'text', 'YES')200                                 })201            cur.execute(get_count_sql('cats'))202            assert cur.fetchone()[0] == 100203        for record in stream.records:204            record['paw_size'] = 314159205            record['paw_colour'] = ''206            record['flea_check_complete'] = False207        assert_records(conn, stream.records, 'cats', 'id')208        assert_column_indexed(conn, 'cats', '_sdc_sequence')209        assert_column_indexed(conn, 'cats', 'id')210        
assert_column_indexed(conn, 'cats__adoption__immunizations', '_sdc_sequence')211        assert_column_indexed(conn, 'cats__adoption__immunizations', '_sdc_level_0_id')212def test_loading__simple__allOf(db_cleanup):213    stream = CatStream(100)214    stream.schema = deepcopy(stream.schema)215    name = stream.schema['schema']['properties']['name']216    stream.schema['schema']['properties']['name'] = {217        'allOf': [218            name,219            {'maxLength': 100000000}220            ]221    }222    adoption = stream.schema['schema']['properties']['adoption']['properties']223    stream.schema['schema']['properties']['adoption'] = {224        'allOf': [225            {226                'type': ['object', 'null'],227                'properties': {228                    'adopted_on': adoption['adopted_on'],229                    'immunizations': adoption['immunizations'],230                }231            },232            {233                'properties': {234                    'was_foster': adoption['was_foster']235                }236            }237        ]238    }239    main(CONFIG, input_stream=stream)240    with psycopg2.connect(**TEST_DB) as conn:241        with conn.cursor() as cur:242            assert_columns_equal(cur,243                                 'cats',244                                 {245                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),246                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),247                                     ('_sdc_sequence', 'bigint', 'YES'),248                                     ('_sdc_table_version', 'bigint', 'YES'),249                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),250                                     ('adoption__was_foster', 'boolean', 'YES'),251                                     ('age', 'bigint', 'YES'),252                                     ('id', 'bigint', 
'NO'),253                                     ('name', 'text', 'NO'),254                                     ('bio', 'text', 'NO'),255                                     ('paw_size', 'bigint', 'NO'),256                                     ('paw_colour', 'text', 'NO'),257                                     ('flea_check_complete', 'boolean', 'NO'),258                                     ('pattern', 'text', 'YES')259                                 })260            assert_columns_equal(cur,261                                 'cats__adoption__immunizations',262                                 {263                                     ('_sdc_level_0_id', 'bigint', 'NO'),264                                     ('_sdc_sequence', 'bigint', 'YES'),265                                     ('_sdc_source_key_id', 'bigint', 'NO'),266                                     ('date_administered', 'timestamp with time zone', 'YES'),267                                     ('type', 'text', 'YES')268                                 })269            cur.execute(get_count_sql('cats'))270            assert cur.fetchone()[0] == 100271        for record in stream.records:272            record['paw_size'] = 314159273            record['paw_colour'] = ''274            record['flea_check_complete'] = False275        assert_records(conn, stream.records, 'cats', 'id')276def test_loading__simple__anyOf(db_cleanup):277    stream = CatStream(100)278    stream.schema = deepcopy(stream.schema)279    adoption_props = stream.schema['schema']['properties']['adoption']['properties']280    adoption_props['adopted_on'] = {281        "anyOf": [282            {283                "type": "string",284                "format": "date-time"285            },286            {"type": ["string", "null"]}]}287    main(CONFIG, input_stream=stream)288    with psycopg2.connect(**TEST_DB) as conn:289        with conn.cursor() as cur:290            assert_columns_equal(cur,291                                 'cats',292        
                         {293                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),294                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),295                                     ('_sdc_sequence', 'bigint', 'YES'),296                                     ('_sdc_table_version', 'bigint', 'YES'),297                                     ('adoption__adopted_on__t', 'timestamp with time zone', 'YES'),298                                     ('adoption__adopted_on__s', 'text', 'YES'),299                                     ('adoption__was_foster', 'boolean', 'YES'),300                                     ('age', 'bigint', 'YES'),301                                     ('id', 'bigint', 'NO'),302                                     ('name', 'text', 'NO'),303                                     ('bio', 'text', 'NO'),304                                     ('paw_size', 'bigint', 'NO'),305                                     ('paw_colour', 'text', 'NO'),306                                     ('flea_check_complete', 'boolean', 'NO'),307                                     ('pattern', 'text', 'YES')308                                 })309            assert_columns_equal(cur,310                                 'cats__adoption__immunizations',311                                 {312                                     ('_sdc_level_0_id', 'bigint', 'NO'),313                                     ('_sdc_sequence', 'bigint', 'YES'),314                                     ('_sdc_source_key_id', 'bigint', 'NO'),315                                     ('date_administered', 'timestamp with time zone', 'YES'),316                                     ('type', 'text', 'YES')317                                 })318            cur.execute(sql.SQL('SELECT {}, {}, {} FROM {}').format(319                sql.Identifier('adoption__adopted_on__t'),320                sql.Identifier('adoption__adopted_on__s'),321              
  sql.Identifier('adoption__was_foster'),322                sql.Identifier('cats')323            ))324            persisted_records = cur.fetchall()325            ## Assert that the split columns correctly persisted all datetime data326            assert 100 == len(persisted_records)327            assert 100 == len([x for x in persisted_records if x[1] is None])328            assert len([x for x in persisted_records if x[2] is not None]) \329                == len([x for x in persisted_records if x[0] is not None])330def test_loading__empty(db_cleanup):331    stream = CatStream(0)332    main(CONFIG, input_stream=stream)333    with psycopg2.connect(**TEST_DB) as conn:334        with conn.cursor() as cur:335            cur.execute(336                sql.SQL('''337            SELECT EXISTS(338              SELECT 1339              FROM information_schema.tables340              WHERE table_schema = {}341                AND table_name = {}342            );343            ''').format(344                    sql.Literal('public'),345                    sql.Literal('cats')))346            assert not cur.fetchone()[0]347def test_loading__empty__enabled_config(db_cleanup):348    config = CONFIG.copy()349    config['persist_empty_tables'] = True350    stream = CatStream(0)351    main(config, input_stream=stream)352    with psycopg2.connect(**TEST_DB) as conn:353        with conn.cursor() as cur:354            assert_columns_equal(cur,355                                 'cats',356                                 {357                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),358                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),359                                     ('_sdc_sequence', 'bigint', 'YES'),360                                     ('_sdc_table_version', 'bigint', 'YES'),361                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),362                          
           ('adoption__was_foster', 'boolean', 'YES'),363                                     ('age', 'bigint', 'YES'),364                                     ('id', 'bigint', 'NO'),365                                     ('name', 'text', 'NO'),366                                     ('bio', 'text', 'NO'),367                                     ('paw_size', 'bigint', 'NO'),368                                     ('paw_colour', 'text', 'NO'),369                                     ('flea_check_complete', 'boolean', 'NO'),370                                     ('pattern', 'text', 'YES')371                                 })372            assert_columns_equal(cur,373                                 'cats__adoption__immunizations',374                                 {375                                     ('_sdc_level_0_id', 'bigint', 'NO'),376                                     ('_sdc_sequence', 'bigint', 'YES'),377                                     ('_sdc_source_key_id', 'bigint', 'NO'),378                                     ('date_administered', 'timestamp with time zone', 'YES'),379                                     ('type', 'text', 'YES')380                                 })381            cur.execute(get_count_sql('cats'))382            assert cur.fetchone()[0] == 0383def test_loading__empty__enabled_config_with_messages_for_only_one_stream(db_cleanup):384    config = CONFIG.copy()385    config['max_batch_rows'] = 20386    config['batch_detection_threshold'] = 1387    config['persist_empty_tables'] = True388    cat_rows = list(CatStream(100))389    dog_rows = list(DogStream(0))390    # Simulate one stream that yields a lot of records with another that yields no records, and ensure that only the first391    # needs to be flushed before any state messages are emitted392    def test_stream():393        yield cat_rows[0]394        yield dog_rows[0]395        for row in cat_rows[slice(1, 5)]:396            yield row397        yield json.dumps({'type': 'STATE', 
'value': {'test': 'state-1'}})398        for row in cat_rows[slice(6, 25)]:399            yield row400        yield json.dumps({'type': 'STATE', 'value': {'test': 'state-2'}})401    main(config, input_stream=test_stream())402    with psycopg2.connect(**TEST_DB) as conn:403        with conn.cursor() as cur:404            assert_columns_equal(cur,405                                 'cats',406                                 {407                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),408                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),409                                     ('_sdc_sequence', 'bigint', 'YES'),410                                     ('_sdc_table_version', 'bigint', 'YES'),411                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),412                                     ('adoption__was_foster', 'boolean', 'YES'),413                                     ('age', 'bigint', 'YES'),414                                     ('id', 'bigint', 'NO'),415                                     ('name', 'text', 'NO'),416                                     ('bio', 'text', 'NO'),417                                     ('paw_size', 'bigint', 'NO'),418                                     ('paw_colour', 'text', 'NO'),419                                     ('flea_check_complete', 'boolean', 'NO'),420                                     ('pattern', 'text', 'YES')421                                 })422            assert_columns_equal(cur,423                                 'dogs',424                                 {425                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),426                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),427                                     ('_sdc_sequence', 'bigint', 'YES'),428                                     ('_sdc_table_version', 'bigint', 
'YES'),429                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),430                                     ('adoption__was_foster', 'boolean', 'YES'),431                                     ('age', 'bigint', 'YES'),432                                     ('id', 'bigint', 'NO'),433                                     ('name', 'text', 'NO'),434                                     ('bio', 'text', 'NO'),435                                     ('paw_size', 'bigint', 'NO'),436                                     ('paw_colour', 'text', 'NO'),437                                     ('flea_check_complete', 'boolean', 'NO'),438                                     ('pattern', 'text', 'YES')439                                 })440            cur.execute(get_count_sql('cats'))441            assert cur.fetchone()[0] == 23442            cur.execute(get_count_sql('dogs'))443            assert cur.fetchone()[0] == 0444## TODO: Complex types defaulted445# def test_loading__default__complex_type(db_cleanup):446#     main(CONFIG, input_stream=NestedStream(10))447#448#     with psycopg2.connect(**TEST_DB) as conn:449#         with conn.cursor() as cur:450#             cur.execute(get_count_sql('root'))451#             assert 10 == cur.fetchone()[0]452#453#             cur.execute(get_count_sql('root__array_scalar_defaulted'))454#             assert 100 == cur.fetchone()[0]455def test_loading__nested_tables(db_cleanup):456    main(CONFIG, input_stream=NestedStream(10))457    with psycopg2.connect(**TEST_DB) as conn:458        with conn.cursor() as cur:459            cur.execute(get_count_sql('root'))460            assert 10 == cur.fetchone()[0]461            cur.execute(get_count_sql('root__array_scalar'))462            assert 50 == cur.fetchone()[0]463            cur.execute(464                get_count_sql('root__object_of_object_0__object_of_object_1__object_of_object_2__array_scalar'[:63]))465            assert 50 == cur.fetchone()[0]466      
      cur.execute(get_count_sql('root__array_of_array'))467            assert 20 == cur.fetchone()[0]468            cur.execute(get_count_sql('root__array_of_array___sdc_value'))469            assert 80 == cur.fetchone()[0]470            cur.execute(get_count_sql('root__array_of_array___sdc_value___sdc_value'))471            assert 200 == cur.fetchone()[0]472            assert_columns_equal(cur,473                                 'root',474                                 {475                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),476                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),477                                     ('_sdc_sequence', 'bigint', 'YES'),478                                     ('_sdc_table_version', 'bigint', 'YES'),479                                     ('id', 'bigint', 'NO'),480                                     ('null', 'bigint', 'YES'),481                                     ('nested_null__null', 'bigint', 'YES'),482                                     ('object_of_object_0__object_of_object_1__object_of_object_2__a', 'bigint', 'NO'),483                                     ('object_of_object_0__object_of_object_1__object_of_object_2__b', 'bigint', 'NO'),484                                     ('object_of_object_0__object_of_object_1__object_of_object_2__c', 'bigint', 'NO')485                                 })486            assert_columns_equal(cur,487                                 'root__object_of_object_0__object_of_object_1__object_of_object_2__array_scalar'[:63],488                                 {489                                     ('_sdc_sequence', 'bigint', 'YES'),490                                     ('_sdc_source_key_id', 'bigint', 'NO'),491                                     ('_sdc_level_0_id', 'bigint', 'NO'),492                                     ('_sdc_value', 'boolean', 'NO')493                                 })494            
assert_columns_equal(cur,495                                 'root__array_of_array',496                                 {497                                     ('_sdc_sequence', 'bigint', 'YES'),498                                     ('_sdc_source_key_id', 'bigint', 'NO'),499                                     ('_sdc_level_0_id', 'bigint', 'NO')500                                 })501            assert_columns_equal(cur,502                                 'root__array_of_array___sdc_value',503                                 {504                                     ('_sdc_sequence', 'bigint', 'YES'),505                                     ('_sdc_source_key_id', 'bigint', 'NO'),506                                     ('_sdc_level_0_id', 'bigint', 'NO'),507                                     ('_sdc_level_1_id', 'bigint', 'NO')508                                 })509            assert_columns_equal(cur,510                                 'root__array_of_array___sdc_value___sdc_value',511                                 {512                                     ('_sdc_sequence', 'bigint', 'YES'),513                                     ('_sdc_source_key_id', 'bigint', 'NO'),514                                     ('_sdc_level_0_id', 'bigint', 'NO'),515                                     ('_sdc_level_1_id', 'bigint', 'NO'),516                                     ('_sdc_level_2_id', 'bigint', 'NO'),517                                     ('_sdc_value', 'bigint', 'NO')518                                 })519def test_loading__new_non_null_column(db_cleanup):520    cat_count = 50521    main(CONFIG, input_stream=CatStream(cat_count))522    class NonNullStream(CatStream):523        def generate_record(self):524            record = CatStream.generate_record(self)525            record['id'] = record['id'] + cat_count526            return record527    non_null_stream = NonNullStream(cat_count)528    non_null_stream.schema = deepcopy(non_null_stream.schema)529    
non_null_stream.schema['schema']['properties']['paw_toe_count'] = {'type': 'integer',530                                                                       'default': 5}531    main(CONFIG, input_stream=non_null_stream)532    with psycopg2.connect(**TEST_DB) as conn:533        with conn.cursor() as cur:534            assert_columns_equal(cur,535                                 'cats',536                                 {537                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),538                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),539                                     ('_sdc_sequence', 'bigint', 'YES'),540                                     ('_sdc_table_version', 'bigint', 'YES'),541                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),542                                     ('adoption__was_foster', 'boolean', 'YES'),543                                     ('age', 'bigint', 'YES'),544                                     ('id', 'bigint', 'NO'),545                                     ('name', 'text', 'NO'),546                                     ('bio', 'text', 'NO'),547                                     ('paw_size', 'bigint', 'NO'),548                                     ('paw_colour', 'text', 'NO'),549                                     ('paw_toe_count', 'bigint', 'YES'),550                                     ('flea_check_complete', 'boolean', 'NO'),551                                     ('pattern', 'text', 'YES')552                                 })553            cur.execute(sql.SQL('SELECT {}, {} FROM {}').format(554                sql.Identifier('id'),555                sql.Identifier('paw_toe_count'),556                sql.Identifier('cats')557            ))558            persisted_records = cur.fetchall()559            ## Assert that the split columns before/after new non-null data560            assert 2 * cat_count == 
len(persisted_records)561            assert cat_count == len([x for x in persisted_records if x[1] is None])562            assert cat_count == len([x for x in persisted_records if x[1] is not None])563def test_loading__column_type_change(db_cleanup):564    cat_count = 20565    main(CONFIG, input_stream=CatStream(cat_count))566    with psycopg2.connect(**TEST_DB) as conn:567        with conn.cursor() as cur:568            assert_columns_equal(cur,569                                 'cats',570                                 {571                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),572                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),573                                     ('_sdc_sequence', 'bigint', 'YES'),574                                     ('_sdc_table_version', 'bigint', 'YES'),575                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),576                                     ('adoption__was_foster', 'boolean', 'YES'),577                                     ('age', 'bigint', 'YES'),578                                     ('id', 'bigint', 'NO'),579                                     ('name', 'text', 'NO'),580                                     ('bio', 'text', 'NO'),581                                     ('paw_size', 'bigint', 'NO'),582                                     ('paw_colour', 'text', 'NO'),583                                     ('flea_check_complete', 'boolean', 'NO'),584                                     ('pattern', 'text', 'YES')585                                 })586            cur.execute(sql.SQL('SELECT {} FROM {}').format(587                sql.Identifier('name'),588                sql.Identifier('cats')589            ))590            persisted_records = cur.fetchall()591            ## Assert that the original data is present592            assert cat_count == len(persisted_records)593            assert cat_count == 
len([x for x in persisted_records if x[0] is not None])594    class NameDateTimeCatStream(CatStream):595        def generate_record(self):596            record = CatStream.generate_record(self)597            record['id'] = record['id'] + cat_count598            record['name'] = '2001-01-01 01:01:01.0001+01:01'599            return record600    stream = NameDateTimeCatStream(cat_count)601    stream.schema = deepcopy(stream.schema)602    stream.schema['schema']['properties']['name'] = {'type': 'string',603                                                     'format': 'date-time'}604    main(CONFIG, input_stream=stream)605    with psycopg2.connect(**TEST_DB) as conn:606        with conn.cursor() as cur:607            assert_columns_equal(cur,608                                 'cats',609                                 {610                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),611                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),612                                     ('_sdc_sequence', 'bigint', 'YES'),613                                     ('_sdc_table_version', 'bigint', 'YES'),614                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),615                                     ('adoption__was_foster', 'boolean', 'YES'),616                                     ('age', 'bigint', 'YES'),617                                     ('id', 'bigint', 'NO'),618                                     ('name__s', 'text', 'YES'),619                                     ('name__t', 'timestamp with time zone', 'YES'),620                                     ('bio', 'text', 'NO'),621                                     ('paw_size', 'bigint', 'NO'),622                                     ('paw_colour', 'text', 'NO'),623                                     ('flea_check_complete', 'boolean', 'NO'),624                                     ('pattern', 'text', 'YES')625          
                       })626            cur.execute(sql.SQL('SELECT {}, {} FROM {}').format(627                sql.Identifier('name__s'),628                sql.Identifier('name__t'),629                sql.Identifier('cats')630            ))631            persisted_records = cur.fetchall()632            ## Assert that the split columns migrated data/persisted new data633            assert 2 * cat_count == len(persisted_records)634            assert cat_count == len([x for x in persisted_records if x[0] is not None])635            assert cat_count == len([x for x in persisted_records if x[1] is not None])636            assert 0 == len([x for x in persisted_records if x[0] is not None and x[1] is not None])637    class NameBooleanCatStream(CatStream):638        def generate_record(self):639            record = CatStream.generate_record(self)640            record['id'] = record['id'] + (2 * cat_count)641            record['name'] = False642            return record643    stream = NameBooleanCatStream(cat_count)644    stream.schema = deepcopy(stream.schema)645    stream.schema['schema']['properties']['name'] = {'type': 'boolean'}646    main(CONFIG, input_stream=stream)647    with psycopg2.connect(**TEST_DB) as conn:648        with conn.cursor() as cur:649            assert_columns_equal(cur,650                                 'cats',651                                 {652                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),653                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),654                                     ('_sdc_sequence', 'bigint', 'YES'),655                                     ('_sdc_table_version', 'bigint', 'YES'),656                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),657                                     ('adoption__was_foster', 'boolean', 'YES'),658                                     ('age', 'bigint', 'YES'),659                
                     ('id', 'bigint', 'NO'),660                                     ('name__s', 'text', 'YES'),661                                     ('name__t', 'timestamp with time zone', 'YES'),662                                     ('name__b', 'boolean', 'YES'),663                                     ('bio', 'text', 'NO'),664                                     ('paw_size', 'bigint', 'NO'),665                                     ('paw_colour', 'text', 'NO'),666                                     ('flea_check_complete', 'boolean', 'NO'),667                                     ('pattern', 'text', 'YES')668                                 })669            cur.execute(sql.SQL('SELECT {}, {}, {} FROM {}').format(670                sql.Identifier('name__s'),671                sql.Identifier('name__t'),672                sql.Identifier('name__b'),673                sql.Identifier('cats')674            ))675            persisted_records = cur.fetchall()676            ## Assert that the split columns migrated data/persisted new data677            assert 3 * cat_count == len(persisted_records)678            assert cat_count == len([x for x in persisted_records if x[0] is not None])679            assert cat_count == len([x for x in persisted_records if x[1] is not None])680            assert cat_count == len([x for x in persisted_records if x[2] is not None])681            assert 0 == len(682                [x for x in persisted_records if x[0] is not None and x[1] is not None and x[2] is not None])683    class NameIntegerCatStream(CatStream):684        def generate_record(self):685            record = CatStream.generate_record(self)686            record['id'] = record['id'] + (3 * cat_count)687            record['name'] = 314688            return record689    stream = NameIntegerCatStream(cat_count)690    stream.schema = deepcopy(stream.schema)691    stream.schema['schema']['properties']['name'] = {'type': 'integer'}692    main(CONFIG, input_stream=stream)693    with 
psycopg2.connect(**TEST_DB) as conn:694        with conn.cursor() as cur:695            assert_columns_equal(cur,696                                 'cats',697                                 {698                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),699                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),700                                     ('_sdc_sequence', 'bigint', 'YES'),701                                     ('_sdc_table_version', 'bigint', 'YES'),702                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),703                                     ('adoption__was_foster', 'boolean', 'YES'),704                                     ('age', 'bigint', 'YES'),705                                     ('id', 'bigint', 'NO'),706                                     ('name__s', 'text', 'YES'),707                                     ('name__t', 'timestamp with time zone', 'YES'),708                                     ('name__b', 'boolean', 'YES'),709                                     ('name__i', 'bigint', 'YES'),710                                     ('bio', 'text', 'NO'),711                                     ('paw_size', 'bigint', 'NO'),712                                     ('paw_colour', 'text', 'NO'),713                                     ('flea_check_complete', 'boolean', 'NO'),714                                     ('pattern', 'text', 'YES')715                                 })716            cur.execute(sql.SQL('SELECT {}, {}, {}, {} FROM {}').format(717                sql.Identifier('name__s'),718                sql.Identifier('name__t'),719                sql.Identifier('name__b'),720                sql.Identifier('name__i'),721                sql.Identifier('cats')722            ))723            persisted_records = cur.fetchall()724            ## Assert that the split columns migrated data/persisted new data725            assert 4 * 
cat_count == len(persisted_records)726            assert cat_count == len([x for x in persisted_records if x[0] is not None])727            assert cat_count == len([x for x in persisted_records if x[1] is not None])728            assert cat_count == len([x for x in persisted_records if x[2] is not None])729            assert cat_count == len([x for x in persisted_records if x[3] is not None])730            assert 0 == len(731                [x for x in persisted_records if732                 x[0] is not None and x[1] is not None and x[2] is not None and x[3] is not None])733            assert 0 == len(734                [x for x in persisted_records if x[0] is None and x[1] is None and x[2] is None and x[3] is None])735def test_loading__column_type_change__generative(db_cleanup):736    insert_count = 20737    repeats_to_perform = 5738    literal_types_remaining = set(['integer', 'number', 'boolean', 'string', 'date-time'])739    repeats_performed = 0740    while repeats_performed < repeats_to_perform or literal_types_remaining:741        stream = TypeChangeStream(insert_count, repeats_performed * insert_count)742        repeats_performed += 1743        if stream.changing_literal_type in literal_types_remaining:744            literal_types_remaining.remove(stream.changing_literal_type)745        main(CONFIG, input_stream=stream)746    with psycopg2.connect(**TEST_DB) as conn:747        with conn.cursor() as cur:748            assert_columns_equal(cur,749                                 'root',750                                 {751                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),752                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),753                                     ('_sdc_sequence', 'bigint', 'YES'),754                                     ('_sdc_table_version', 'bigint', 'YES'),755                                     ('id', 'bigint', 'NO'),756                                  
   ('changing_literal_type__s', 'text', 'YES'),757                                     ('changing_literal_type__t', 'timestamp with time zone', 'YES'),758                                     ('changing_literal_type__b', 'boolean', 'YES'),759                                     ('changing_literal_type__i', 'bigint', 'YES'),760                                     ('changing_literal_type__f', 'double precision', 'YES')761                                 })762            cur.execute(sql.SQL('SELECT {}, {}, {}, {}, {} FROM {}').format(763                sql.Identifier('changing_literal_type__s'),764                sql.Identifier('changing_literal_type__t'),765                sql.Identifier('changing_literal_type__b'),766                sql.Identifier('changing_literal_type__i'),767                sql.Identifier('changing_literal_type__f'),768                sql.Identifier('root')769            ))770            persisted_records = cur.fetchall()771            ## Assert that the split columns migrated data/persisted new data772            assert repeats_performed * insert_count == len(persisted_records)773            assert insert_count <= len([x for x in persisted_records if x[0] is not None])774            assert insert_count <= len([x for x in persisted_records if x[1] is not None])775            assert insert_count <= len([x for x in persisted_records if x[2] is not None])776            ## Integers are valid Numbers, so sometimes a Number can be placed into an existing Integer column777            assert (2 * insert_count) \778                   <= len([x for x in persisted_records if x[3] is not None]) \779                   + len([x for x in persisted_records if x[4] is not None])780            assert 0 == len(781                [x for x in persisted_records782                 if x[0] is not None783                 and x[1] is not None784                 and x[2] is not None785                 and x[3] is not None786                 and x[4] is not None])787         
   assert 0 == len(788                [x for x in persisted_records789                 if x[0] is None790                 and x[1] is None791                 and x[2] is None792                 and x[3] is None793                 and x[4] is None])794def test_loading__column_type_change__nullable(db_cleanup):795    cat_count = 20796    main(CONFIG, input_stream=CatStream(cat_count))797    with psycopg2.connect(**TEST_DB) as conn:798        with conn.cursor() as cur:799            assert_columns_equal(cur,800                                 'cats',801                                 {802                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),803                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),804                                     ('_sdc_sequence', 'bigint', 'YES'),805                                     ('_sdc_table_version', 'bigint', 'YES'),806                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),807                                     ('adoption__was_foster', 'boolean', 'YES'),808                                     ('age', 'bigint', 'YES'),809                                     ('id', 'bigint', 'NO'),810                                     ('name', 'text', 'NO'),811                                     ('bio', 'text', 'NO'),812                                     ('paw_size', 'bigint', 'NO'),813                                     ('paw_colour', 'text', 'NO'),814                                     ('flea_check_complete', 'boolean', 'NO'),815                                     ('pattern', 'text', 'YES')816                                 })817            cur.execute(sql.SQL('SELECT {} FROM {}').format(818                sql.Identifier('name'),819                sql.Identifier('cats')820            ))821            persisted_records = cur.fetchall()822            ## Assert that the original data is present823            assert cat_count == 
len(persisted_records)824            assert cat_count == len([x for x in persisted_records if x[0] is not None])825    class NameNullCatStream(CatStream):826        def generate_record(self):827            record = CatStream.generate_record(self)828            record['id'] = record['id'] + cat_count829            record['name'] = None830            return record831    stream = NameNullCatStream(cat_count)832    stream.schema = deepcopy(stream.schema)833    stream.schema['schema']['properties']['name'] = json_schema.make_nullable(834        stream.schema['schema']['properties']['name'])835    main(CONFIG, input_stream=stream)836    with psycopg2.connect(**TEST_DB) as conn:837        with conn.cursor() as cur:838            assert_columns_equal(cur,839                                 'cats',840                                 {841                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),842                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),843                                     ('_sdc_sequence', 'bigint', 'YES'),844                                     ('_sdc_table_version', 'bigint', 'YES'),845                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),846                                     ('adoption__was_foster', 'boolean', 'YES'),847                                     ('age', 'bigint', 'YES'),848                                     ('id', 'bigint', 'NO'),849                                     ('name', 'text', 'YES'),850                                     ('bio', 'text', 'NO'),851                                     ('paw_size', 'bigint', 'NO'),852                                     ('paw_colour', 'text', 'NO'),853                                     ('flea_check_complete', 'boolean', 'NO'),854                                     ('pattern', 'text', 'YES')855                                 })856            cur.execute(sql.SQL('SELECT {} FROM 
{}').format(857                sql.Identifier('name'),858                sql.Identifier('cats')859            ))860            persisted_records = cur.fetchall()861            ## Assert that the column is has migrated data862            assert 2 * cat_count == len(persisted_records)863            assert cat_count == len([x for x in persisted_records if x[0] is not None])864            assert cat_count == len([x for x in persisted_records if x[0] is None])865    class NameNonNullCatStream(CatStream):866        def generate_record(self):867            record = CatStream.generate_record(self)868            record['id'] = record['id'] + 2 * cat_count869            return record870    main(CONFIG, input_stream=NameNonNullCatStream(cat_count))871    with psycopg2.connect(**TEST_DB) as conn:872        with conn.cursor() as cur:873            assert_columns_equal(cur,874                                 'cats',875                                 {876                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),877                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),878                                     ('_sdc_sequence', 'bigint', 'YES'),879                                     ('_sdc_table_version', 'bigint', 'YES'),880                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),881                                     ('adoption__was_foster', 'boolean', 'YES'),882                                     ('age', 'bigint', 'YES'),883                                     ('id', 'bigint', 'NO'),884                                     ('name', 'text', 'YES'),885                                     ('bio', 'text', 'NO'),886                                     ('paw_size', 'bigint', 'NO'),887                                     ('paw_colour', 'text', 'NO'),888                                     ('flea_check_complete', 'boolean', 'NO'),889                                     
('pattern', 'text', 'YES')890                                 })891            cur.execute(sql.SQL('SELECT {} FROM {}').format(892                sql.Identifier('name'),893                sql.Identifier('cats')894            ))895            persisted_records = cur.fetchall()896            ## Assert that the column is has migrated data897            assert 3 * cat_count == len(persisted_records)898            assert 2 * cat_count == len([x for x in persisted_records if x[0] is not None])899            assert cat_count == len([x for x in persisted_records if x[0] is None])900def test_loading__column_type_change__nullable__missing_from_schema(db_cleanup):901    cat_count = 20902    main(CONFIG, input_stream=CatStream(cat_count))903    with psycopg2.connect(**TEST_DB) as conn:904        with conn.cursor() as cur:905            assert_columns_equal(cur,906                                 'cats',907                                 {908                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),909                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),910                                     ('_sdc_sequence', 'bigint', 'YES'),911                                     ('_sdc_table_version', 'bigint', 'YES'),912                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),913                                     ('adoption__was_foster', 'boolean', 'YES'),914                                     ('age', 'bigint', 'YES'),915                                     ('id', 'bigint', 'NO'),916                                     ('name', 'text', 'NO'),917                                     ('bio', 'text', 'NO'),918                                     ('paw_size', 'bigint', 'NO'),919                                     ('paw_colour', 'text', 'NO'),920                                     ('flea_check_complete', 'boolean', 'NO'),921                                     ('pattern', 
'text', 'YES')922                                 })923            cur.execute(sql.SQL('SELECT {} FROM {}').format(924                sql.Identifier('name'),925                sql.Identifier('cats')926            ))927            persisted_records = cur.fetchall()928            ## Assert that the original data is present929            assert cat_count == len(persisted_records)930            assert cat_count == len([x for x in persisted_records if x[0] is not None])931    class NameMissingCatStream(CatStream):932        def generate_record(self):933            record = CatStream.generate_record(self)934            record['id'] = record['id'] + cat_count935            del record['name']936            return record937    stream = NameMissingCatStream(cat_count)938    stream.schema = deepcopy(stream.schema)939    del stream.schema['schema']['properties']['name']940    main(CONFIG, input_stream=stream)941    with psycopg2.connect(**TEST_DB) as conn:942        with conn.cursor() as cur:943            assert_columns_equal(cur,944                                 'cats',945                                 {946                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),947                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),948                                     ('_sdc_sequence', 'bigint', 'YES'),949                                     ('_sdc_table_version', 'bigint', 'YES'),950                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),951                                     ('adoption__was_foster', 'boolean', 'YES'),952                                     ('age', 'bigint', 'YES'),953                                     ('id', 'bigint', 'NO'),954                                     ('name', 'text', 'YES'),955                                     ('bio', 'text', 'NO'),956                                     ('paw_size', 'bigint', 'NO'),957                              
       ('paw_colour', 'text', 'NO'),958                                     ('flea_check_complete', 'boolean', 'NO'),959                                     ('pattern', 'text', 'YES')960                                 })961            cur.execute(sql.SQL('SELECT {} FROM {}').format(962                sql.Identifier('name'),963                sql.Identifier('cats')964            ))965            persisted_records = cur.fetchall()966            ## Assert that the column is has migrated data967            assert 2 * cat_count == len(persisted_records)968            assert cat_count == len([x for x in persisted_records if x[0] is not None])969            assert cat_count == len([x for x in persisted_records if x[0] is None])970def test_loading__multi_types_columns(db_cleanup):971    stream_count = 50972    main(CONFIG, input_stream=MultiTypeStream(stream_count))973    with psycopg2.connect(**TEST_DB) as conn:974        with conn.cursor() as cur:975            assert_columns_equal(cur,976                                 'root',977                                 {978                                     ('_sdc_primary_key', 'text', 'NO'),979                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),980                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),981                                     ('_sdc_sequence', 'bigint', 'YES'),982                                     ('_sdc_table_version', 'bigint', 'YES'),983                                     ('every_type__i', 'bigint', 'YES'),984                                     ('every_type__f', 'double precision', 'YES'),985                                     ('every_type__b', 'boolean', 'YES'),986                                     ('every_type__t', 'timestamp with time zone', 'YES'),987                                     ('every_type__i__1', 'bigint', 'YES'),988                                     ('every_type__f__1', 'double precision', 'YES'),989     
                                ('every_type__b__1', 'boolean', 'YES'),990                                     ('number_which_only_comes_as_integer', 'double precision', 'NO')991                                 })992            assert_columns_equal(cur,993                                 'root__every_type',994                                 {995                                     ('_sdc_source_key__sdc_primary_key', 'text', 'NO'),996                                     ('_sdc_level_0_id', 'bigint', 'NO'),997                                     ('_sdc_sequence', 'bigint', 'YES'),998                                     ('_sdc_value', 'bigint', 'NO'),999                                 })1000            cur.execute(sql.SQL('SELECT {} FROM {}').format(1001                sql.Identifier('number_which_only_comes_as_integer'),1002                sql.Identifier('root')1003            ))1004            persisted_records = cur.fetchall()1005            ## Assert that the column is has migrated data1006            assert stream_count == len(persisted_records)1007            assert stream_count == len([x for x in persisted_records if isinstance(x[0], float)])1008def test_loading__invalid__table_name__stream(db_cleanup):1009    def invalid_stream_named(stream_name):1010        stream = CatStream(100)1011        stream.stream = stream_name1012        stream.schema = deepcopy(stream.schema)1013        stream.schema['stream'] = stream_name1014        main(CONFIG, input_stream=stream)1015    invalid_stream_named('')1016    invalid_stream_named('x' * 1000)1017    invalid_stream_named('INVALID_name')1018    invalid_stream_named('a!!!invalid_name')1019    borderline_length_stream_name = 'x' * 611020    stream = CatStream(100, version=1)1021    stream.stream = borderline_length_stream_name1022    stream.schema = deepcopy(stream.schema)1023    stream.schema['stream'] = borderline_length_stream_name1024    main(CONFIG, input_stream=stream)1025    stream = CatStream(100, version=10)1026 
   stream.stream = borderline_length_stream_name1027    stream.schema = deepcopy(stream.schema)1028    stream.schema['stream'] = borderline_length_stream_name1029    main(CONFIG, input_stream=stream)1030def test_loading__table_name__stream__simple(db_cleanup):1031    def assert_tables_equal(cursor, expected_table_names):1032        cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")1033        tables = []1034        for table in cursor.fetchall():1035            tables.append(table[0])1036        assert (not tables and not expected_table_names) \1037               or set(tables) == expected_table_names1038    stream_name = "C@ts"1039    stream = CatStream(100)1040    stream.stream = stream_name1041    stream.schema = deepcopy(stream.schema)1042    stream.schema['stream'] = stream_name1043    main(CONFIG, input_stream=stream)1044    with psycopg2.connect(**TEST_DB) as conn:1045        with conn.cursor() as cur:1046            cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")1047            tables = []1048            for table in cur.fetchall():1049                tables.append(table[0])1050            assert {'c_ts', 'c_ts__adoption__immunizations'} == set(tables)1051            assert_columns_equal(cur,1052                                 'c_ts',1053                                 {1054                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1055                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),1056                                     ('_sdc_sequence', 'bigint', 'YES'),1057                                     ('_sdc_table_version', 'bigint', 'YES'),1058                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1059                                     ('adoption__was_foster', 'boolean', 'YES'),1060                                     ('age', 'bigint', 'YES'),1061  
                                   ('id', 'bigint', 'NO'),1062                                     ('name', 'text', 'NO'),1063                                     ('bio', 'text', 'NO'),1064                                     ('paw_size', 'bigint', 'NO'),1065                                     ('paw_colour', 'text', 'NO'),1066                                     ('flea_check_complete', 'boolean', 'NO'),1067                                     ('pattern', 'text', 'YES')1068                                 })1069            assert_columns_equal(cur,1070                                 'c_ts__adoption__immunizations',1071                                 {1072                                     ('_sdc_level_0_id', 'bigint', 'NO'),1073                                     ('_sdc_sequence', 'bigint', 'YES'),1074                                     ('_sdc_source_key_id', 'bigint', 'NO'),1075                                     ('date_administered', 'timestamp with time zone', 'YES'),1076                                     ('type', 'text', 'YES')1077                                 })1078            cur.execute(get_count_sql('c_ts'))1079            assert cur.fetchone()[0] == 1001080        for record in stream.records:1081            record['paw_size'] = 3141591082            record['paw_colour'] = ''1083            record['flea_check_complete'] = False1084        assert_records(conn, stream.records, 'c_ts', 'id')1085def test_loading__invalid__table_name__nested(db_cleanup):1086    cat_count = 201087    sub_table_name = 'immunizations'1088    invalid_name = 'INValID!NON{conflicting'1089    class InvalidNameSubTableCatStream(CatStream):1090        immunizations_count = 01091        def generate_record(self):1092            record = CatStream.generate_record(self)1093            if record.get('adoption', False):1094                self.immunizations_count += len(record['adoption'][sub_table_name])1095                record['adoption'][invalid_name] = 
record['adoption'][sub_table_name]1096            return record1097    stream = InvalidNameSubTableCatStream(cat_count)1098    stream.schema = deepcopy(stream.schema)1099    stream.schema['schema']['properties']['adoption']['properties'][invalid_name] = \1100        stream.schema['schema']['properties']['adoption']['properties'][sub_table_name]1101    main(CONFIG, input_stream=stream)1102    immunizations_count = stream.immunizations_count1103    invalid_name_count = stream.immunizations_count1104    conflicting_name = sub_table_name.upper()1105    class ConflictingNameSubTableCatStream(CatStream):1106        immunizations_count = 01107        def generate_record(self):1108            record = CatStream.generate_record(self)1109            if record.get('adoption', False):1110                self.immunizations_count += len(record['adoption'][sub_table_name])1111                record['adoption'][conflicting_name] = record['adoption'][sub_table_name]1112            record['id'] = record['id'] + cat_count1113            return record1114    stream = ConflictingNameSubTableCatStream(cat_count)1115    stream.schema = deepcopy(stream.schema)1116    stream.schema['schema']['properties']['adoption']['properties'][conflicting_name] = \1117        stream.schema['schema']['properties']['adoption']['properties'][sub_table_name]1118    main(CONFIG, input_stream=stream)1119    immunizations_count += stream.immunizations_count1120    conflicting_name_count = stream.immunizations_count1121    with psycopg2.connect(**TEST_DB) as conn:1122        with conn.cursor() as cur:1123            assert_columns_equal(cur,1124                                 'cats',1125                                 {1126                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1127                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),1128                                     ('_sdc_sequence', 'bigint', 'YES'),1129                      
               ('_sdc_table_version', 'bigint', 'YES'),1130                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1131                                     ('adoption__was_foster', 'boolean', 'YES'),1132                                     ('age', 'bigint', 'YES'),1133                                     ('id', 'bigint', 'NO'),1134                                     ('name', 'text', 'NO'),1135                                     ('bio', 'text', 'NO'),1136                                     ('paw_size', 'bigint', 'NO'),1137                                     ('paw_colour', 'text', 'NO'),1138                                     ('flea_check_complete', 'boolean', 'NO'),1139                                     ('pattern', 'text', 'YES')1140                                 })1141            cur.execute(get_count_sql('cats'))1142            assert 2 * cat_count == cur.fetchone()[0]1143            cur.execute(get_count_sql('cats__adoption__immunizations'))1144            assert immunizations_count == cur.fetchone()[0]1145            cur.execute(get_count_sql('cats__adoption__invalid_non_conflicting'))1146            assert invalid_name_count == cur.fetchone()[0]1147            cur.execute(get_count_sql('cats__adoption__immunizations__1'))1148            assert conflicting_name_count == cur.fetchone()[0]1149def test_loading__invalid_column_name(db_cleanup):1150    non_alphanumeric_stream = CatStream(100)1151    non_alphanumeric_stream.schema = deepcopy(non_alphanumeric_stream.schema)1152    non_alphanumeric_stream.schema['schema']['properties']['!!!invalid_name'] = \1153        non_alphanumeric_stream.schema['schema']['properties']['age']1154    main(CONFIG, input_stream=non_alphanumeric_stream)1155    non_lowercase_stream = CatStream(100)1156    non_lowercase_stream.schema = deepcopy(non_lowercase_stream.schema)1157    non_lowercase_stream.schema['schema']['properties']['INVALID_name'] = \1158        
non_lowercase_stream.schema['schema']['properties']['age']1159    main(CONFIG, input_stream=non_lowercase_stream)1160    duplicate_non_lowercase_stream_1 = CatStream(100)1161    duplicate_non_lowercase_stream_1.schema = deepcopy(duplicate_non_lowercase_stream_1.schema)1162    duplicate_non_lowercase_stream_1.schema['schema']['properties']['invalid!NAME'] = \1163        duplicate_non_lowercase_stream_1.schema['schema']['properties']['age']1164    main(CONFIG, input_stream=duplicate_non_lowercase_stream_1)1165    duplicate_non_lowercase_stream_2 = CatStream(100)1166    duplicate_non_lowercase_stream_2.schema = deepcopy(duplicate_non_lowercase_stream_2.schema)1167    duplicate_non_lowercase_stream_2.schema['schema']['properties']['invalid#NAME'] = \1168        duplicate_non_lowercase_stream_2.schema['schema']['properties']['age']1169    main(CONFIG, input_stream=duplicate_non_lowercase_stream_2)1170    duplicate_non_lowercase_stream_3 = CatStream(100)1171    duplicate_non_lowercase_stream_3.schema = deepcopy(duplicate_non_lowercase_stream_3.schema)1172    duplicate_non_lowercase_stream_3.schema['schema']['properties']['invalid%NAmE'] = \1173        duplicate_non_lowercase_stream_3.schema['schema']['properties']['age']1174    main(CONFIG, input_stream=duplicate_non_lowercase_stream_3)1175    name_too_long_stream = CatStream(100)1176    name_too_long_stream.schema = deepcopy(name_too_long_stream.schema)1177    name_too_long_stream.schema['schema']['properties']['x' * 1000] = \1178        name_too_long_stream.schema['schema']['properties']['age']1179    main(CONFIG, input_stream=name_too_long_stream)1180    duplicate_name_too_long_stream = CatStream(100)1181    duplicate_name_too_long_stream.schema = deepcopy(duplicate_name_too_long_stream.schema)1182    duplicate_name_too_long_stream.schema['schema']['properties']['x' * 100] = \1183        duplicate_name_too_long_stream.schema['schema']['properties']['age']1184    main(CONFIG, 
input_stream=duplicate_name_too_long_stream)1185    with psycopg2.connect(**TEST_DB) as conn:1186        with conn.cursor() as cur:1187            assert_columns_equal(cur,1188                                 'cats',1189                                 {1190                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1191                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),1192                                     ('_sdc_sequence', 'bigint', 'YES'),1193                                     ('_sdc_table_version', 'bigint', 'YES'),1194                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1195                                     ('adoption__was_foster', 'boolean', 'YES'),1196                                     ('age', 'bigint', 'YES'),1197                                     ('id', 'bigint', 'NO'),1198                                     ('name', 'text', 'NO'),1199                                     ('bio', 'text', 'NO'),1200                                     ('paw_size', 'bigint', 'NO'),1201                                     ('paw_colour', 'text', 'NO'),1202                                     ('___invalid_name', 'bigint', 'YES'),1203                                     ('invalid_name', 'bigint', 'YES'),1204                                     ('invalid_name__1', 'bigint', 'YES'),1205                                     ('invalid_name__2', 'bigint', 'YES'),1206                                     ('invalid_name__3', 'bigint', 'YES'),1207                                     ('x' * 63, 'bigint', 'YES'),1208                                     (('x' * 60 + '__1'), 'bigint', 'YES'),1209                                     ('flea_check_complete', 'boolean', 'NO'),1210                                     ('pattern', 'text', 'YES')1211                                 })1212def test_loading__invalid_column_name__pk(db_cleanup):1213    def setup(count):1214    
    class Stream(CatStream):1215            def generate_record(self):1216                record = CatStream.generate_record(self)1217                record['ID'] = record['id']1218                record.pop('id')1219                return record1220        stream = Stream(count)1221        stream.schema = deepcopy(stream.schema)1222        stream.schema['schema']['properties']['ID'] = \1223            stream.schema['schema']['properties']['id']1224        stream.schema['key_properties'] = ['ID']1225        stream.schema['schema']['properties'].pop('id')1226        return stream1227    main(CONFIG, input_stream=setup(100))1228    main(CONFIG, input_stream=setup(200))1229    with psycopg2.connect(**TEST_DB) as conn:1230        with conn.cursor() as cur:1231            assert_columns_equal(cur,1232                                 'cats',1233                                 {1234                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1235                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),1236                                     ('_sdc_sequence', 'bigint', 'YES'),1237                                     ('_sdc_table_version', 'bigint', 'YES'),1238                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1239                                     ('adoption__was_foster', 'boolean', 'YES'),1240                                     ('age', 'bigint', 'YES'),1241                                     ('id', 'bigint', 'NO'),1242                                     ('name', 'text', 'NO'),1243                                     ('bio', 'text', 'NO'),1244                                     ('paw_size', 'bigint', 'NO'),1245                                     ('paw_colour', 'text', 'NO'),1246                                     ('flea_check_complete', 'boolean', 'NO'),1247                                     ('pattern', 'text', 'YES')1248                              
   })1249def test_loading__invalid_column_name__duplicate_name_handling(db_cleanup):1250    for i in range(101):1251        name_too_long_stream = CatStream(100)1252        name_too_long_stream.schema = deepcopy(name_too_long_stream.schema)1253        name_too_long_stream.schema['schema']['properties']['x' * (100 + i)] = \1254            name_too_long_stream.schema['schema']['properties']['age']1255        main(CONFIG, input_stream=name_too_long_stream)1256    expected_columns = {1257        ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1258        ('_sdc_received_at', 'timestamp with time zone', 'YES'),1259        ('_sdc_sequence', 'bigint', 'YES'),1260        ('_sdc_table_version', 'bigint', 'YES'),1261        ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1262        ('adoption__was_foster', 'boolean', 'YES'),1263        ('age', 'bigint', 'YES'),1264        ('id', 'bigint', 'NO'),1265        ('name', 'text', 'NO'),1266        ('bio', 'text', 'NO'),1267        ('paw_size', 'bigint', 'NO'),1268        ('paw_colour', 'text', 'NO'),1269        ('x' * 63, 'bigint', 'YES'),1270        (('x' * 58 + '__100'), 'bigint', 'YES'),1271        ('flea_check_complete', 'boolean', 'NO'),1272        ('pattern', 'text', 'YES')1273    }1274    for i in range(1, 10):1275        expected_columns.add((('x' * 60 + '__' + str(i)), 'bigint', 'YES'))1276    for i in range(10, 100):1277        expected_columns.add((('x' * 59 + '__' + str(i)), 'bigint', 'YES'))1278    with psycopg2.connect(**TEST_DB) as conn:1279        with conn.cursor() as cur:1280            assert_columns_equal(cur, 'cats', expected_columns)1281def test_loading__invalid_column_name__column_type_change(db_cleanup):1282    invalid_column_name = 'INVALID!name'1283    cat_count = 201284    stream = CatStream(cat_count)1285    stream.schema = deepcopy(stream.schema)1286    stream.schema['schema']['properties'][invalid_column_name] = \1287        stream.schema['schema']['properties']['paw_colour']1288    
main(CONFIG, input_stream=stream)1289    with psycopg2.connect(**TEST_DB) as conn:1290        with conn.cursor() as cur:1291            assert_columns_equal(cur,1292                                 'cats',1293                                 {1294                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1295                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),1296                                     ('_sdc_sequence', 'bigint', 'YES'),1297                                     ('_sdc_table_version', 'bigint', 'YES'),1298                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1299                                     ('adoption__was_foster', 'boolean', 'YES'),1300                                     ('age', 'bigint', 'YES'),1301                                     ('id', 'bigint', 'NO'),1302                                     ('name', 'text', 'NO'),1303                                     ('bio', 'text', 'NO'),1304                                     ('paw_size', 'bigint', 'NO'),1305                                     ('paw_colour', 'text', 'NO'),1306                                     ('invalid_name', 'text', 'NO'),1307                                     ('flea_check_complete', 'boolean', 'NO'),1308                                     ('pattern', 'text', 'YES')1309                                 })1310            cur.execute(sql.SQL('SELECT {} FROM {}').format(1311                sql.Identifier('invalid_name'),1312                sql.Identifier('cats')1313            ))1314            persisted_records = cur.fetchall()1315            ## Assert that the original data is present1316            assert cat_count == len(persisted_records)1317            assert cat_count == len([x for x in persisted_records if x[0] is not None])1318    class BooleanCatStream(CatStream):1319        def generate_record(self):1320            record = 
CatStream.generate_record(self)1321            record['id'] = record['id'] + cat_count1322            record[invalid_column_name] = False1323            return record1324    stream = BooleanCatStream(cat_count)1325    stream.schema = deepcopy(stream.schema)1326    stream.schema['schema']['properties'][invalid_column_name] = {'type': 'boolean'}1327    main(CONFIG, input_stream=stream)1328    with psycopg2.connect(**TEST_DB) as conn:1329        with conn.cursor() as cur:1330            assert_columns_equal(cur,1331                                 'cats',1332                                 {1333                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1334                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),1335                                     ('_sdc_sequence', 'bigint', 'YES'),1336                                     ('_sdc_table_version', 'bigint', 'YES'),1337                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1338                                     ('adoption__was_foster', 'boolean', 'YES'),1339                                     ('age', 'bigint', 'YES'),1340                                     ('id', 'bigint', 'NO'),1341                                     ('name', 'text', 'NO'),1342                                     ('bio', 'text', 'NO'),1343                                     ('paw_size', 'bigint', 'NO'),1344                                     ('paw_colour', 'text', 'NO'),1345                                     ('invalid_name__s', 'text', 'YES'),1346                                     ('invalid_name__b', 'boolean', 'YES'),1347                                     ('flea_check_complete', 'boolean', 'NO'),1348                                     ('pattern', 'text', 'YES')1349                                 })1350            cur.execute(sql.SQL('SELECT {}, {} FROM {}').format(1351                sql.Identifier('invalid_name__s'),1352   
             sql.Identifier('invalid_name__b'),1353                sql.Identifier('cats')1354            ))1355            persisted_records = cur.fetchall()1356            ## Assert that the split columns migrated data/persisted new data1357            assert 2 * cat_count == len(persisted_records)1358            assert cat_count == len([x for x in persisted_records if x[0] is not None])1359            assert cat_count == len([x for x in persisted_records if x[1] is not None])1360            assert 0 == len([x for x in persisted_records if x[0] is not None and x[1] is not None])1361    class IntegerCatStream(CatStream):1362        def generate_record(self):1363            record = CatStream.generate_record(self)1364            record['id'] = record['id'] + (2 * cat_count)1365            record[invalid_column_name] = 3141366            return record1367    stream = IntegerCatStream(cat_count)1368    stream.schema = deepcopy(stream.schema)1369    stream.schema['schema']['properties'][invalid_column_name] = {'type': 'integer'}1370    main(CONFIG, input_stream=stream)1371    with psycopg2.connect(**TEST_DB) as conn:1372        with conn.cursor() as cur:1373            assert_columns_equal(cur,1374                                 'cats',1375                                 {1376                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1377                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),1378                                     ('_sdc_sequence', 'bigint', 'YES'),1379                                     ('_sdc_table_version', 'bigint', 'YES'),1380                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1381                                     ('adoption__was_foster', 'boolean', 'YES'),1382                                     ('age', 'bigint', 'YES'),1383                                     ('id', 'bigint', 'NO'),1384                                     
('name', 'text', 'NO'),1385                                     ('bio', 'text', 'NO'),1386                                     ('paw_size', 'bigint', 'NO'),1387                                     ('paw_colour', 'text', 'NO'),1388                                     ('invalid_name__s', 'text', 'YES'),1389                                     ('invalid_name__b', 'boolean', 'YES'),1390                                     ('invalid_name__i', 'bigint', 'YES'),1391                                     ('flea_check_complete', 'boolean', 'NO'),1392                                     ('pattern', 'text', 'YES')1393                                 })1394            cur.execute(sql.SQL('SELECT {}, {}, {} FROM {}').format(1395                sql.Identifier('invalid_name__s'),1396                sql.Identifier('invalid_name__b'),1397                sql.Identifier('invalid_name__i'),1398                sql.Identifier('cats')1399            ))1400            persisted_records = cur.fetchall()1401            ## Assert that the split columns migrated data/persisted new data1402            assert 3 * cat_count == len(persisted_records)1403            assert cat_count == len([x for x in persisted_records if x[0] is not None])1404            assert cat_count == len([x for x in persisted_records if x[1] is not None])1405            assert cat_count == len([x for x in persisted_records if x[2] is not None])1406            assert 0 == len(1407                [x for x in persisted_records if x[0] is not None and x[1] is not None and x[2] is not None])1408            assert 0 == len([x for x in persisted_records if x[0] is None and x[1] is None and x[2] is None])1409def test_loading__column_type_change__pks__same_resulting_type(db_cleanup):1410    stream = CatStream(20)1411    stream.schema = deepcopy(stream.schema)1412    stream.schema['schema']['properties']['id'] = {'type': ['integer', 'null']}1413    main(CONFIG, input_stream=stream)1414    stream = CatStream(20)1415    stream.schema = 
deepcopy(stream.schema)1416    stream.schema['schema']['properties']['id'] = {'type': ['null', 'integer']}1417    main(CONFIG, input_stream=stream)1418def test_loading__invalid__column_type_change__pks(db_cleanup):1419    main(CONFIG, input_stream=CatStream(20))1420    class StringIdCatStream(CatStream):1421        def generate_record(self):1422            record = CatStream.generate_record(self)1423            record['id'] = str(record['id'])1424            return record1425    stream = StringIdCatStream(20)1426    stream.schema = deepcopy(stream.schema)1427    stream.schema['schema']['properties']['id'] = {'type': 'string'}1428    with pytest.raises(postgres.PostgresError, match=r'.*key_properties. type change detected'):1429        main(CONFIG, input_stream=stream)1430def test_loading__invalid__column_type_change__pks__nullable(db_cleanup):1431    main(CONFIG, input_stream=CatStream(20))1432    stream = CatStream(20)1433    stream.schema = deepcopy(stream.schema)1434    stream.schema['schema']['properties']['id'] = json_schema.make_nullable(stream.schema['schema']['properties']['id'])1435    with pytest.raises(postgres.PostgresError, match=r'.*key_properties. 
type change detected'):1436        main(CONFIG, input_stream=stream)1437def test_upsert(db_cleanup):1438    stream = CatStream(100)1439    main(CONFIG, input_stream=stream)1440    with psycopg2.connect(**TEST_DB) as conn:1441        with conn.cursor() as cur:1442            cur.execute(get_count_sql('cats'))1443            assert cur.fetchone()[0] == 1001444            assert_columns_equal(cur,1445                                 'cats',1446                                 {1447                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1448                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),1449                                     ('_sdc_sequence', 'bigint', 'YES'),1450                                     ('_sdc_table_version', 'bigint', 'YES'),1451                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1452                                     ('adoption__was_foster', 'boolean', 'YES'),1453                                     ('age', 'bigint', 'YES'),1454                                     ('id', 'bigint', 'NO'),1455                                     ('name', 'text', 'NO'),1456                                     ('bio', 'text', 'NO'),1457                                     ('paw_size', 'bigint', 'NO'),1458                                     ('paw_colour', 'text', 'NO'),1459                                     ('flea_check_complete', 'boolean', 'NO'),1460                                     ('pattern', 'text', 'YES')1461                                 })1462            assert_columns_equal(cur,1463                                 'cats__adoption__immunizations',1464                                 {1465                                     ('_sdc_level_0_id', 'bigint', 'NO'),1466                                     ('_sdc_sequence', 'bigint', 'YES'),1467                                     ('_sdc_source_key_id', 'bigint', 'NO'),1468                
                     ('date_administered', 'timestamp with time zone', 'YES'),1469                                     ('type', 'text', 'YES')1470                                 })1471        assert_records(conn, stream.records, 'cats', 'id')1472    stream = CatStream(100)1473    main(CONFIG, input_stream=stream)1474    with psycopg2.connect(**TEST_DB) as conn:1475        with conn.cursor() as cur:1476            cur.execute(get_count_sql('cats'))1477            assert cur.fetchone()[0] == 1001478            assert_columns_equal(cur,1479                                 'cats',1480                                 {1481                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1482                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),1483                                     ('_sdc_sequence', 'bigint', 'YES'),1484                                     ('_sdc_table_version', 'bigint', 'YES'),1485                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1486                                     ('adoption__was_foster', 'boolean', 'YES'),1487                                     ('age', 'bigint', 'YES'),1488                                     ('id', 'bigint', 'NO'),1489                                     ('name', 'text', 'NO'),1490                                     ('bio', 'text', 'NO'),1491                                     ('paw_size', 'bigint', 'NO'),1492                                     ('paw_colour', 'text', 'NO'),1493                                     ('flea_check_complete', 'boolean', 'NO'),1494                                     ('pattern', 'text', 'YES')1495                                 })1496            assert_columns_equal(cur,1497                                 'cats__adoption__immunizations',1498                                 {1499                                     ('_sdc_level_0_id', 'bigint', 'NO'),1500                             
        ('_sdc_sequence', 'bigint', 'YES'),1501                                     ('_sdc_source_key_id', 'bigint', 'NO'),1502                                     ('date_administered', 'timestamp with time zone', 'YES'),1503                                     ('type', 'text', 'YES')1504                                 })1505        assert_records(conn, stream.records, 'cats', 'id')1506    stream = CatStream(200)1507    main(CONFIG, input_stream=stream)1508    with psycopg2.connect(**TEST_DB) as conn:1509        with conn.cursor() as cur:1510            cur.execute(get_count_sql('cats'))1511            assert cur.fetchone()[0] == 2001512            assert_columns_equal(cur,1513                                 'cats',1514                                 {1515                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1516                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),1517                                     ('_sdc_sequence', 'bigint', 'YES'),1518                                     ('_sdc_table_version', 'bigint', 'YES'),1519                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1520                                     ('adoption__was_foster', 'boolean', 'YES'),1521                                     ('age', 'bigint', 'YES'),1522                                     ('id', 'bigint', 'NO'),1523                                     ('name', 'text', 'NO'),1524                                     ('bio', 'text', 'NO'),1525                                     ('paw_size', 'bigint', 'NO'),1526                                     ('paw_colour', 'text', 'NO'),1527                                     ('flea_check_complete', 'boolean', 'NO'),1528                                     ('pattern', 'text', 'YES')1529                                 })1530            assert_columns_equal(cur,1531                                 'cats__adoption__immunizations',1532 
                                {1533                                     ('_sdc_level_0_id', 'bigint', 'NO'),1534                                     ('_sdc_sequence', 'bigint', 'YES'),1535                                     ('_sdc_source_key_id', 'bigint', 'NO'),1536                                     ('date_administered', 'timestamp with time zone', 'YES'),1537                                     ('type', 'text', 'YES')1538                                 })1539        assert_records(conn, stream.records, 'cats', 'id')1540def test_upsert__invalid__primary_key_change(db_cleanup):1541    stream = CatStream(100)1542    main(CONFIG, input_stream=stream)1543    stream = CatStream(100)1544    schema = deepcopy(stream.schema)1545    schema['key_properties'].append('name')1546    stream.schema = schema1547    with pytest.raises(postgres.PostgresError, match=r'.*key_properties.*'):1548        main(CONFIG, input_stream=stream)1549def test_nested_delete_on_parent(db_cleanup):1550    stream = CatStream(100, nested_count=3)1551    main(CONFIG, input_stream=stream)1552    with psycopg2.connect(**TEST_DB) as conn:1553        with conn.cursor() as cur:1554            cur.execute(get_count_sql('cats__adoption__immunizations'))1555            high_nested = cur.fetchone()[0]1556        assert_records(conn, stream.records, 'cats', 'id')1557    stream = CatStream(100, nested_count=2)1558    main(CONFIG, input_stream=stream)1559    with psycopg2.connect(**TEST_DB) as conn:1560        with conn.cursor() as cur:1561            cur.execute(get_count_sql('cats__adoption__immunizations'))1562            low_nested = cur.fetchone()[0]1563        assert_records(conn, stream.records, 'cats', 'id')1564    assert low_nested < high_nested1565def test_full_table_replication(db_cleanup):1566    stream = CatStream(110, version=0, nested_count=3)1567    main(CONFIG, input_stream=stream)1568    with psycopg2.connect(**TEST_DB) as conn:1569        with conn.cursor() as cur:1570            
cur.execute(get_count_sql('cats'))1571            version_0_count = cur.fetchone()[0]1572            cur.execute(get_count_sql('cats__adoption__immunizations'))1573            version_0_sub_count = cur.fetchone()[0]1574        assert_records(conn, stream.records, 'cats', 'id', match_pks=True)1575    assert version_0_count == 1101576    assert version_0_sub_count == 3301577    stream = CatStream(100, version=1, nested_count=3)1578    main(CONFIG, input_stream=stream)1579    with psycopg2.connect(**TEST_DB) as conn:1580        with conn.cursor() as cur:1581            cur.execute(get_count_sql('cats'))1582            version_1_count = cur.fetchone()[0]1583            cur.execute(get_count_sql('cats__adoption__immunizations'))1584            version_1_sub_count = cur.fetchone()[0]1585        assert_records(conn, stream.records, 'cats', 'id', match_pks=True)1586    assert version_1_count == 1001587    assert version_1_sub_count == 3001588    stream = CatStream(120, version=2, nested_count=2)1589    main(CONFIG, input_stream=stream)1590    with psycopg2.connect(**TEST_DB) as conn:1591        with conn.cursor() as cur:1592            cur.execute(get_count_sql('cats'))1593            version_2_count = cur.fetchone()[0]1594            cur.execute(get_count_sql('cats__adoption__immunizations'))1595            version_2_sub_count = cur.fetchone()[0]1596        assert_records(conn, stream.records, 'cats', 'id', match_pks=True)1597    assert version_2_count == 1201598    assert version_2_sub_count == 2401599    ## Test that an outdated version cannot overwrite1600    stream = CatStream(314, version=1, nested_count=2)1601    main(CONFIG, input_stream=stream)1602    with psycopg2.connect(**TEST_DB) as conn:1603        with conn.cursor() as cur:1604            cur.execute(get_count_sql('cats'))1605            older_version_count = cur.fetchone()[0]1606    assert older_version_count == version_2_count1607def test_deduplication_newer_rows(db_cleanup):1608    stream = CatStream(100, 
nested_count=3, duplicates=2)1609    main(CONFIG, input_stream=stream)1610    with psycopg2.connect(**TEST_DB) as conn:1611        with conn.cursor() as cur:1612            cur.execute(get_count_sql('cats'))1613            table_count = cur.fetchone()[0]1614            cur.execute(get_count_sql('cats__adoption__immunizations'))1615            nested_table_count = cur.fetchone()[0]1616            cur.execute('SELECT _sdc_sequence FROM cats WHERE id in ({})'.format(1617                ','.join(map(str, stream.duplicate_pks_used))))1618            dup_cat_records = cur.fetchall()1619    assert stream.record_message_count == 1021620    assert table_count == 1001621    assert nested_table_count == 3001622    for record in dup_cat_records:1623        assert record[0] == stream.sequence + 2001624def test_deduplication_older_rows(db_cleanup):1625    stream = CatStream(100, nested_count=2, duplicates=2, duplicate_sequence_delta=-100)1626    main(CONFIG, input_stream=stream)1627    with psycopg2.connect(**TEST_DB) as conn:1628        with conn.cursor() as cur:1629            cur.execute(get_count_sql('cats'))1630            table_count = cur.fetchone()[0]1631            cur.execute(get_count_sql('cats__adoption__immunizations'))1632            nested_table_count = cur.fetchone()[0]1633            cur.execute('SELECT _sdc_sequence FROM cats WHERE id in ({})'.format(1634                ','.join(map(str, stream.duplicate_pks_used))))1635            dup_cat_records = cur.fetchall()1636    assert stream.record_message_count == 1021637    assert table_count == 1001638    assert nested_table_count == 2001639    for record in dup_cat_records:1640        assert record[0] == stream.sequence1641def test_deduplication_existing_new_rows(db_cleanup):1642    stream = CatStream(100, nested_count=2)1643    main(CONFIG, input_stream=stream)1644    original_sequence = stream.sequence1645    stream = CatStream(100,1646                       nested_count=2,1647                       
sequence=original_sequence - 20)1648    main(CONFIG, input_stream=stream)1649    with psycopg2.connect(**TEST_DB) as conn:1650        with conn.cursor() as cur:1651            cur.execute(get_count_sql('cats'))1652            table_count = cur.fetchone()[0]1653            cur.execute(get_count_sql('cats__adoption__immunizations'))1654            nested_table_count = cur.fetchone()[0]1655            cur.execute('SELECT DISTINCT _sdc_sequence FROM cats')1656            sequences = cur.fetchall()1657    assert table_count == 1001658    assert nested_table_count == 2001659    assert len(sequences) == 11660    assert sequences[0][0] == original_sequence1661def test_multiple_batches_upsert(db_cleanup):1662    config = CONFIG.copy()1663    config['max_batch_rows'] = 201664    config['batch_detection_threshold'] = 51665    stream = CatStream(100, nested_count=2)1666    main(config, input_stream=stream)1667    with psycopg2.connect(**TEST_DB) as conn:1668        with conn.cursor() as cur:1669            cur.execute(get_count_sql('cats'))1670            assert cur.fetchone()[0] == 1001671            cur.execute(get_count_sql('cats__adoption__immunizations'))1672            assert cur.fetchone()[0] == 2001673        assert_records(conn, stream.records, 'cats', 'id')1674    stream = CatStream(100, nested_count=3)1675    main(config, input_stream=stream)1676    with psycopg2.connect(**TEST_DB) as conn:1677        with conn.cursor() as cur:1678            cur.execute(get_count_sql('cats'))1679            assert cur.fetchone()[0] == 1001680            cur.execute(get_count_sql('cats__adoption__immunizations'))1681            assert cur.fetchone()[0] == 3001682        assert_records(conn, stream.records, 'cats', 'id')1683def test_multiple_batches_by_memory_upsert(db_cleanup):1684    config = CONFIG.copy()1685    config['max_batch_size'] = 10241686    config['batch_detection_threshold'] = 51687    stream = CatStream(100, nested_count=2)1688    main(config, input_stream=stream)1689  
  with psycopg2.connect(**TEST_DB) as conn:1690        with conn.cursor() as cur:1691            cur.execute(get_count_sql('cats'))1692            assert cur.fetchone()[0] == 1001693            cur.execute(get_count_sql('cats__adoption__immunizations'))1694            assert cur.fetchone()[0] == 2001695        assert_records(conn, stream.records, 'cats', 'id')1696    stream = CatStream(100, nested_count=3)1697    main(config, input_stream=stream)1698    with psycopg2.connect(**TEST_DB) as conn:1699        with conn.cursor() as cur:1700            cur.execute(get_count_sql('cats'))1701            assert cur.fetchone()[0] == 1001702            cur.execute(get_count_sql('cats__adoption__immunizations'))1703            assert cur.fetchone()[0] == 3001704        assert_records(conn, stream.records, 'cats', 'id')1705def test_loading__very_long_stream_name(db_cleanup):1706    stream_name = 'extremely_______________long_cats'1707    class LongCatStream(CatStream):1708        stream = stream_name1709        schema = CatStream.schema.copy()1710    LongCatStream.schema['stream'] = stream_name1711    stream = LongCatStream(100)1712    main(CONFIG, input_stream=stream)1713    with psycopg2.connect(**TEST_DB) as conn:1714        with conn.cursor() as cur:1715            assert_columns_equal(cur,1716                                 stream_name,1717                                 {1718                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1719                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),1720                                     ('_sdc_sequence', 'bigint', 'YES'),1721                                     ('_sdc_table_version', 'bigint', 'YES'),1722                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1723                                     ('adoption__was_foster', 'boolean', 'YES'),1724                                     ('age', 'bigint', 'YES'),1725      
                               ('id', 'bigint', 'NO'),1726                                     ('name', 'text', 'NO'),1727                                     ('bio', 'text', 'NO'),1728                                     ('paw_size', 'bigint', 'NO'),1729                                     ('paw_colour', 'text', 'NO'),1730                                     ('flea_check_complete', 'boolean', 'NO'),1731                                     ('pattern', 'text', 'YES')1732                                 })1733            assert_columns_equal(cur,1734                                 '{}__adoption__immunizations'.format(stream_name),1735                                 {1736                                     ('_sdc_level_0_id', 'bigint', 'NO'),1737                                     ('_sdc_sequence', 'bigint', 'YES'),1738                                     ('_sdc_source_key_id', 'bigint', 'NO'),1739                                     ('date_administered', 'timestamp with time zone', 'YES'),1740                                     ('type', 'text', 'YES')1741                                 })1742            cur.execute(get_count_sql(stream_name))1743            assert cur.fetchone()[0] == 1001744        for record in stream.records:1745            record['paw_size'] = 3141591746            record['paw_colour'] = ''1747            record['flea_check_complete'] = False1748        assert_records(conn, stream.records, stream_name, 'id')1749        assert_column_indexed(conn, stream_name, '_sdc_sequence')1750        assert_column_indexed(conn, stream_name, 'id')1751        assert_column_indexed(conn, '{}__adoption__immunizations'.format(stream_name), '_sdc_sequence')1752        assert_column_indexed(conn, '{}__adoption__immunizations'.format(stream_name), '_sdc_level_0_id')1753def test_before_run_sql_is_executed_upon_construction(db_cleanup):1754    config = CONFIG.copy()1755    config['before_run_sql'] = 'CREATE TABLE before_sql_test ( code char(5) CONSTRAINT firstkey 
PRIMARY KEY );'1756    config['after_run_sql'] = 'CREATE TABLE after_sql_test ( code char(5) CONSTRAINT secondkey PRIMARY KEY );'...test_target_snowflake.py
# --- Source: test_target_snowflake.py ---
...19    columns = []20    for column in cursor.fetchall():21        columns.append((column[0], column[1], column[2]))22    assert set(columns) == expected_column_tuples23def get_count_sql(table_name):24    return '''25        SELECT COUNT(*) FROM {}.{}.{}26    '''.format(27        sql.identifier(CONFIG['snowflake_database']),28        sql.identifier(CONFIG['snowflake_schema']),29        sql.identifier(table_name))30def assert_count_equal(cursor, table_name, expected_count):31    cursor.execute(get_count_sql(table_name))32    assert cursor.fetchone()[0] == expected_count33def get_pk_key(pks, obj, subrecord=False):34    pk_parts = []35    for pk in pks:36        pk_parts.append(str(obj[pk]))37    if subrecord:38        for key, value in obj.items():39            if key[:11] == '_SDC_LEVEL_' or key[:11] == '_sdc_level_':40                pk_parts.append(str(value))41    return ':'.join(pk_parts)42def flatten_record(old_obj, subtables, subpks, new_obj=None, current_path=None, level=0):43    if not new_obj:44        new_obj = {}45    for prop, value in old_obj.items():46        if current_path:47            next_path = current_path + '__' + prop48        else:49            next_path = prop50        if isinstance(value, dict):51            flatten_record(value, subtables, subpks, new_obj=new_obj, current_path=next_path, level=level)52        elif isinstance(value, list):53            if next_path not in subtables:54                subtables[next_path] = []55            row_index = 056            for item in value:57                new_subobj = {}58                for key, value in subpks.items():59                    new_subobj[key.lower()] = value60                new_subpks = subpks.copy()61                new_subobj[singer_stream.SINGER_LEVEL.format(level)] = row_index62                new_subpks[singer_stream.SINGER_LEVEL.format(level)] = row_index63                subtables[next_path].append(flatten_record(item,64                                                     
      subtables,65                                                           new_subpks,66                                                           new_obj=new_subobj,67                                                           level=level + 1))68                row_index += 169        else:70            new_obj[next_path] = value71    return new_obj72def assert_record(a, b, subtables, subpks):73    a_flat = flatten_record(a, subtables, subpks)74    for prop, value in a_flat.items():75        canoncialized_prop = prop.upper()76        if value is None:77            if canoncialized_prop in b:78                assert b[canoncialized_prop] == None79        elif isinstance(b[canoncialized_prop], datetime):80            assert value == b[canoncialized_prop].isoformat()[:19]81        else:82            assert value == b[canoncialized_prop]83def assert_records(conn, records, table_name, pks, match_pks=False):84    if not isinstance(pks, list):85        pks = [pks]86    with conn.cursor(True) as cur:87        cur.execute("set timezone='UTC';")88        cur.execute('''89            SELECT * FROM {}.{}.{}90        '''.format(91            sql.identifier(CONFIG['snowflake_database']),92            sql.identifier(CONFIG['snowflake_schema']),93            sql.identifier(table_name)))94        persisted_records_raw = cur.fetchall()95        persisted_records = {}96        for persisted_record in persisted_records_raw:97            pk = get_pk_key(pks, persisted_record)98            persisted_records[pk] = persisted_record99        subtables = {}100        records_pks = []101        pre_canonicalized_pks = [x.lower() for x in pks]102        for record in records:103            pk = get_pk_key(pre_canonicalized_pks, record)104            records_pks.append(pk)105            persisted_record = persisted_records[pk.upper()]106            subpks = {}107            for pk in pks:108                subpks[singer_stream.SINGER_SOURCE_PK_PREFIX + pk] = persisted_record[pk]109           
 assert_record(record, persisted_record, subtables, subpks)110        if match_pks:111            assert sorted(list(persisted_records.keys())) == sorted(records_pks)112        sub_pks = list(map(lambda pk: singer_stream.SINGER_SOURCE_PK_PREFIX.upper() + pk, pks))113        for subtable_name, items in subtables.items():114            cur.execute('''115                SELECT * FROM {}.{}.{}116            '''.format(117                sql.identifier(CONFIG['snowflake_database']),118                sql.identifier(CONFIG['snowflake_schema']),119                sql.identifier(table_name + '__' + subtable_name.upper())))120            persisted_records_raw = cur.fetchall()121            persisted_records = {}122            for persisted_record in persisted_records_raw:123                pk = get_pk_key(sub_pks, persisted_record, subrecord=True)124                persisted_records[pk] = persisted_record125            subtables = {}126            records_pks = []127            pre_canonicalized_sub_pks = [x.lower() for x in sub_pks]128            for record in items:129                pk = get_pk_key(pre_canonicalized_sub_pks, record, subrecord=True)130                records_pks.append(pk)131                persisted_record = persisted_records[pk]132                assert_record(record, persisted_record, subtables, subpks)133            assert len(subtables.values()) == 0134            if match_pks:135                assert sorted(list(persisted_records.keys())) == sorted(records_pks)136def test_connect(db_prep):137    with connect(**TEST_DB) as connection:138        with connection.cursor() as cur:139            assert cur.execute('select 1').fetchall()140def test_loading__empty(db_prep):141    stream = CatStream(0)142def test_loading__empty__enabled_config(db_prep):143    config = CONFIG.copy()144    config['persist_empty_tables'] = True145    stream = CatStream(0)146    main(config, input_stream=stream)147    with connect(**TEST_DB) as conn:148        with 
conn.cursor() as cur:149            assert_columns_equal(cur,150                                 'CATS',151                                 {152                                     ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),153                                     ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),154                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),155                                     ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),156                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),157                                     ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),158                                     ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),159                                     ('AGE', 'NUMBER', 'YES'),160                                     ('ID', 'NUMBER', 'NO'),161                                     ('NAME', 'TEXT', 'NO'),162                                     ('PAW_SIZE', 'NUMBER', 'NO'),163                                     ('PAW_COLOUR', 'TEXT', 'NO'),164                                     ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),165                                     ('PATTERN', 'TEXT', 'YES')166                                 })167            assert_columns_equal(cur,168                                 'CATS__ADOPTION__IMMUNIZATIONS',169                                 {170                                     ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),171                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),172                                     ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),173                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),174                                     ('DATE_ADMINISTERED', 'TIMESTAMP_TZ', 'YES'),175                                     ('TYPE', 'TEXT', 'YES')176                                 })177            assert_count_equal(cur, 'CATS', 0)178def 
test_loading__empty__enabled_config__repeatability(db_prep):179    config = CONFIG.copy()180    config['persist_empty_tables'] = True181    main(config, input_stream=CatStream(0))182    main(config, input_stream=CatStream(0))183    main(config, input_stream=CatStream(0))184def test_loading__simple(db_prep):185    stream = CatStream(100)186    main(CONFIG, input_stream=stream)187    with connect(**TEST_DB) as conn:188        with conn.cursor() as cur:189            assert_columns_equal(cur,190                                 'CATS',191                                 {192                                     ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),193                                     ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),194                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),195                                     ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),196                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),197                                     ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),198                                     ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),199                                     ('AGE', 'NUMBER', 'YES'),200                                     ('ID', 'NUMBER', 'NO'),201                                     ('NAME', 'TEXT', 'NO'),202                                     ('PAW_SIZE', 'NUMBER', 'NO'),203                                     ('PAW_COLOUR', 'TEXT', 'NO'),204                                     ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),205                                     ('PATTERN', 'TEXT', 'YES')206                                 })207            assert_columns_equal(cur,208                                 'CATS__ADOPTION__IMMUNIZATIONS',209                                 {210                                     ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),211                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),212           
                          ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),213                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),214                                     ('DATE_ADMINISTERED', 'TIMESTAMP_TZ', 'YES'),215                                     ('TYPE', 'TEXT', 'YES')216                                 })217            assert_count_equal(cur, 'CATS', 100)218        for record in stream.records:219            record['paw_size'] = 314159220            record['paw_colour'] = ''221            record['flea_check_complete'] = False222        assert_records(conn, stream.records, 'CATS', 'ID')223def test_loading__simple__s3_staging(db_prep):224    stream = CatStream(100)225    main(S3_CONFIG, input_stream=stream)226    with connect(**TEST_DB) as conn:227        with conn.cursor() as cur:228            assert_columns_equal(cur,229                                 'CATS',230                                 {231                                     ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),232                                     ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),233                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),234                                     ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),235                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),236                                     ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),237                                     ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),238                                     ('AGE', 'NUMBER', 'YES'),239                                     ('ID', 'NUMBER', 'NO'),240                                     ('NAME', 'TEXT', 'NO'),241                                     ('PAW_SIZE', 'NUMBER', 'NO'),242                                     ('PAW_COLOUR', 'TEXT', 'NO'),243                                     ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),244                     
                ('PATTERN', 'TEXT', 'YES')245                                 })246            assert_columns_equal(cur,247                                 'CATS__ADOPTION__IMMUNIZATIONS',248                                 {249                                     ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),250                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),251                                     ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),252                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),253                                     ('DATE_ADMINISTERED', 'TIMESTAMP_TZ', 'YES'),254                                     ('TYPE', 'TEXT', 'YES')255                                 })256            assert_count_equal(cur, 'CATS', 100)257        for record in stream.records:258            record['paw_size'] = 314159259            record['paw_colour'] = ''260            record['flea_check_complete'] = False261        assert_records(conn, stream.records, 'CATS', 'ID')262def test_loading__nested_tables(db_prep):263    main(CONFIG, input_stream=NestedStream(10))264    with connect(**TEST_DB) as conn:265        with conn.cursor() as cur:266            assert_count_equal(cur, 'ROOT', 10)267            assert_count_equal(cur, 'ROOT__ARRAY_SCALAR', 50)268            assert_count_equal(269                cur,270                'ROOT__OBJECT_OF_OBJECT_0__OBJECT_OF_OBJECT_1__OBJECT_OF_OBJECT_2__ARRAY_SCALAR',271                50)272            assert_count_equal(cur, 'ROOT__ARRAY_OF_ARRAY', 20)273            assert_count_equal(cur, 'ROOT__ARRAY_OF_ARRAY___SDC_VALUE', 80)274            assert_count_equal(cur, 'ROOT__ARRAY_OF_ARRAY___SDC_VALUE___SDC_VALUE', 200)275            assert_columns_equal(cur,276                                 'ROOT',277                                 {278                                     ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),279                                     ('_SDC_RECEIVED_AT', 
'TIMESTAMP_TZ', 'YES'),280                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),281                                     ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),282                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),283                                     ('ID', 'NUMBER', 'NO'),284                                     ('NULL', 'NUMBER', 'YES'),285                                     ('NESTED_NULL__NULL', 'NUMBER', 'YES'),286                                     ('OBJECT_OF_OBJECT_0__OBJECT_OF_OBJECT_1__OBJECT_OF_OBJECT_2__A', 'NUMBER', 'NO'),287                                     ('OBJECT_OF_OBJECT_0__OBJECT_OF_OBJECT_1__OBJECT_OF_OBJECT_2__B', 'NUMBER', 'NO'),288                                     ('OBJECT_OF_OBJECT_0__OBJECT_OF_OBJECT_1__OBJECT_OF_OBJECT_2__C', 'NUMBER', 'NO')289                                 })290            assert_columns_equal(cur,291                                 'ROOT__OBJECT_OF_OBJECT_0__OBJECT_OF_OBJECT_1__OBJECT_OF_OBJECT_2__ARRAY_SCALAR',292                                 {293                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),294                                     ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),295                                     ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),296                                     ('_SDC_VALUE', 'BOOLEAN', 'NO'),297                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),298                                 })299            assert_columns_equal(cur,300                                 'ROOT__ARRAY_OF_ARRAY',301                                 {302                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),303                                     ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),304                                     ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),305                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 
'BOOLEAN', 'YES'),306                                 })307            assert_columns_equal(cur,308                                 'ROOT__ARRAY_OF_ARRAY___SDC_VALUE',309                                 {310                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),311                                     ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),312                                     ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),313                                     ('_SDC_LEVEL_1_ID', 'NUMBER', 'NO'),314                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),315                                 })316            assert_columns_equal(cur,317                                 'ROOT__ARRAY_OF_ARRAY___SDC_VALUE___SDC_VALUE',318                                 {319                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),320                                     ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),321                                     ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),322                                     ('_SDC_LEVEL_1_ID', 'NUMBER', 'NO'),323                                     ('_SDC_LEVEL_2_ID', 'NUMBER', 'NO'),324                                     ('_SDC_VALUE', 'NUMBER', 'NO'),325                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),326                                 })327def test_loading__new_non_null_column(db_prep):328    cat_count = 50329    main(CONFIG, input_stream=CatStream(cat_count))330    class NonNullStream(CatStream):331        def generate_record(self):332            record = CatStream.generate_record(self)333            record['id'] = record['id'] + cat_count334            return record335    non_null_stream = NonNullStream(cat_count)336    non_null_stream.schema = deepcopy(non_null_stream.schema)337    non_null_stream.schema['schema']['properties']['paw_toe_count'] = {'type': 'integer',338                                      
                                 'default': 5}339    main(CONFIG, input_stream=non_null_stream)340    with connect(**TEST_DB) as conn:341        with conn.cursor() as cur:342            assert_columns_equal(cur,343                                 'CATS',344                                 {345                                     ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),346                                     ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),347                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),348                                     ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),349                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),350                                     ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),351                                     ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),352                                     ('AGE', 'NUMBER', 'YES'),353                                     ('ID', 'NUMBER', 'NO'),354                                     ('NAME', 'TEXT', 'NO'),355                                     ('PAW_SIZE', 'NUMBER', 'NO'),356                                     ('PAW_COLOUR', 'TEXT', 'NO'),357                                     ('PAW_TOE_COUNT', 'NUMBER', 'YES'),358                                     ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),359                                     ('PATTERN', 'TEXT', 'YES')360                                 })361            cur.execute('''362                SELECT {}, {} FROM {}.{}.{}363            '''.format(364                sql.identifier('ID'),365                sql.identifier('PAW_TOE_COUNT'),366                sql.identifier(CONFIG['snowflake_database']),367                sql.identifier(CONFIG['snowflake_schema']),368                sql.identifier('CATS')369            ))370            persisted_records = cur.fetchall()371            ## Assert that the split columns before/after new non-null data372           
 assert 2 * cat_count == len(persisted_records)373            assert cat_count == len([x for x in persisted_records if x[1] is None])374            assert cat_count == len([x for x in persisted_records if x[1] is not None])375def test_loading__column_type_change(db_prep):376    cat_count = 20377    main(CONFIG, input_stream=CatStream(cat_count))378    with connect(**TEST_DB) as conn:379        with conn.cursor() as cur:380            assert_columns_equal(cur,381                                 'CATS',382                                 {383                                     ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),384                                     ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),385                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),386                                     ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),387                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),388                                     ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),389                                     ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),390                                     ('AGE', 'NUMBER', 'YES'),391                                     ('ID', 'NUMBER', 'NO'),392                                     ('NAME', 'TEXT', 'NO'),393                                     ('PAW_SIZE', 'NUMBER', 'NO'),394                                     ('PAW_COLOUR', 'TEXT', 'NO'),395                                     ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),396                                     ('PATTERN', 'TEXT', 'YES')397                                 })398            cur.execute('''399                SELECT {} FROM {}.{}.{}400            '''.format(401                sql.identifier('NAME'),402                sql.identifier(CONFIG['snowflake_database']),403                sql.identifier(CONFIG['snowflake_schema']),404                sql.identifier('CATS')405            ))406            
persisted_records = cur.fetchall()407            ## Assert that the original data is present408            assert cat_count == len(persisted_records)409            assert cat_count == len([x for x in persisted_records if x[0] is not None])410    class NameBooleanCatStream(CatStream):411        def generate_record(self):412            record = CatStream.generate_record(self)413            record['id'] = record['id'] + cat_count414            record['name'] = False415            return record416    stream = NameBooleanCatStream(cat_count)417    stream.schema = deepcopy(stream.schema)418    stream.schema['schema']['properties']['name'] = {'type': 'boolean'}419    main(CONFIG, input_stream=stream)420    with connect(**TEST_DB) as conn:421        with conn.cursor() as cur:422            assert_columns_equal(cur,423                                 'CATS',424                                 {425                                     ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),426                                     ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),427                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),428                                     ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),429                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),430                                     ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),431                                     ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),432                                     ('AGE', 'NUMBER', 'YES'),433                                     ('ID', 'NUMBER', 'NO'),434                                     ('NAME__S', 'TEXT', 'YES'),435                                     ('NAME__B', 'BOOLEAN', 'YES'),436                                     ('PAW_SIZE', 'NUMBER', 'NO'),437                                     ('PAW_COLOUR', 'TEXT', 'NO'),438                                     ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),439             
                        ('PATTERN', 'TEXT', 'YES')440                                 })441            cur.execute('''442                SELECT {}, {} FROM {}.{}.{}443            '''.format(444                sql.identifier('NAME__S'),445                sql.identifier('NAME__B'),446                sql.identifier(CONFIG['snowflake_database']),447                sql.identifier(CONFIG['snowflake_schema']),448                sql.identifier('CATS')449            ))450            persisted_records = cur.fetchall()451            ## Assert that the split columns migrated data/persisted new data452            assert 2 * cat_count == len(persisted_records)453            assert cat_count == len([x for x in persisted_records if x[0] is not None])454            assert cat_count == len([x for x in persisted_records if x[1] is not None])455            assert 0 == len([x for x in persisted_records if x[0] is not None and x[1] is not None])456    class NameIntegerCatStream(CatStream):457        def generate_record(self):458            record = CatStream.generate_record(self)459            record['id'] = record['id'] + (2 * cat_count)460            record['name'] = 314461            return record462    stream = NameIntegerCatStream(cat_count)463    stream.schema = deepcopy(stream.schema)464    stream.schema['schema']['properties']['name'] = {'type': 'integer'}465    main(CONFIG, input_stream=stream)466    with connect(**TEST_DB) as conn:467        with conn.cursor() as cur:468            assert_columns_equal(cur,469                                 'CATS',470                                 {471                                     ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),472                                     ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),473                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),474                                     ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),475                                     
('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),476                                     ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),477                                     ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),478                                     ('AGE', 'NUMBER', 'YES'),479                                     ('ID', 'NUMBER', 'NO'),480                                     ('NAME__S', 'TEXT', 'YES'),481                                     ('NAME__B', 'BOOLEAN', 'YES'),482                                     ('NAME__I', 'NUMBER', 'YES'),483                                     ('PAW_SIZE', 'NUMBER', 'NO'),484                                     ('PAW_COLOUR', 'TEXT', 'NO'),485                                     ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),486                                     ('PATTERN', 'TEXT', 'YES')487                                 })488            cur.execute('''489                SELECT {}, {}, {} FROM {}.{}.{}490            '''.format(491                sql.identifier('NAME__S'),492                sql.identifier('NAME__B'),493                sql.identifier('NAME__I'),494                sql.identifier(CONFIG['snowflake_database']),495                sql.identifier(CONFIG['snowflake_schema']),496                sql.identifier('CATS')497            ))498            persisted_records = cur.fetchall()499            ## Assert that the split columns migrated data/persisted new data500            assert 3 * cat_count == len(persisted_records)501            assert cat_count == len([x for x in persisted_records if x[0] is not None])502            assert cat_count == len([x for x in persisted_records if x[1] is not None])503            assert cat_count == len([x for x in persisted_records if x[2] is not None])504            assert 0 == len(505                [x for x in persisted_records if x[0] is not None and x[1] is not None and x[2] is not None])506            assert 0 == len([x for x in persisted_records if x[0] is 
def test_loading__multi_types_columns(db_prep):
    """Properties seen with several JSON types must be split into one typed column each.

    Also verifies that a property first seen as ``integer`` and later as ``number``
    is migrated to a FLOAT column with all prior data converted.
    """
    stream_count = 50
    main(CONFIG, input_stream=MultiTypeStream(stream_count))
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'ROOT',
                                 {
                                     ('_SDC_PRIMARY_KEY', 'TEXT', 'NO'),
                                     ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),
                                     ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),
                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),
                                     ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),
                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),
                                     ('EVERY_TYPE__I', 'NUMBER', 'YES'),
                                     ('EVERY_TYPE__F', 'FLOAT', 'YES'),
                                     ('EVERY_TYPE__B', 'BOOLEAN', 'YES'),
                                     ('EVERY_TYPE__T', 'TIMESTAMP_TZ', 'YES'),
                                     ('EVERY_TYPE__I__1', 'NUMBER', 'YES'),
                                     ('EVERY_TYPE__F__1', 'FLOAT', 'YES'),
                                     ('EVERY_TYPE__B__1', 'BOOLEAN', 'YES'),
                                     ('NUMBER_WHICH_ONLY_COMES_AS_INTEGER', 'FLOAT', 'NO')
                                 })
            assert_columns_equal(cur,
                                 'ROOT__EVERY_TYPE',
                                 {
                                     ('_SDC_SOURCE_KEY__SDC_PRIMARY_KEY', 'TEXT', 'NO'),
                                     ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),
                                     ('_SDC_SEQUENCE', 'NUMBER', 'YES'),
                                     ('_SDC_VALUE', 'NUMBER', 'NO'),
                                     ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),
                                 })
            # FIX: this format string previously read 'SELECT {} FROM {}.{}' while
            # FOUR identifiers were supplied; str.format silently dropped the
            # 'ROOT' table identifier, leaving the query unqualified by table.
            # Use the same {}.{}.{} shape as every other query in this file.
            cur.execute('''
                SELECT {} FROM {}.{}.{}
            '''.format(
                sql.identifier('NUMBER_WHICH_ONLY_COMES_AS_INTEGER'),
                sql.identifier(CONFIG['snowflake_database']),
                sql.identifier(CONFIG['snowflake_schema']),
                sql.identifier('ROOT')
            ))
            persisted_records = cur.fetchall()
            ## Assert that the column has migrated data
            assert stream_count == len(persisted_records)
            assert stream_count == len([x for x in persisted_records if isinstance(x[0], float)])


def test_upsert(db_prep):
    """Re-sending the same primary keys must update rows, not duplicate them."""
    stream = CatStream(100)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_count_equal(cur, 'CATS', 100)
        assert_records(conn, stream.records, 'CATS', 'ID')
    stream = CatStream(100)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_count_equal(cur, 'CATS', 100)
        assert_records(conn, stream.records, 'CATS', 'ID')
    stream = CatStream(200)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_count_equal(cur, 'CATS', 200)
        assert_records(conn, stream.records, 'CATS', 'ID')


def test_nested_delete_on_parent(db_prep):
    """Shrinking a nested array on upsert must delete the orphaned child rows."""
    stream = CatStream(100, nested_count=3)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))
            high_nested = cur.fetchone()[0]
        assert_records(conn, stream.records, 'CATS', 'ID')
    stream = CatStream(100, nested_count=2)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))
            low_nested = cur.fetchone()[0]
        assert_records(conn, stream.records, 'CATS', 'ID')
    assert low_nested < high_nested


def test_full_table_replication(db_prep):
    """ACTIVATE_VERSION semantics: each newer version fully replaces the table,
    and an outdated version must not overwrite newer data."""
    stream = CatStream(110, version=0, nested_count=3)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            cur.execute(get_count_sql('CATS'))
            version_0_count = cur.fetchone()[0]
            cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))
            version_0_sub_count = cur.fetchone()[0]
        assert_records(conn, stream.records, 'CATS', 'ID', match_pks=True)
    assert version_0_count == 110
    assert version_0_sub_count == 330
    stream = CatStream(100, version=1, nested_count=3)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            cur.execute(get_count_sql('CATS'))
            version_1_count = cur.fetchone()[0]
            cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))
            version_1_sub_count = cur.fetchone()[0]
        assert_records(conn, stream.records, 'CATS', 'ID', match_pks=True)
    assert version_1_count == 100
    assert version_1_sub_count == 300
    stream = CatStream(120, version=2, nested_count=2)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            cur.execute(get_count_sql('CATS'))
            version_2_count = cur.fetchone()[0]
            cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))
            version_2_sub_count = cur.fetchone()[0]
        assert_records(conn, stream.records, 'CATS', 'ID', match_pks=True)
    assert version_2_count == 120
    assert version_2_sub_count == 240
    ## Test that an outdated version cannot overwrite
    stream = CatStream(314, version=1, nested_count=2)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            cur.execute(get_count_sql('CATS'))
            older_version_count = cur.fetchone()[0]
    assert older_version_count == version_2_count


def test_deduplication_newer_rows(db_prep):
    """When a batch contains duplicate PKs, the later (higher-sequence) row wins."""
    stream = CatStream(100, nested_count=3, duplicates=2)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            cur.execute(get_count_sql('CATS'))
            table_count = cur.fetchone()[0]
            cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))
            nested_table_count = cur.fetchone()[0]
            cur.execute('''
                SELECT "_SDC_SEQUENCE"
                FROM {}.{}.{}
                WHERE "ID" in ({})
            '''.format(
                sql.identifier(CONFIG['snowflake_database']),
                sql.identifier(CONFIG['snowflake_schema']),
                sql.identifier('CATS'),
                ','.join(["'{}'".format(x) for x in stream.duplicate_pks_used])
            ))
            dup_cat_records = cur.fetchall()
    assert stream.record_message_count == 102
    assert table_count == 100
    assert nested_table_count == 300
    for record in dup_cat_records:
        assert record[0] == stream.sequence + 200


def test_deduplication_older_rows(db_prep):
    """Duplicates carrying an OLDER sequence must not displace the newer rows."""
    stream = CatStream(100, nested_count=2, duplicates=2, duplicate_sequence_delta=-100)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            cur.execute(get_count_sql('CATS'))
            table_count = cur.fetchone()[0]
            cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))
            nested_table_count = cur.fetchone()[0]
            cur.execute('''
                SELECT "_SDC_SEQUENCE"
                FROM {}.{}.{}
                WHERE "ID" in ({})
            '''.format(
                sql.identifier(CONFIG['snowflake_database']),
                sql.identifier(CONFIG['snowflake_schema']),
                sql.identifier('CATS'),
                ','.join(["'{}'".format(x) for x in stream.duplicate_pks_used])
            ))
            dup_cat_records = cur.fetchall()
    assert stream.record_message_count == 102
    assert table_count == 100
    assert nested_table_count == 200
    for record in dup_cat_records:
        assert record[0] == stream.sequence


def test_deduplication_existing_new_rows(db_prep):
    """Rows arriving with a sequence older than what is persisted are ignored."""
    stream = CatStream(100, nested_count=2)
    main(CONFIG, input_stream=stream)
    original_sequence = stream.sequence
    stream = CatStream(100,
                       nested_count=2,
                       sequence=original_sequence - 20)
    main(CONFIG, input_stream=stream)
    with connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            cur.execute(get_count_sql('CATS'))
            table_count = cur.fetchone()[0]
            cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))
            nested_table_count = cur.fetchone()[0]
            cur.execute('''
                SELECT DISTINCT "_SDC_SEQUENCE"
                FROM {}.{}.{}
            '''.format(
                sql.identifier(CONFIG['snowflake_database']),
                sql.identifier(CONFIG['snowflake_schema']),
                sql.identifier('CATS')
            ))
            sequences = cur.fetchall()
    assert table_count == 100
    assert nested_table_count == 200
    assert len(sequences) == 1
    assert sequences[0][0] == original_sequence
# ---- Source: test_target_redshift.py ----
...19    expected_column_tuples.add(20        ('_sdc_target_redshift_create_table_placeholder', 'boolean', 'YES')21    )22    assert set(columns) == expected_column_tuples23def get_count_sql(table_name):24    return sql.SQL(25        'SELECT count(*) FROM {}.{}'26    ).format(27        sql.Identifier(CONFIG['redshift_schema']),28        sql.Identifier(table_name))29def get_pk_key(pks, obj, subrecord=False):30    pk_parts = []31    for pk in pks:32        pk_parts.append(str(obj[pk]))33    if subrecord:34        for key, value in obj.items():35            if key[:11] == '_sdc_level_':36                pk_parts.append(str(value))37    return ':'.join(pk_parts)38def flatten_record(old_obj, subtables, subpks, new_obj=None, current_path=None, level=0):39    if not new_obj:40        new_obj = {}41    for prop, value in old_obj.items():42        if current_path:43            next_path = current_path + '__' + prop44        else:45            next_path = prop46        if isinstance(value, dict):47            flatten_record(value, subtables, subpks, new_obj=new_obj, current_path=next_path, level=level)48        elif isinstance(value, list):49            if next_path not in subtables:50                subtables[next_path] = []51            row_index = 052            for item in value:53                new_subobj = {}54                for key, value in subpks.items():55                    new_subobj[key] = value56                new_subpks = subpks.copy()57                new_subobj[singer_stream.SINGER_LEVEL.format(level)] = row_index58                new_subpks[singer_stream.SINGER_LEVEL.format(level)] = row_index59                subtables[next_path].append(flatten_record(item,60                                                           subtables,61                                                           new_subpks,62                                                           new_obj=new_subobj,63                                                           level=level + 
1))64                row_index += 165        else:66            new_obj[next_path] = value67    return new_obj68def assert_record(a, b, subtables, subpks):69    a_flat = flatten_record(a, subtables, subpks)70    for prop, value in a_flat.items():71        if value is None:72            if prop in b:73                assert b[prop] == None74        elif isinstance(b[prop], datetime):75            assert value == b[prop].isoformat()[:19]76        else:77            assert value == b[prop]78def assert_records(conn, records, table_name, pks, match_pks=False):79    if not isinstance(pks, list):80        pks = [pks]81    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:82        cur.execute("set timezone='UTC';")83        cur.execute(sql.SQL(84            'SELECT * FROM {}.{}'85        ).format(86            sql.Identifier(CONFIG['redshift_schema']),87            sql.Identifier(table_name)))88        persisted_records_raw = cur.fetchall()89        persisted_records = {}90        for persisted_record in persisted_records_raw:91            pk = get_pk_key(pks, persisted_record)92            persisted_records[pk] = persisted_record93        subtables = {}94        records_pks = []95        for record in records:96            pk = get_pk_key(pks, record)97            records_pks.append(pk)98            persisted_record = persisted_records[pk]99            subpks = {}100            for pk in pks:101                subpks[singer_stream.SINGER_SOURCE_PK_PREFIX + pk] = persisted_record[pk]102            assert_record(record, persisted_record, subtables, subpks)103        if match_pks:104            assert sorted(list(persisted_records.keys())) == sorted(records_pks)105        sub_pks = list(map(lambda pk: singer_stream.SINGER_SOURCE_PK_PREFIX + pk, pks))106        for subtable_name, items in subtables.items():107            cur.execute(sql.SQL(108                'SELECT * FROM {}.{}'109            ).format(110                
def test_loading__invalid__configuration__schema(db_prep):
    """A non-schema value for 'type' must be rejected before any loading happens."""
    stream = CatStream(1)
    stream.schema = deepcopy(stream.schema)
    stream.schema['schema']['type'] = 'invalid type for a JSON Schema'
    with pytest.raises(TargetError, match=r'.*invalid JSON Schema instance.*'):
        main(CONFIG, input_stream=stream)


def test_loading__simple(db_prep):
    """A plain stream creates the expected root and nested tables and rows."""
    stream = CatStream(100)
    main(CONFIG, input_stream=stream)
    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cursor:
            assert_columns_equal(cursor,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'YES'),
                                     ('name', 'character varying', 'YES'),
                                     ('paw_size', 'bigint', 'YES'),
                                     ('paw_colour', 'character varying', 'YES'),
                                     ('flea_check_complete', 'boolean', 'YES'),
                                     ('pattern', 'character varying', 'YES')
                                 })
            assert_columns_equal(cursor,
                                 'cats__adoption__immunizations',
                                 {
                                     ('_sdc_level_0_id', 'bigint', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'YES'),
                                     ('date_administered', 'timestamp with time zone', 'YES'),
                                     ('type', 'character varying', 'YES')
                                 })
            cursor.execute(get_count_sql('cats'))
            assert cursor.fetchone()[0] == 100
        # The target fills in defaults/empties; mirror that before comparing.
        for record in stream.records:
            record['paw_size'] = 314159
            record['paw_colour'] = ''
            record['flea_check_complete'] = False
        assert_records(conn, stream.records, 'cats', 'id')


def test_loading__nested_tables(db_prep):
    """Deeply nested objects/arrays fan out into the expected child tables."""
    main(CONFIG, input_stream=NestedStream(10))
    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cursor:
            expected_counts = [
                ('root', 10),
                ('root__array_scalar', 50),
                ('root__object_of_object_0__object_of_object_1__object_of_object_2__array_scalar', 50),
                ('root__array_of_array', 20),
                ('root__array_of_array___sdc_value', 80),
                ('root__array_of_array___sdc_value___sdc_value', 200),
            ]
            for table, expected in expected_counts:
                cursor.execute(get_count_sql(table))
                assert expected == cursor.fetchone()[0]
            assert_columns_equal(cursor,
                                 'root',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('id', 'bigint', 'YES'),
                                     ('null', 'bigint', 'YES'),
                                     ('nested_null__null', 'bigint', 'YES'),
                                     ('object_of_object_0__object_of_object_1__object_of_object_2__a', 'bigint', 'YES'),
                                     ('object_of_object_0__object_of_object_1__object_of_object_2__b', 'bigint', 'YES'),
                                     ('object_of_object_0__object_of_object_1__object_of_object_2__c', 'bigint', 'YES')
                                 })
            assert_columns_equal(cursor,
                                 'root__object_of_object_0__object_of_object_1__object_of_object_2__array_scalar',
                                 {
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'YES'),
                                     ('_sdc_level_0_id', 'bigint', 'YES'),
                                     ('_sdc_value', 'boolean', 'YES')
                                 })
            assert_columns_equal(cursor,
                                 'root__array_of_array',
                                 {
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'YES'),
                                     ('_sdc_level_0_id', 'bigint', 'YES')
                                 })
            assert_columns_equal(cursor,
                                 'root__array_of_array___sdc_value',
                                 {
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'YES'),
                                     ('_sdc_level_0_id', 'bigint', 'YES'),
                                     ('_sdc_level_1_id', 'bigint', 'YES')
                                 })
            assert_columns_equal(cursor,
                                 'root__array_of_array___sdc_value___sdc_value',
                                 {
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'YES'),
                                     ('_sdc_level_0_id', 'bigint', 'YES'),
                                     ('_sdc_level_1_id', 'bigint', 'YES'),
                                     ('_sdc_level_2_id', 'bigint', 'YES'),
                                     ('_sdc_value', 'bigint', 'YES')
                                 })


def test_loading__new_non_null_column(db_prep):
    """A column added with a default appears NULL for pre-existing rows only."""
    cat_count = 50
    main(CONFIG, input_stream=CatStream(cat_count))

    class NonNullStream(CatStream):
        def generate_record(self):
            record = CatStream.generate_record(self)
            record['id'] = record['id'] + cat_count
            return record

    non_null_stream = NonNullStream(cat_count)
    non_null_stream.schema = deepcopy(non_null_stream.schema)
    non_null_stream.schema['schema']['properties']['paw_toe_count'] = {'type': 'integer',
                                                                       'default': 5}
    main(CONFIG, input_stream=non_null_stream)
    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cursor:
            assert_columns_equal(cursor,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'YES'),
                                     ('name', 'character varying', 'YES'),
                                     ('paw_size', 'bigint', 'YES'),
                                     ('paw_colour', 'character varying', 'YES'),
                                     ('paw_toe_count', 'bigint', 'YES'),
                                     ('flea_check_complete', 'boolean', 'YES'),
                                     ('pattern', 'character varying', 'YES')
                                 })
            cursor.execute(sql.SQL('SELECT {}, {} FROM {}.{}').format(
                sql.Identifier('id'),
                sql.Identifier('paw_toe_count'),
                sql.Identifier(CONFIG['redshift_schema']),
                sql.Identifier('cats')
            ))
            persisted_records = cursor.fetchall()
            ## Assert that the split columns before/after new non-null data
            assert 2 * cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[1] is None])
            assert cat_count == len([x for x in persisted_records if x[1] is not None])


def test_loading__column_type_change(db_prep):
    """Changing a column's JSON type splits it into `__s`/`__b`/`__i` variants,
    migrating existing data so exactly one variant is set per row."""
    cat_count = 20
    main(CONFIG, input_stream=CatStream(cat_count))
    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cursor:
            assert_columns_equal(cursor,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'YES'),
                                     ('name', 'character varying', 'YES'),
                                     ('paw_size', 'bigint', 'YES'),
                                     ('paw_colour', 'character varying', 'YES'),
                                     ('flea_check_complete', 'boolean', 'YES'),
                                     ('pattern', 'character varying', 'YES')
                                 })
            cursor.execute(sql.SQL('SELECT {} FROM {}.{}').format(
                sql.Identifier('name'),
                sql.Identifier(CONFIG['redshift_schema']),
                sql.Identifier('cats')
            ))
            persisted_records = cursor.fetchall()
            ## Assert that the original data is present
            assert cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[0] is not None])

    class NameBooleanCatStream(CatStream):
        def generate_record(self):
            record = CatStream.generate_record(self)
            record['id'] = record['id'] + cat_count
            record['name'] = False
            return record

    stream = NameBooleanCatStream(cat_count)
    stream.schema = deepcopy(stream.schema)
    stream.schema['schema']['properties']['name'] = {'type': 'boolean'}
    main(CONFIG, input_stream=stream)
    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cursor:
            assert_columns_equal(cursor,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'YES'),
                                     ('name__s', 'character varying', 'YES'),
                                     ('name__b', 'boolean', 'YES'),
                                     ('paw_size', 'bigint', 'YES'),
                                     ('paw_colour', 'character varying', 'YES'),
                                     ('flea_check_complete', 'boolean', 'YES'),
                                     ('pattern', 'character varying', 'YES')
                                 })
            cursor.execute(sql.SQL('SELECT {}, {} FROM {}.{}').format(
                sql.Identifier('name__s'),
                sql.Identifier('name__b'),
                sql.Identifier(CONFIG['redshift_schema']),
                sql.Identifier('cats')
            ))
            persisted_records = cursor.fetchall()
            ## Assert that the split columns migrated data/persisted new data
            assert 2 * cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[0] is not None])
            assert cat_count == len([x for x in persisted_records if x[1] is not None])
            assert 0 == len([x for x in persisted_records if x[0] is not None and x[1] is not None])

    class NameIntegerCatStream(CatStream):
        def generate_record(self):
            record = CatStream.generate_record(self)
            record['id'] = record['id'] + (2 * cat_count)
            record['name'] = 314
            return record

    stream = NameIntegerCatStream(cat_count)
    stream.schema = deepcopy(stream.schema)
    stream.schema['schema']['properties']['name'] = {'type': 'integer'}
    main(CONFIG, input_stream=stream)
    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cursor:
            assert_columns_equal(cursor,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'YES'),
                                     ('name__s', 'character varying', 'YES'),
                                     ('name__b', 'boolean', 'YES'),
                                     ('name__i', 'bigint', 'YES'),
                                     ('paw_size', 'bigint', 'YES'),
                                     ('paw_colour', 'character varying', 'YES'),
                                     ('flea_check_complete', 'boolean', 'YES'),
                                     ('pattern', 'character varying', 'YES')
                                 })
            cursor.execute(sql.SQL('SELECT {}, {}, {} FROM {}.{}').format(
                sql.Identifier('name__s'),
                sql.Identifier('name__b'),
                sql.Identifier('name__i'),
                sql.Identifier(CONFIG['redshift_schema']),
                sql.Identifier('cats')
            ))
            persisted_records = cursor.fetchall()
            ## Assert that the split columns migrated data/persisted new data
            assert 3 * cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[0] is not None])
            assert cat_count == len([x for x in persisted_records if x[1] is not None])
            assert cat_count == len([x for x in persisted_records if x[2] is not None])
            assert 0 == len(
                [x for x in persisted_records if x[0] is not None and x[1] is not None and x[2] is not None])
            assert 0 == len([x for x in persisted_records if x[0] is None and x[1] is None and x[2] is None])


def test_loading__multi_types_columns(db_prep):
    """Properties seen with several JSON types are split into typed columns."""
    stream_count = 50
    main(CONFIG, input_stream=MultiTypeStream(stream_count))
    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cursor:
            assert_columns_equal(cursor,
                                 'root',
                                 {
                                     ('_sdc_primary_key', 'character varying', 'YES'),
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('every_type__i', 'bigint', 'YES'),
                                     ('every_type__f', 'double precision', 'YES'),
                                     ('every_type__b', 'boolean', 'YES'),
                                     ('every_type__t', 'timestamp with time zone', 'YES'),
                                     ('every_type__i__1', 'bigint', 'YES'),
                                     ('every_type__f__1', 'double precision', 'YES'),
                                     ('every_type__b__1', 'boolean', 'YES'),
                                     ('number_which_only_comes_as_integer', 'double precision', 'YES')
                                 })
            assert_columns_equal(cursor,
                                 'root__every_type',
                                 {
                                     ('_sdc_source_key__sdc_primary_key', 'character varying', 'YES'),
                                     ('_sdc_level_0_id', 'bigint', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_value', 'bigint', 'YES'),
                                 })
            cursor.execute(sql.SQL('SELECT {} FROM {}.{}').format(
                sql.Identifier('number_which_only_comes_as_integer'),
                sql.Identifier(CONFIG['redshift_schema']),
                sql.Identifier('root')
            ))
            persisted_records = cursor.fetchall()
            # The integer-sourced column should have been migrated to float.
            assert stream_count == len(persisted_records)
            assert stream_count == len([x for x in persisted_records if isinstance(x[0], float)])


def test_upsert(db_prep):
    """Re-sending the same primary keys must update rows, not duplicate them."""
    for cat_count in (100, 100, 200):
        stream = CatStream(cat_count)
        main(CONFIG, input_stream=stream)
        with psycopg2.connect(**TEST_DB) as conn:
            with conn.cursor() as cursor:
                cursor.execute(get_count_sql('cats'))
                assert cursor.fetchone()[0] == cat_count
            assert_records(conn, stream.records, 'cats', 'id')


def test_nested_delete_on_parent(db_prep):
    """Shrinking a nested array on upsert must delete the orphaned child rows."""
    def _load_and_count_immunizations(stream):
        # Load the stream, return the nested-table row count after verifying records.
        main(CONFIG, input_stream=stream)
        with psycopg2.connect(**TEST_DB) as conn:
            with conn.cursor() as cursor:
                cursor.execute(get_count_sql('cats__adoption__immunizations'))
                count = cursor.fetchone()[0]
            assert_records(conn, stream.records, 'cats', 'id')
        return count

    high_nested = _load_and_count_immunizations(CatStream(100, nested_count=3))
    low_nested = _load_and_count_immunizations(CatStream(100, nested_count=2))
    assert low_nested < high_nested


def test_full_table_replication(db_prep):
    """ACTIVATE_VERSION semantics: each newer version fully replaces the table,
    and an outdated version must not overwrite newer data."""
    def _load_and_count(stream):
        # Load the stream, verify records, return (root rows, nested rows).
        main(CONFIG, input_stream=stream)
        with psycopg2.connect(**TEST_DB) as conn:
            with conn.cursor() as cursor:
                cursor.execute(get_count_sql('cats'))
                root_count = cursor.fetchone()[0]
                cursor.execute(get_count_sql('cats__adoption__immunizations'))
                sub_count = cursor.fetchone()[0]
            assert_records(conn, stream.records, 'cats', 'id', match_pks=True)
        return root_count, sub_count

    version_0_count, version_0_sub_count = _load_and_count(CatStream(110, version=0, nested_count=3))
    assert version_0_count == 110
    assert version_0_sub_count == 330
    version_1_count, version_1_sub_count = _load_and_count(CatStream(100, version=1, nested_count=3))
    assert version_1_count == 100
    assert version_1_sub_count == 300
    version_2_count, version_2_sub_count = _load_and_count(CatStream(120, version=2, nested_count=2))
    assert version_2_count == 120
    assert version_2_sub_count == 240
    ## Test that an outdated version cannot overwrite
    stream = CatStream(314, version=1, nested_count=2)
    main(CONFIG, input_stream=stream)
    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cursor:
            cursor.execute(get_count_sql('cats'))
            older_version_count = cursor.fetchone()[0]
    assert older_version_count == version_2_count


def test_deduplication_newer_rows(db_prep):
    """When a batch contains duplicate PKs, the later (higher-sequence) row wins."""
    stream = CatStream(100, nested_count=3, duplicates=2)
    main(CONFIG, input_stream=stream)
    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cursor:
            cursor.execute(get_count_sql('cats'))
            table_count = cursor.fetchone()[0]
            cursor.execute(get_count_sql('cats__adoption__immunizations'))
            nested_table_count = cursor.fetchone()[0]
            duplicated_ids = ','.join(map(str, stream.duplicate_pks_used))
            cursor.execute(sql.SQL(
                'SELECT _sdc_sequence FROM {}.{} WHERE id in '
                + '({})'.format(duplicated_ids)
            ).format(
                sql.Identifier(CONFIG['redshift_schema']),
                sql.Identifier('cats'),
                sql.Literal(duplicated_ids)))
            dup_cat_records = cursor.fetchall()
    assert stream.record_message_count == 102
    assert table_count == 100
    assert nested_table_count == 300
    for record in dup_cat_records:
        assert record[0] == stream.sequence + 200
sql.Identifier(CONFIG['redshift_schema']),554                sql.Identifier('cats')))555            dup_cat_records = cur.fetchall()556    assert stream.record_message_count == 102557    assert table_count == 100558    assert nested_table_count == 200559    for record in dup_cat_records:560        assert record[0] == stream.sequence561def test_deduplication_existing_new_rows(db_prep):562    stream = CatStream(100, nested_count=2)563    main(CONFIG, input_stream=stream)564    original_sequence = stream.sequence565    stream = CatStream(100,566                       nested_count=2,567                       sequence=original_sequence - 20)568    main(CONFIG, input_stream=stream)569    with psycopg2.connect(**TEST_DB) as conn:570        with conn.cursor() as cur:571            cur.execute(get_count_sql('cats'))572            table_count = cur.fetchone()[0]573            cur.execute(get_count_sql('cats__adoption__immunizations'))574            nested_table_count = cur.fetchone()[0]575            cur.execute(sql.SQL(576                'SELECT DISTINCT _sdc_sequence FROM {}.{}'577            ).format(578                sql.Identifier(CONFIG['redshift_schema']),579                sql.Identifier('cats')))580            sequences = cur.fetchall()581    assert table_count == 100582    assert nested_table_count == 200583    assert len(sequences) == 1...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
# You could also refer to video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
# Get 100 minutes of automation testing FREE!
