How to use get_count_sql method in autotest

Best Python code snippet using autotest_python

test_postgres.py

Source: test_postgres.py (GitHub)

copy

Full Screen

...19 table_name))20 columns = cursor.fetchall()21 assert (not columns and not expected_column_tuples) \22 or set(columns) == expected_column_tuples23def get_count_sql(table_name):24 return 'SELECT count(*) FROM "public"."{}"'.format(table_name)25def get_pk_key(pks, obj, subrecord=False):26 pk_parts = []27 for pk in pks:28 pk_parts.append(str(obj[pk]))29 if subrecord:30 for key, value in obj.items():31 if key[:11] == '_sdc_level_':32 pk_parts.append(str(value))33 return ':'.join(pk_parts)34def flatten_record(old_obj, subtables, subpks, new_obj=None, current_path=None, level=0):35 if not new_obj:36 new_obj = {}37 for prop, value in old_obj.items():38 if current_path:39 next_path = current_path + '__' + prop40 else:41 next_path = prop42 if isinstance(value, dict):43 flatten_record(value, subtables, subpks, new_obj=new_obj, current_path=next_path, level=level)44 elif isinstance(value, list):45 if next_path not in subtables:46 subtables[next_path] = []47 row_index = 048 for item in value:49 new_subobj = {}50 for key, value in subpks.items():51 new_subobj[key] = value52 new_subpks = subpks.copy()53 new_subobj[singer_stream.singer.LEVEL_FMT.format(level)] = row_index54 new_subpks[singer_stream.singer.LEVEL_FMT.format(level)] = row_index55 subtables[next_path].append(flatten_record(item,56 subtables,57 new_subpks,58 new_obj=new_subobj,59 level=level + 1))60 row_index += 161 else:62 new_obj[next_path] = value63 return new_obj64def assert_record(a, b, subtables, subpks):65 a_flat = flatten_record(a, subtables, subpks)66 for prop, value in a_flat.items():67 if value is None:68 if prop in b:69 assert b[prop] == None70 elif isinstance(b[prop], datetime):71 assert value == b[prop].isoformat()[:19]72 else:73 assert value == b[prop]74def assert_records(conn, records, table_name, pks, match_pks=False):75 if not isinstance(pks, list):76 pks = [pks]77 with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:78 cur.execute("set timezone='UTC';")79 cur.execute('SELECT * FROM 
{}'.format(table_name))80 persisted_records_raw = cur.fetchall()81 persisted_records = {}82 for persisted_record in persisted_records_raw:83 pk = get_pk_key(pks, persisted_record)84 persisted_records[pk] = persisted_record85 subtables = {}86 records_pks = []87 for record in records:88 pk = get_pk_key(pks, record)89 records_pks.append(pk)90 persisted_record = persisted_records[pk]91 subpks = {}92 for pk in pks:93 subpks[singer_stream.singer.SOURCE_PK_PREFIX + pk] = persisted_record[pk]94 assert_record(record, persisted_record, subtables, subpks)95 if match_pks:96 assert sorted(list(persisted_records.keys())) == sorted(records_pks)97 sub_pks = list(map(lambda pk: singer_stream.singer.SOURCE_PK_PREFIX + pk, pks))98 for subtable_name, items in subtables.items():99 cur.execute('SELECT * FROM {}'.format(100 table_name + '__' + subtable_name))101 persisted_records_raw = cur.fetchall()102 persisted_records = {}103 for persisted_record in persisted_records_raw:104 pk = get_pk_key(sub_pks, persisted_record, subrecord=True)105 persisted_records[pk] = persisted_record106 subtables = {}107 records_pks = []108 for record in items:109 pk = get_pk_key(sub_pks, record, subrecord=True)110 records_pks.append(pk)111 persisted_record = persisted_records[pk]112 assert_record(record, persisted_record, subtables, subpks)113 assert len(subtables.values()) == 0114 if match_pks:115 assert sorted(list(persisted_records.keys())) == sorted(records_pks)116def assert_column_indexed(conn, table_name, column_name):117 with conn.cursor() as cur:118 cur.execute('''119 SELECT t.relname AS table_name, i.relname AS index_name, a.attname AS column_name120 FROM pg_class t, pg_class i, pg_index ix, pg_attribute a121 WHERE t.oid = ix.indrelid AND i.oid = ix.indexrelid AND a.attrelid = t.oid122 AND a.attnum = ANY(ix.indkey) AND t.relkind = 'r'123 AND t.relname = '{table_name}' AND a.attname = '{column_name}'124 '''.format(125 table_name=table_name,126 column_name=column_name))127 assert len(cur.fetchall()) > 
0128def test_loading__invalid__configuration__schema(db_cleanup):129 stream = CatStream(1)130 stream.schema = deepcopy(stream.schema)131 stream.schema['schema']['type'] = 'invalid type for a JSON Schema'132 with pytest.raises(TargetError, match=r'.*invalid JSON Schema instance.*'):133 main(CONFIG, input_stream=stream)134def test_loading__invalid__default_null_value__non_nullable_column(db_cleanup):135 class NullDefaultCatStream(CatStream):136 def generate_record(self):137 record = CatStream.generate_record(self)138 record['name'] = postgres.RESERVED_NULL_DEFAULT139 return record140 with pytest.raises(postgres.PostgresError, match=r'.*NotNullViolation.*'):141 main(CONFIG, input_stream=NullDefaultCatStream(20))142def test_loading__schema_version_0_gets_migrated_to_2(db_cleanup):143 main(CONFIG, input_stream=CatStream(100))144 with psycopg2.connect(**TEST_DB) as conn:145 with conn.cursor() as cur:146 target = postgres.PostgresTarget(conn)147 metadata = target._get_table_metadata(cur, 'cats')148 metadata.pop('schema_version')149 metadata.pop('path')150 metadata['table_mappings'] = [{'from': ['cats', 'adoption', 'immunizations'],151 'to': 'cats__adoption__immunizations'}]152 target._set_table_metadata(cur, 'cats', metadata)153 metadata = target._get_table_metadata(cur, 'cats__adoption__immunizations')154 metadata.pop('schema_version')155 metadata.pop('path')156 target._set_table_metadata(cur, 'cats__adoption__immunizations', metadata)157 main(CONFIG, input_stream=CatStream(100))158 with psycopg2.connect(**TEST_DB) as conn:159 with conn.cursor() as cur:160 target = postgres.PostgresTarget(conn)161 metadata = target._get_table_metadata(cur, 'cats')162 assert metadata['schema_version'] == 2163 assert metadata['path']164 assert not metadata.get('table_mappings')165 metadata = target._get_table_metadata(cur, 'cats__adoption__immunizations')166 assert metadata['schema_version'] == 2167 assert metadata['path']168 assert not metadata.get('table_mappings')169def 
# NOTE(review): reconstructed from scraped text with fused line numbers; the
# `def` keyword of this first test sits at the end of the previous chunk.
def test_loading__simple(db_cleanup):
    """Happy path: 100 cats load, columns/indexes/rows match expectations."""
    stream = CatStream(100)
    main(CONFIG, input_stream=stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'NO'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            assert_columns_equal(cur,
                                 'cats__adoption__immunizations',
                                 {
                                     ('_sdc_level_0_id', 'bigint', 'NO'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'NO'),
                                     ('date_administered', 'timestamp with time zone', 'YES'),
                                     ('type', 'text', 'YES')
                                 })

            cur.execute(get_count_sql('cats'))
            assert cur.fetchone()[0] == 100

    # the target defaults these fields; mirror that before comparing
    for record in stream.records:
        record['paw_size'] = 314159
        record['paw_colour'] = ''
        record['flea_check_complete'] = False

    assert_records(conn, stream.records, 'cats', 'id')

    assert_column_indexed(conn, 'cats', '_sdc_sequence')
    assert_column_indexed(conn, 'cats', 'id')
    assert_column_indexed(conn, 'cats__adoption__immunizations', '_sdc_sequence')
    assert_column_indexed(conn, 'cats__adoption__immunizations', '_sdc_level_0_id')


def test_loading__simple__allOf(db_cleanup):
    """`allOf` composed schemas collapse to the same columns as the plain schema."""
    stream = CatStream(100)
    stream.schema = deepcopy(stream.schema)

    name = stream.schema['schema']['properties']['name']
    stream.schema['schema']['properties']['name'] = {
        'allOf': [
            name,
            {'maxLength': 100000000}
        ]
    }

    adoption = stream.schema['schema']['properties']['adoption']['properties']
    stream.schema['schema']['properties']['adoption'] = {
        'allOf': [
            {
                'type': ['object', 'null'],
                'properties': {
                    'adopted_on': adoption['adopted_on'],
                    'immunizations': adoption['immunizations'],
                }
            },
            {
                'properties': {
                    'was_foster': adoption['was_foster']
                }
            }
        ]
    }

    main(CONFIG, input_stream=stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'NO'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            assert_columns_equal(cur,
                                 'cats__adoption__immunizations',
                                 {
                                     ('_sdc_level_0_id', 'bigint', 'NO'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'NO'),
                                     ('date_administered', 'timestamp with time zone', 'YES'),
                                     ('type', 'text', 'YES')
                                 })

            cur.execute(get_count_sql('cats'))
            assert cur.fetchone()[0] == 100

    for record in stream.records:
        record['paw_size'] = 314159
        record['paw_colour'] = ''
        record['flea_check_complete'] = False

    assert_records(conn, stream.records, 'cats', 'id')


def test_loading__simple__anyOf(db_cleanup):
    """`anyOf` datetime/string schemas split into `__t`/`__s` sibling columns."""
    stream = CatStream(100)
    stream.schema = deepcopy(stream.schema)

    adoption_props = stream.schema['schema']['properties']['adoption']['properties']
    adoption_props['adopted_on'] = {
        "anyOf": [
            {
                "type": "string",
                "format": "date-time"
            },
            {"type": ["string", "null"]}]}

    main(CONFIG, input_stream=stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on__t', 'timestamp with time zone', 'YES'),
                                     ('adoption__adopted_on__s', 'text', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'NO'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            assert_columns_equal(cur,
                                 'cats__adoption__immunizations',
                                 {
                                     ('_sdc_level_0_id', 'bigint', 'NO'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'NO'),
                                     ('date_administered', 'timestamp with time zone', 'YES'),
                                     ('type', 'text', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {}, {}, {} FROM {}').format(
                sql.Identifier('adoption__adopted_on__t'),
                sql.Identifier('adoption__adopted_on__s'),
                sql.Identifier('adoption__was_foster'),
                sql.Identifier('cats')
            ))
            persisted_records = cur.fetchall()

            ## Assert that the split columns correctly persisted all datetime data
            assert 100 == len(persisted_records)
            assert 100 == len([x for x in persisted_records if x[1] is None])
            assert len([x for x in persisted_records if x[2] is not None]) \
                   == len([x for x in persisted_records if x[0] is not None])


def test_loading__empty(db_cleanup):
    """An empty stream creates no table at all by default."""
    stream = CatStream(0)
    main(CONFIG, input_stream=stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            cur.execute(
                sql.SQL('''
                    SELECT EXISTS(
                        SELECT 1
                        FROM information_schema.tables
                        WHERE table_schema = {}
                        AND table_name = {}
                    );
                ''').format(
                    sql.Literal('public'),
                    sql.Literal('cats')))

            assert not cur.fetchone()[0]

# (the `def` of test_loading__empty__enabled_config continues in the next chunk)
# NOTE(review): reconstructed from scraped text with fused line numbers; the
# `def` keyword of this first test sits at the end of the previous chunk.
def test_loading__empty__enabled_config(db_cleanup):
    """With persist_empty_tables on, an empty stream still creates the tables."""
    config = CONFIG.copy()
    config['persist_empty_tables'] = True

    stream = CatStream(0)
    main(config, input_stream=stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'NO'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            assert_columns_equal(cur,
                                 'cats__adoption__immunizations',
                                 {
                                     ('_sdc_level_0_id', 'bigint', 'NO'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'NO'),
                                     ('date_administered', 'timestamp with time zone', 'YES'),
                                     ('type', 'text', 'YES')
                                 })

            cur.execute(get_count_sql('cats'))
            assert cur.fetchone()[0] == 0


def test_loading__empty__enabled_config_with_messages_for_only_one_stream(db_cleanup):
    """Only the stream that actually batches needs flushing before STATE is emitted."""
    config = CONFIG.copy()
    config['max_batch_rows'] = 20
    config['batch_detection_threshold'] = 1
    config['persist_empty_tables'] = True

    cat_rows = list(CatStream(100))
    dog_rows = list(DogStream(0))

    # Simulate one stream that yields a lot of records with another that yields no records, and ensure that only the first
    # needs to be flushed before any state messages are emitted
    def test_stream():
        yield cat_rows[0]
        yield dog_rows[0]

        for row in cat_rows[slice(1, 5)]:
            yield row

        yield json.dumps({'type': 'STATE', 'value': {'test': 'state-1'}})

        for row in cat_rows[slice(6, 25)]:
            yield row

        yield json.dumps({'type': 'STATE', 'value': {'test': 'state-2'}})

    main(config, input_stream=test_stream())

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'NO'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            assert_columns_equal(cur,
                                 'dogs',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'NO'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            cur.execute(get_count_sql('cats'))
            assert cur.fetchone()[0] == 23

            cur.execute(get_count_sql('dogs'))
            assert cur.fetchone()[0] == 0


## TODO: Complex types defaulted
# def test_loading__default__complex_type(db_cleanup):
#     main(CONFIG, input_stream=NestedStream(10))
#
#     with psycopg2.connect(**TEST_DB) as conn:
#         with conn.cursor() as cur:
#             cur.execute(get_count_sql('root'))
#             assert 10 == cur.fetchone()[0]
#
#             cur.execute(get_count_sql('root__array_scalar_defaulted'))
#             assert 100 == cur.fetchone()[0]


def test_loading__nested_tables(db_cleanup):
    """Nested objects flatten to columns; nested arrays become child tables."""
    main(CONFIG, input_stream=NestedStream(10))

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            cur.execute(get_count_sql('root'))
            assert 10 == cur.fetchone()[0]

            cur.execute(get_count_sql('root__array_scalar'))
            assert 50 == cur.fetchone()[0]

            cur.execute(
                # table names are truncated to Postgres's 63-char identifier limit
                get_count_sql('root__object_of_object_0__object_of_object_1__object_of_object_2__array_scalar'[:63]))
            assert 50 == cur.fetchone()[0]

            cur.execute(get_count_sql('root__array_of_array'))
            assert 20 == cur.fetchone()[0]

            cur.execute(get_count_sql('root__array_of_array___sdc_value'))
            assert 80 == cur.fetchone()[0]

            cur.execute(get_count_sql('root__array_of_array___sdc_value___sdc_value'))
            assert 200 == cur.fetchone()[0]

            assert_columns_equal(cur,
                                 'root',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('null', 'bigint', 'YES'),
                                     ('nested_null__null', 'bigint', 'YES'),
                                     ('object_of_object_0__object_of_object_1__object_of_object_2__a', 'bigint', 'NO'),
                                     ('object_of_object_0__object_of_object_1__object_of_object_2__b', 'bigint', 'NO'),
                                     ('object_of_object_0__object_of_object_1__object_of_object_2__c', 'bigint', 'NO')
                                 })

            assert_columns_equal(cur,
                                 'root__object_of_object_0__object_of_object_1__object_of_object_2__array_scalar'[:63],
                                 {
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'NO'),
                                     ('_sdc_level_0_id', 'bigint', 'NO'),
                                     ('_sdc_value', 'boolean', 'NO')
                                 })

            assert_columns_equal(cur,
                                 'root__array_of_array',
                                 {
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'NO'),
                                     ('_sdc_level_0_id', 'bigint', 'NO')
                                 })

            assert_columns_equal(cur,
                                 'root__array_of_array___sdc_value',
                                 {
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'NO'),
                                     ('_sdc_level_0_id', 'bigint', 'NO'),
                                     ('_sdc_level_1_id', 'bigint', 'NO')
                                 })

            assert_columns_equal(cur,
                                 'root__array_of_array___sdc_value___sdc_value',
                                 {
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_source_key_id', 'bigint', 'NO'),
                                     ('_sdc_level_0_id', 'bigint', 'NO'),
                                     ('_sdc_level_1_id', 'bigint', 'NO'),
                                     ('_sdc_level_2_id', 'bigint', 'NO'),
                                     ('_sdc_value', 'bigint', 'NO')
                                 })


def test_loading__new_non_null_column(db_cleanup):
    """A column added with a default lands nullable; old rows stay NULL."""
    cat_count = 50
    main(CONFIG, input_stream=CatStream(cat_count))

    class NonNullStream(CatStream):
        def generate_record(self):
            record = CatStream.generate_record(self)
            record['id'] = record['id'] + cat_count
            return record

    non_null_stream = NonNullStream(cat_count)
    non_null_stream.schema = deepcopy(non_null_stream.schema)
    non_null_stream.schema['schema']['properties']['paw_toe_count'] = {'type': 'integer',
                                                                      'default': 5}

    main(CONFIG, input_stream=non_null_stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'NO'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('paw_toe_count', 'bigint', 'YES'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {}, {} FROM {}').format(
                sql.Identifier('id'),
                sql.Identifier('paw_toe_count'),
                sql.Identifier('cats')
            ))
            persisted_records = cur.fetchall()

            ## Assert that the split columns before/after new non-null data
            assert 2 * cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[1] is None])
            assert cat_count == len([x for x in persisted_records if x[1] is not None])
# NOTE(review): the first fragment of this scraped chunk was the final assert
# of test_loading__new_non_null_column (consolidated with that test above):
#             assert cat_count == len([x for x in persisted_records if x[1] is not None])


def test_loading__column_type_change(db_cleanup):
    """Each literal-type change splits `name` into typed `name__<x>` columns
    and migrates the existing data across."""
    cat_count = 20
    main(CONFIG, input_stream=CatStream(cat_count))

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'NO'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {} FROM {}').format(
                sql.Identifier('name'),
                sql.Identifier('cats')
            ))
            persisted_records = cur.fetchall()

            ## Assert that the original data is present
            assert cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[0] is not None])

    class NameDateTimeCatStream(CatStream):
        def generate_record(self):
            record = CatStream.generate_record(self)
            record['id'] = record['id'] + cat_count
            record['name'] = '2001-01-01 01:01:01.0001+01:01'
            return record

    stream = NameDateTimeCatStream(cat_count)
    stream.schema = deepcopy(stream.schema)
    stream.schema['schema']['properties']['name'] = {'type': 'string',
                                                     'format': 'date-time'}

    main(CONFIG, input_stream=stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name__s', 'text', 'YES'),
                                     ('name__t', 'timestamp with time zone', 'YES'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {}, {} FROM {}').format(
                sql.Identifier('name__s'),
                sql.Identifier('name__t'),
                sql.Identifier('cats')
            ))
            persisted_records = cur.fetchall()

            ## Assert that the split columns migrated data/persisted new data
            assert 2 * cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[0] is not None])
            assert cat_count == len([x for x in persisted_records if x[1] is not None])
            assert 0 == len([x for x in persisted_records if x[0] is not None and x[1] is not None])

    class NameBooleanCatStream(CatStream):
        def generate_record(self):
            record = CatStream.generate_record(self)
            record['id'] = record['id'] + (2 * cat_count)
            record['name'] = False
            return record

    stream = NameBooleanCatStream(cat_count)
    stream.schema = deepcopy(stream.schema)
    stream.schema['schema']['properties']['name'] = {'type': 'boolean'}

    main(CONFIG, input_stream=stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name__s', 'text', 'YES'),
                                     ('name__t', 'timestamp with time zone', 'YES'),
                                     ('name__b', 'boolean', 'YES'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {}, {}, {} FROM {}').format(
                sql.Identifier('name__s'),
                sql.Identifier('name__t'),
                sql.Identifier('name__b'),
                sql.Identifier('cats')
            ))
            persisted_records = cur.fetchall()

            ## Assert that the split columns migrated data/persisted new data
            assert 3 * cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[0] is not None])
            assert cat_count == len([x for x in persisted_records if x[1] is not None])
            assert cat_count == len([x for x in persisted_records if x[2] is not None])
            assert 0 == len(
                [x for x in persisted_records if x[0] is not None and x[1] is not None and x[2] is not None])

    class NameIntegerCatStream(CatStream):
        def generate_record(self):
            record = CatStream.generate_record(self)
            record['id'] = record['id'] + (3 * cat_count)
            record['name'] = 314
            return record

    stream = NameIntegerCatStream(cat_count)
    stream.schema = deepcopy(stream.schema)
    stream.schema['schema']['properties']['name'] = {'type': 'integer'}

    main(CONFIG, input_stream=stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name__s', 'text', 'YES'),
                                     ('name__t', 'timestamp with time zone', 'YES'),
                                     ('name__b', 'boolean', 'YES'),
                                     ('name__i', 'bigint', 'YES'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {}, {}, {}, {} FROM {}').format(
                sql.Identifier('name__s'),
                sql.Identifier('name__t'),
                sql.Identifier('name__b'),
                sql.Identifier('name__i'),
                sql.Identifier('cats')
            ))
            persisted_records = cur.fetchall()

            ## Assert that the split columns migrated data/persisted new data
            assert 4 * cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[0] is not None])
            assert cat_count == len([x for x in persisted_records if x[1] is not None])
            assert cat_count == len([x for x in persisted_records if x[2] is not None])
            assert cat_count == len([x for x in persisted_records if x[3] is not None])
            assert 0 == len(
                [x for x in persisted_records if
                 x[0] is not None and x[1] is not None and x[2] is not None and x[3] is not None])
            assert 0 == len(
                [x for x in persisted_records if x[0] is None and x[1] is None and x[2] is None and x[3] is None])


def test_loading__column_type_change__generative(db_cleanup):
    """Randomized type-change streams still migrate every literal type correctly."""
    insert_count = 20
    repeats_to_perform = 5
    literal_types_remaining = set(['integer', 'number', 'boolean', 'string', 'date-time'])
    repeats_performed = 0
    # keep looping until each literal type has been exercised at least once
    while repeats_performed < repeats_to_perform or literal_types_remaining:
        stream = TypeChangeStream(insert_count, repeats_performed * insert_count)
        repeats_performed += 1
        if stream.changing_literal_type in literal_types_remaining:
            literal_types_remaining.remove(stream.changing_literal_type)

        main(CONFIG, input_stream=stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'root',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('changing_literal_type__s', 'text', 'YES'),
                                     ('changing_literal_type__t', 'timestamp with time zone', 'YES'),
                                     ('changing_literal_type__b', 'boolean', 'YES'),
                                     ('changing_literal_type__i', 'bigint', 'YES'),
                                     ('changing_literal_type__f', 'double precision', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {}, {}, {}, {}, {} FROM {}').format(
                sql.Identifier('changing_literal_type__s'),
                sql.Identifier('changing_literal_type__t'),
                sql.Identifier('changing_literal_type__b'),
                sql.Identifier('changing_literal_type__i'),
                sql.Identifier('changing_literal_type__f'),
                sql.Identifier('root')
            ))
            persisted_records = cur.fetchall()

            ## Assert that the split columns migrated data/persisted new data
            assert repeats_performed * insert_count == len(persisted_records)
            assert insert_count <= len([x for x in persisted_records if x[0] is not None])
            assert insert_count <= len([x for x in persisted_records if x[1] is not None])
            assert insert_count <= len([x for x in persisted_records if x[2] is not None])

            ## Integers are valid Numbers, so sometimes a Number can be placed into an existing Integer column
            assert (2 * insert_count) \
                   <= len([x for x in persisted_records if x[3] is not None]) \
                   + len([x for x in persisted_records if x[4] is not None])

            assert 0 == len(
                [x for x in persisted_records
                 if x[0] is not None
                 and x[1] is not None
                 and x[2] is not None
                 and x[3] is not None
                 and x[4] is not None])
            assert 0 == len(
                [x for x in persisted_records
                 if x[0] is None
                 and x[1] is None
                 and x[2] is None
                 and x[3] is None
                 and x[4] is None])
# NOTE(review): the leading fragment of this scraped chunk was the interior of
# test_loading__column_type_change__generative (consolidated with that test above).


def test_loading__column_type_change__nullable(db_cleanup):
    """Making a NOT NULL column nullable relaxes it in place; data survives."""
    cat_count = 20
    main(CONFIG, input_stream=CatStream(cat_count))

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'NO'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {} FROM {}').format(
                sql.Identifier('name'),
                sql.Identifier('cats')
            ))
            persisted_records = cur.fetchall()

            ## Assert that the original data is present
            assert cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[0] is not None])

    class NameNullCatStream(CatStream):
        def generate_record(self):
            record = CatStream.generate_record(self)
            record['id'] = record['id'] + cat_count
            record['name'] = None
            return record

    stream = NameNullCatStream(cat_count)
    stream.schema = deepcopy(stream.schema)
    stream.schema['schema']['properties']['name'] = json_schema.make_nullable(
        stream.schema['schema']['properties']['name'])

    main(CONFIG, input_stream=stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'YES'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {} FROM {}').format(
                sql.Identifier('name'),
                sql.Identifier('cats')
            ))
            persisted_records = cur.fetchall()

            ## Assert that the column has migrated data
            assert 2 * cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[0] is not None])
            assert cat_count == len([x for x in persisted_records if x[0] is None])

    class NameNonNullCatStream(CatStream):
        def generate_record(self):
            record = CatStream.generate_record(self)
            record['id'] = record['id'] + 2 * cat_count
            return record

    main(CONFIG, input_stream=NameNonNullCatStream(cat_count))

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'YES'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {} FROM {}').format(
                sql.Identifier('name'),
                sql.Identifier('cats')
            ))
            persisted_records = cur.fetchall()

            ## Assert that the column has migrated data
            assert 3 * cat_count == len(persisted_records)
            assert 2 * cat_count == len([x for x in persisted_records if x[0] is not None])
            assert cat_count == len([x for x in persisted_records if x[0] is None])


def test_loading__column_type_change__nullable__missing_from_schema(db_cleanup):
    """Dropping a column from the schema makes the existing column nullable."""
    cat_count = 20
    main(CONFIG, input_stream=CatStream(cat_count))

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'NO'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {} FROM {}').format(
                sql.Identifier('name'),
                sql.Identifier('cats')
            ))
            persisted_records = cur.fetchall()

            ## Assert that the original data is present
            assert cat_count == len(persisted_records)
            assert cat_count == len([x for x in persisted_records if x[0] is not None])

    class NameMissingCatStream(CatStream):
        def generate_record(self):
            record = CatStream.generate_record(self)
            record['id'] = record['id'] + cat_count
            del record['name']
            return record

    stream = NameMissingCatStream(cat_count)
    stream.schema = deepcopy(stream.schema)
    del stream.schema['schema']['properties']['name']

    main(CONFIG, input_stream=stream)

    with psycopg2.connect(**TEST_DB) as conn:
        with conn.cursor() as cur:
            assert_columns_equal(cur,
                                 'cats',
                                 {
                                     ('_sdc_batched_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_received_at', 'timestamp with time zone', 'YES'),
                                     ('_sdc_sequence', 'bigint', 'YES'),
                                     ('_sdc_table_version', 'bigint', 'YES'),
                                     ('adoption__adopted_on', 'timestamp with time zone', 'YES'),
                                     ('adoption__was_foster', 'boolean', 'YES'),
                                     ('age', 'bigint', 'YES'),
                                     ('id', 'bigint', 'NO'),
                                     ('name', 'text', 'YES'),
                                     ('bio', 'text', 'NO'),
                                     ('paw_size', 'bigint', 'NO'),
                                     ('paw_colour', 'text', 'NO'),
                                     ('flea_check_complete', 'boolean', 'NO'),
                                     ('pattern', 'text', 'YES')
                                 })

            cur.execute(sql.SQL('SELECT {} FROM {}').format(
                sql.Identifier('name'),
                sql.Identifier('cats')
            ))
            persisted_records = cur.fetchall()
            # (source truncated here — the remaining assertions of this test are
            # not visible in this chunk)
Assert that the column is has migrated data967 assert 2 * cat_count == len(persisted_records)968 assert cat_count == len([x for x in persisted_records if x[0] is not None])969 assert cat_count == len([x for x in persisted_records if x[0] is None])970def test_loading__multi_types_columns(db_cleanup):971 stream_count = 50972 main(CONFIG, input_stream=MultiTypeStream(stream_count))973 with psycopg2.connect(**TEST_DB) as conn:974 with conn.cursor() as cur:975 assert_columns_equal(cur,976 'root',977 {978 ('_sdc_primary_key', 'text', 'NO'),979 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),980 ('_sdc_received_at', 'timestamp with time zone', 'YES'),981 ('_sdc_sequence', 'bigint', 'YES'),982 ('_sdc_table_version', 'bigint', 'YES'),983 ('every_type__i', 'bigint', 'YES'),984 ('every_type__f', 'double precision', 'YES'),985 ('every_type__b', 'boolean', 'YES'),986 ('every_type__t', 'timestamp with time zone', 'YES'),987 ('every_type__i__1', 'bigint', 'YES'),988 ('every_type__f__1', 'double precision', 'YES'),989 ('every_type__b__1', 'boolean', 'YES'),990 ('number_which_only_comes_as_integer', 'double precision', 'NO')991 })992 assert_columns_equal(cur,993 'root__every_type',994 {995 ('_sdc_source_key__sdc_primary_key', 'text', 'NO'),996 ('_sdc_level_0_id', 'bigint', 'NO'),997 ('_sdc_sequence', 'bigint', 'YES'),998 ('_sdc_value', 'bigint', 'NO'),999 })1000 cur.execute(sql.SQL('SELECT {} FROM {}').format(1001 sql.Identifier('number_which_only_comes_as_integer'),1002 sql.Identifier('root')1003 ))1004 persisted_records = cur.fetchall()1005 ## Assert that the column is has migrated data1006 assert stream_count == len(persisted_records)1007 assert stream_count == len([x for x in persisted_records if isinstance(x[0], float)])1008def test_loading__invalid__table_name__stream(db_cleanup):1009 def invalid_stream_named(stream_name):1010 stream = CatStream(100)1011 stream.stream = stream_name1012 stream.schema = deepcopy(stream.schema)1013 stream.schema['stream'] = 
stream_name1014 main(CONFIG, input_stream=stream)1015 invalid_stream_named('')1016 invalid_stream_named('x' * 1000)1017 invalid_stream_named('INVALID_name')1018 invalid_stream_named('a!!!invalid_name')1019 borderline_length_stream_name = 'x' * 611020 stream = CatStream(100, version=1)1021 stream.stream = borderline_length_stream_name1022 stream.schema = deepcopy(stream.schema)1023 stream.schema['stream'] = borderline_length_stream_name1024 main(CONFIG, input_stream=stream)1025 stream = CatStream(100, version=10)1026 stream.stream = borderline_length_stream_name1027 stream.schema = deepcopy(stream.schema)1028 stream.schema['stream'] = borderline_length_stream_name1029 main(CONFIG, input_stream=stream)1030def test_loading__table_name__stream__simple(db_cleanup):1031 def assert_tables_equal(cursor, expected_table_names):1032 cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")1033 tables = []1034 for table in cursor.fetchall():1035 tables.append(table[0])1036 assert (not tables and not expected_table_names) \1037 or set(tables) == expected_table_names1038 stream_name = "C@ts"1039 stream = CatStream(100)1040 stream.stream = stream_name1041 stream.schema = deepcopy(stream.schema)1042 stream.schema['stream'] = stream_name1043 main(CONFIG, input_stream=stream)1044 with psycopg2.connect(**TEST_DB) as conn:1045 with conn.cursor() as cur:1046 cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")1047 tables = []1048 for table in cur.fetchall():1049 tables.append(table[0])1050 assert {'c_ts', 'c_ts__adoption__immunizations'} == set(tables)1051 assert_columns_equal(cur,1052 'c_ts',1053 {1054 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1055 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1056 ('_sdc_sequence', 'bigint', 'YES'),1057 ('_sdc_table_version', 'bigint', 'YES'),1058 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1059 ('adoption__was_foster', 'boolean', 
'YES'),1060 ('age', 'bigint', 'YES'),1061 ('id', 'bigint', 'NO'),1062 ('name', 'text', 'NO'),1063 ('bio', 'text', 'NO'),1064 ('paw_size', 'bigint', 'NO'),1065 ('paw_colour', 'text', 'NO'),1066 ('flea_check_complete', 'boolean', 'NO'),1067 ('pattern', 'text', 'YES')1068 })1069 assert_columns_equal(cur,1070 'c_ts__adoption__immunizations',1071 {1072 ('_sdc_level_0_id', 'bigint', 'NO'),1073 ('_sdc_sequence', 'bigint', 'YES'),1074 ('_sdc_source_key_id', 'bigint', 'NO'),1075 ('date_administered', 'timestamp with time zone', 'YES'),1076 ('type', 'text', 'YES')1077 })1078 cur.execute(get_count_sql('c_ts'))1079 assert cur.fetchone()[0] == 1001080 for record in stream.records:1081 record['paw_size'] = 3141591082 record['paw_colour'] = ''1083 record['flea_check_complete'] = False1084 assert_records(conn, stream.records, 'c_ts', 'id')1085def test_loading__invalid__table_name__nested(db_cleanup):1086 cat_count = 201087 sub_table_name = 'immunizations'1088 invalid_name = 'INValID!NON{conflicting'1089 class InvalidNameSubTableCatStream(CatStream):1090 immunizations_count = 01091 def generate_record(self):1092 record = CatStream.generate_record(self)1093 if record.get('adoption', False):1094 self.immunizations_count += len(record['adoption'][sub_table_name])1095 record['adoption'][invalid_name] = record['adoption'][sub_table_name]1096 return record1097 stream = InvalidNameSubTableCatStream(cat_count)1098 stream.schema = deepcopy(stream.schema)1099 stream.schema['schema']['properties']['adoption']['properties'][invalid_name] = \1100 stream.schema['schema']['properties']['adoption']['properties'][sub_table_name]1101 main(CONFIG, input_stream=stream)1102 immunizations_count = stream.immunizations_count1103 invalid_name_count = stream.immunizations_count1104 conflicting_name = sub_table_name.upper()1105 class ConflictingNameSubTableCatStream(CatStream):1106 immunizations_count = 01107 def generate_record(self):1108 record = CatStream.generate_record(self)1109 if 
record.get('adoption', False):1110 self.immunizations_count += len(record['adoption'][sub_table_name])1111 record['adoption'][conflicting_name] = record['adoption'][sub_table_name]1112 record['id'] = record['id'] + cat_count1113 return record1114 stream = ConflictingNameSubTableCatStream(cat_count)1115 stream.schema = deepcopy(stream.schema)1116 stream.schema['schema']['properties']['adoption']['properties'][conflicting_name] = \1117 stream.schema['schema']['properties']['adoption']['properties'][sub_table_name]1118 main(CONFIG, input_stream=stream)1119 immunizations_count += stream.immunizations_count1120 conflicting_name_count = stream.immunizations_count1121 with psycopg2.connect(**TEST_DB) as conn:1122 with conn.cursor() as cur:1123 assert_columns_equal(cur,1124 'cats',1125 {1126 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1127 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1128 ('_sdc_sequence', 'bigint', 'YES'),1129 ('_sdc_table_version', 'bigint', 'YES'),1130 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1131 ('adoption__was_foster', 'boolean', 'YES'),1132 ('age', 'bigint', 'YES'),1133 ('id', 'bigint', 'NO'),1134 ('name', 'text', 'NO'),1135 ('bio', 'text', 'NO'),1136 ('paw_size', 'bigint', 'NO'),1137 ('paw_colour', 'text', 'NO'),1138 ('flea_check_complete', 'boolean', 'NO'),1139 ('pattern', 'text', 'YES')1140 })1141 cur.execute(get_count_sql('cats'))1142 assert 2 * cat_count == cur.fetchone()[0]1143 cur.execute(get_count_sql('cats__adoption__immunizations'))1144 assert immunizations_count == cur.fetchone()[0]1145 cur.execute(get_count_sql('cats__adoption__invalid_non_conflicting'))1146 assert invalid_name_count == cur.fetchone()[0]1147 cur.execute(get_count_sql('cats__adoption__immunizations__1'))1148 assert conflicting_name_count == cur.fetchone()[0]1149def test_loading__invalid_column_name(db_cleanup):1150 non_alphanumeric_stream = CatStream(100)1151 non_alphanumeric_stream.schema = 
deepcopy(non_alphanumeric_stream.schema)1152 non_alphanumeric_stream.schema['schema']['properties']['!!!invalid_name'] = \1153 non_alphanumeric_stream.schema['schema']['properties']['age']1154 main(CONFIG, input_stream=non_alphanumeric_stream)1155 non_lowercase_stream = CatStream(100)1156 non_lowercase_stream.schema = deepcopy(non_lowercase_stream.schema)1157 non_lowercase_stream.schema['schema']['properties']['INVALID_name'] = \1158 non_lowercase_stream.schema['schema']['properties']['age']1159 main(CONFIG, input_stream=non_lowercase_stream)1160 duplicate_non_lowercase_stream_1 = CatStream(100)1161 duplicate_non_lowercase_stream_1.schema = deepcopy(duplicate_non_lowercase_stream_1.schema)1162 duplicate_non_lowercase_stream_1.schema['schema']['properties']['invalid!NAME'] = \1163 duplicate_non_lowercase_stream_1.schema['schema']['properties']['age']1164 main(CONFIG, input_stream=duplicate_non_lowercase_stream_1)1165 duplicate_non_lowercase_stream_2 = CatStream(100)1166 duplicate_non_lowercase_stream_2.schema = deepcopy(duplicate_non_lowercase_stream_2.schema)1167 duplicate_non_lowercase_stream_2.schema['schema']['properties']['invalid#NAME'] = \1168 duplicate_non_lowercase_stream_2.schema['schema']['properties']['age']1169 main(CONFIG, input_stream=duplicate_non_lowercase_stream_2)1170 duplicate_non_lowercase_stream_3 = CatStream(100)1171 duplicate_non_lowercase_stream_3.schema = deepcopy(duplicate_non_lowercase_stream_3.schema)1172 duplicate_non_lowercase_stream_3.schema['schema']['properties']['invalid%NAmE'] = \1173 duplicate_non_lowercase_stream_3.schema['schema']['properties']['age']1174 main(CONFIG, input_stream=duplicate_non_lowercase_stream_3)1175 name_too_long_stream = CatStream(100)1176 name_too_long_stream.schema = deepcopy(name_too_long_stream.schema)1177 name_too_long_stream.schema['schema']['properties']['x' * 1000] = \1178 name_too_long_stream.schema['schema']['properties']['age']1179 main(CONFIG, input_stream=name_too_long_stream)1180 
duplicate_name_too_long_stream = CatStream(100)1181 duplicate_name_too_long_stream.schema = deepcopy(duplicate_name_too_long_stream.schema)1182 duplicate_name_too_long_stream.schema['schema']['properties']['x' * 100] = \1183 duplicate_name_too_long_stream.schema['schema']['properties']['age']1184 main(CONFIG, input_stream=duplicate_name_too_long_stream)1185 with psycopg2.connect(**TEST_DB) as conn:1186 with conn.cursor() as cur:1187 assert_columns_equal(cur,1188 'cats',1189 {1190 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1191 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1192 ('_sdc_sequence', 'bigint', 'YES'),1193 ('_sdc_table_version', 'bigint', 'YES'),1194 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1195 ('adoption__was_foster', 'boolean', 'YES'),1196 ('age', 'bigint', 'YES'),1197 ('id', 'bigint', 'NO'),1198 ('name', 'text', 'NO'),1199 ('bio', 'text', 'NO'),1200 ('paw_size', 'bigint', 'NO'),1201 ('paw_colour', 'text', 'NO'),1202 ('___invalid_name', 'bigint', 'YES'),1203 ('invalid_name', 'bigint', 'YES'),1204 ('invalid_name__1', 'bigint', 'YES'),1205 ('invalid_name__2', 'bigint', 'YES'),1206 ('invalid_name__3', 'bigint', 'YES'),1207 ('x' * 63, 'bigint', 'YES'),1208 (('x' * 60 + '__1'), 'bigint', 'YES'),1209 ('flea_check_complete', 'boolean', 'NO'),1210 ('pattern', 'text', 'YES')1211 })1212def test_loading__invalid_column_name__pk(db_cleanup):1213 def setup(count):1214 class Stream(CatStream):1215 def generate_record(self):1216 record = CatStream.generate_record(self)1217 record['ID'] = record['id']1218 record.pop('id')1219 return record1220 stream = Stream(count)1221 stream.schema = deepcopy(stream.schema)1222 stream.schema['schema']['properties']['ID'] = \1223 stream.schema['schema']['properties']['id']1224 stream.schema['key_properties'] = ['ID']1225 stream.schema['schema']['properties'].pop('id')1226 return stream1227 main(CONFIG, input_stream=setup(100))1228 main(CONFIG, input_stream=setup(200))1229 with 
psycopg2.connect(**TEST_DB) as conn:1230 with conn.cursor() as cur:1231 assert_columns_equal(cur,1232 'cats',1233 {1234 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1235 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1236 ('_sdc_sequence', 'bigint', 'YES'),1237 ('_sdc_table_version', 'bigint', 'YES'),1238 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1239 ('adoption__was_foster', 'boolean', 'YES'),1240 ('age', 'bigint', 'YES'),1241 ('id', 'bigint', 'NO'),1242 ('name', 'text', 'NO'),1243 ('bio', 'text', 'NO'),1244 ('paw_size', 'bigint', 'NO'),1245 ('paw_colour', 'text', 'NO'),1246 ('flea_check_complete', 'boolean', 'NO'),1247 ('pattern', 'text', 'YES')1248 })1249def test_loading__invalid_column_name__duplicate_name_handling(db_cleanup):1250 for i in range(101):1251 name_too_long_stream = CatStream(100)1252 name_too_long_stream.schema = deepcopy(name_too_long_stream.schema)1253 name_too_long_stream.schema['schema']['properties']['x' * (100 + i)] = \1254 name_too_long_stream.schema['schema']['properties']['age']1255 main(CONFIG, input_stream=name_too_long_stream)1256 expected_columns = {1257 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1258 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1259 ('_sdc_sequence', 'bigint', 'YES'),1260 ('_sdc_table_version', 'bigint', 'YES'),1261 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1262 ('adoption__was_foster', 'boolean', 'YES'),1263 ('age', 'bigint', 'YES'),1264 ('id', 'bigint', 'NO'),1265 ('name', 'text', 'NO'),1266 ('bio', 'text', 'NO'),1267 ('paw_size', 'bigint', 'NO'),1268 ('paw_colour', 'text', 'NO'),1269 ('x' * 63, 'bigint', 'YES'),1270 (('x' * 58 + '__100'), 'bigint', 'YES'),1271 ('flea_check_complete', 'boolean', 'NO'),1272 ('pattern', 'text', 'YES')1273 }1274 for i in range(1, 10):1275 expected_columns.add((('x' * 60 + '__' + str(i)), 'bigint', 'YES'))1276 for i in range(10, 100):1277 expected_columns.add((('x' * 59 + '__' + str(i)), 'bigint', 'YES'))1278 
with psycopg2.connect(**TEST_DB) as conn:1279 with conn.cursor() as cur:1280 assert_columns_equal(cur, 'cats', expected_columns)1281def test_loading__invalid_column_name__column_type_change(db_cleanup):1282 invalid_column_name = 'INVALID!name'1283 cat_count = 201284 stream = CatStream(cat_count)1285 stream.schema = deepcopy(stream.schema)1286 stream.schema['schema']['properties'][invalid_column_name] = \1287 stream.schema['schema']['properties']['paw_colour']1288 main(CONFIG, input_stream=stream)1289 with psycopg2.connect(**TEST_DB) as conn:1290 with conn.cursor() as cur:1291 assert_columns_equal(cur,1292 'cats',1293 {1294 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1295 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1296 ('_sdc_sequence', 'bigint', 'YES'),1297 ('_sdc_table_version', 'bigint', 'YES'),1298 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1299 ('adoption__was_foster', 'boolean', 'YES'),1300 ('age', 'bigint', 'YES'),1301 ('id', 'bigint', 'NO'),1302 ('name', 'text', 'NO'),1303 ('bio', 'text', 'NO'),1304 ('paw_size', 'bigint', 'NO'),1305 ('paw_colour', 'text', 'NO'),1306 ('invalid_name', 'text', 'NO'),1307 ('flea_check_complete', 'boolean', 'NO'),1308 ('pattern', 'text', 'YES')1309 })1310 cur.execute(sql.SQL('SELECT {} FROM {}').format(1311 sql.Identifier('invalid_name'),1312 sql.Identifier('cats')1313 ))1314 persisted_records = cur.fetchall()1315 ## Assert that the original data is present1316 assert cat_count == len(persisted_records)1317 assert cat_count == len([x for x in persisted_records if x[0] is not None])1318 class BooleanCatStream(CatStream):1319 def generate_record(self):1320 record = CatStream.generate_record(self)1321 record['id'] = record['id'] + cat_count1322 record[invalid_column_name] = False1323 return record1324 stream = BooleanCatStream(cat_count)1325 stream.schema = deepcopy(stream.schema)1326 stream.schema['schema']['properties'][invalid_column_name] = {'type': 'boolean'}1327 main(CONFIG, 
input_stream=stream)1328 with psycopg2.connect(**TEST_DB) as conn:1329 with conn.cursor() as cur:1330 assert_columns_equal(cur,1331 'cats',1332 {1333 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1334 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1335 ('_sdc_sequence', 'bigint', 'YES'),1336 ('_sdc_table_version', 'bigint', 'YES'),1337 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1338 ('adoption__was_foster', 'boolean', 'YES'),1339 ('age', 'bigint', 'YES'),1340 ('id', 'bigint', 'NO'),1341 ('name', 'text', 'NO'),1342 ('bio', 'text', 'NO'),1343 ('paw_size', 'bigint', 'NO'),1344 ('paw_colour', 'text', 'NO'),1345 ('invalid_name__s', 'text', 'YES'),1346 ('invalid_name__b', 'boolean', 'YES'),1347 ('flea_check_complete', 'boolean', 'NO'),1348 ('pattern', 'text', 'YES')1349 })1350 cur.execute(sql.SQL('SELECT {}, {} FROM {}').format(1351 sql.Identifier('invalid_name__s'),1352 sql.Identifier('invalid_name__b'),1353 sql.Identifier('cats')1354 ))1355 persisted_records = cur.fetchall()1356 ## Assert that the split columns migrated data/persisted new data1357 assert 2 * cat_count == len(persisted_records)1358 assert cat_count == len([x for x in persisted_records if x[0] is not None])1359 assert cat_count == len([x for x in persisted_records if x[1] is not None])1360 assert 0 == len([x for x in persisted_records if x[0] is not None and x[1] is not None])1361 class IntegerCatStream(CatStream):1362 def generate_record(self):1363 record = CatStream.generate_record(self)1364 record['id'] = record['id'] + (2 * cat_count)1365 record[invalid_column_name] = 3141366 return record1367 stream = IntegerCatStream(cat_count)1368 stream.schema = deepcopy(stream.schema)1369 stream.schema['schema']['properties'][invalid_column_name] = {'type': 'integer'}1370 main(CONFIG, input_stream=stream)1371 with psycopg2.connect(**TEST_DB) as conn:1372 with conn.cursor() as cur:1373 assert_columns_equal(cur,1374 'cats',1375 {1376 ('_sdc_batched_at', 'timestamp with time 
zone', 'YES'),1377 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1378 ('_sdc_sequence', 'bigint', 'YES'),1379 ('_sdc_table_version', 'bigint', 'YES'),1380 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1381 ('adoption__was_foster', 'boolean', 'YES'),1382 ('age', 'bigint', 'YES'),1383 ('id', 'bigint', 'NO'),1384 ('name', 'text', 'NO'),1385 ('bio', 'text', 'NO'),1386 ('paw_size', 'bigint', 'NO'),1387 ('paw_colour', 'text', 'NO'),1388 ('invalid_name__s', 'text', 'YES'),1389 ('invalid_name__b', 'boolean', 'YES'),1390 ('invalid_name__i', 'bigint', 'YES'),1391 ('flea_check_complete', 'boolean', 'NO'),1392 ('pattern', 'text', 'YES')1393 })1394 cur.execute(sql.SQL('SELECT {}, {}, {} FROM {}').format(1395 sql.Identifier('invalid_name__s'),1396 sql.Identifier('invalid_name__b'),1397 sql.Identifier('invalid_name__i'),1398 sql.Identifier('cats')1399 ))1400 persisted_records = cur.fetchall()1401 ## Assert that the split columns migrated data/persisted new data1402 assert 3 * cat_count == len(persisted_records)1403 assert cat_count == len([x for x in persisted_records if x[0] is not None])1404 assert cat_count == len([x for x in persisted_records if x[1] is not None])1405 assert cat_count == len([x for x in persisted_records if x[2] is not None])1406 assert 0 == len(1407 [x for x in persisted_records if x[0] is not None and x[1] is not None and x[2] is not None])1408 assert 0 == len([x for x in persisted_records if x[0] is None and x[1] is None and x[2] is None])1409def test_loading__column_type_change__pks__same_resulting_type(db_cleanup):1410 stream = CatStream(20)1411 stream.schema = deepcopy(stream.schema)1412 stream.schema['schema']['properties']['id'] = {'type': ['integer', 'null']}1413 main(CONFIG, input_stream=stream)1414 stream = CatStream(20)1415 stream.schema = deepcopy(stream.schema)1416 stream.schema['schema']['properties']['id'] = {'type': ['null', 'integer']}1417 main(CONFIG, input_stream=stream)1418def 
test_loading__invalid__column_type_change__pks(db_cleanup):1419 main(CONFIG, input_stream=CatStream(20))1420 class StringIdCatStream(CatStream):1421 def generate_record(self):1422 record = CatStream.generate_record(self)1423 record['id'] = str(record['id'])1424 return record1425 stream = StringIdCatStream(20)1426 stream.schema = deepcopy(stream.schema)1427 stream.schema['schema']['properties']['id'] = {'type': 'string'}1428 with pytest.raises(postgres.PostgresError, match=r'.*key_properties. type change detected'):1429 main(CONFIG, input_stream=stream)1430def test_loading__invalid__column_type_change__pks__nullable(db_cleanup):1431 main(CONFIG, input_stream=CatStream(20))1432 stream = CatStream(20)1433 stream.schema = deepcopy(stream.schema)1434 stream.schema['schema']['properties']['id'] = json_schema.make_nullable(stream.schema['schema']['properties']['id'])1435 with pytest.raises(postgres.PostgresError, match=r'.*key_properties. type change detected'):1436 main(CONFIG, input_stream=stream)1437def test_upsert(db_cleanup):1438 stream = CatStream(100)1439 main(CONFIG, input_stream=stream)1440 with psycopg2.connect(**TEST_DB) as conn:1441 with conn.cursor() as cur:1442 cur.execute(get_count_sql('cats'))1443 assert cur.fetchone()[0] == 1001444 assert_columns_equal(cur,1445 'cats',1446 {1447 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1448 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1449 ('_sdc_sequence', 'bigint', 'YES'),1450 ('_sdc_table_version', 'bigint', 'YES'),1451 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1452 ('adoption__was_foster', 'boolean', 'YES'),1453 ('age', 'bigint', 'YES'),1454 ('id', 'bigint', 'NO'),1455 ('name', 'text', 'NO'),1456 ('bio', 'text', 'NO'),1457 ('paw_size', 'bigint', 'NO'),1458 ('paw_colour', 'text', 'NO'),1459 ('flea_check_complete', 'boolean', 'NO'),1460 ('pattern', 'text', 'YES')1461 })1462 assert_columns_equal(cur,1463 'cats__adoption__immunizations',1464 {1465 ('_sdc_level_0_id', 'bigint', 
'NO'),1466 ('_sdc_sequence', 'bigint', 'YES'),1467 ('_sdc_source_key_id', 'bigint', 'NO'),1468 ('date_administered', 'timestamp with time zone', 'YES'),1469 ('type', 'text', 'YES')1470 })1471 assert_records(conn, stream.records, 'cats', 'id')1472 stream = CatStream(100)1473 main(CONFIG, input_stream=stream)1474 with psycopg2.connect(**TEST_DB) as conn:1475 with conn.cursor() as cur:1476 cur.execute(get_count_sql('cats'))1477 assert cur.fetchone()[0] == 1001478 assert_columns_equal(cur,1479 'cats',1480 {1481 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1482 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1483 ('_sdc_sequence', 'bigint', 'YES'),1484 ('_sdc_table_version', 'bigint', 'YES'),1485 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1486 ('adoption__was_foster', 'boolean', 'YES'),1487 ('age', 'bigint', 'YES'),1488 ('id', 'bigint', 'NO'),1489 ('name', 'text', 'NO'),1490 ('bio', 'text', 'NO'),1491 ('paw_size', 'bigint', 'NO'),1492 ('paw_colour', 'text', 'NO'),1493 ('flea_check_complete', 'boolean', 'NO'),1494 ('pattern', 'text', 'YES')1495 })1496 assert_columns_equal(cur,1497 'cats__adoption__immunizations',1498 {1499 ('_sdc_level_0_id', 'bigint', 'NO'),1500 ('_sdc_sequence', 'bigint', 'YES'),1501 ('_sdc_source_key_id', 'bigint', 'NO'),1502 ('date_administered', 'timestamp with time zone', 'YES'),1503 ('type', 'text', 'YES')1504 })1505 assert_records(conn, stream.records, 'cats', 'id')1506 stream = CatStream(200)1507 main(CONFIG, input_stream=stream)1508 with psycopg2.connect(**TEST_DB) as conn:1509 with conn.cursor() as cur:1510 cur.execute(get_count_sql('cats'))1511 assert cur.fetchone()[0] == 2001512 assert_columns_equal(cur,1513 'cats',1514 {1515 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1516 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1517 ('_sdc_sequence', 'bigint', 'YES'),1518 ('_sdc_table_version', 'bigint', 'YES'),1519 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1520 
('adoption__was_foster', 'boolean', 'YES'),1521 ('age', 'bigint', 'YES'),1522 ('id', 'bigint', 'NO'),1523 ('name', 'text', 'NO'),1524 ('bio', 'text', 'NO'),1525 ('paw_size', 'bigint', 'NO'),1526 ('paw_colour', 'text', 'NO'),1527 ('flea_check_complete', 'boolean', 'NO'),1528 ('pattern', 'text', 'YES')1529 })1530 assert_columns_equal(cur,1531 'cats__adoption__immunizations',1532 {1533 ('_sdc_level_0_id', 'bigint', 'NO'),1534 ('_sdc_sequence', 'bigint', 'YES'),1535 ('_sdc_source_key_id', 'bigint', 'NO'),1536 ('date_administered', 'timestamp with time zone', 'YES'),1537 ('type', 'text', 'YES')1538 })1539 assert_records(conn, stream.records, 'cats', 'id')1540def test_upsert__invalid__primary_key_change(db_cleanup):1541 stream = CatStream(100)1542 main(CONFIG, input_stream=stream)1543 stream = CatStream(100)1544 schema = deepcopy(stream.schema)1545 schema['key_properties'].append('name')1546 stream.schema = schema1547 with pytest.raises(postgres.PostgresError, match=r'.*key_properties.*'):1548 main(CONFIG, input_stream=stream)1549def test_nested_delete_on_parent(db_cleanup):1550 stream = CatStream(100, nested_count=3)1551 main(CONFIG, input_stream=stream)1552 with psycopg2.connect(**TEST_DB) as conn:1553 with conn.cursor() as cur:1554 cur.execute(get_count_sql('cats__adoption__immunizations'))1555 high_nested = cur.fetchone()[0]1556 assert_records(conn, stream.records, 'cats', 'id')1557 stream = CatStream(100, nested_count=2)1558 main(CONFIG, input_stream=stream)1559 with psycopg2.connect(**TEST_DB) as conn:1560 with conn.cursor() as cur:1561 cur.execute(get_count_sql('cats__adoption__immunizations'))1562 low_nested = cur.fetchone()[0]1563 assert_records(conn, stream.records, 'cats', 'id')1564 assert low_nested < high_nested1565def test_full_table_replication(db_cleanup):1566 stream = CatStream(110, version=0, nested_count=3)1567 main(CONFIG, input_stream=stream)1568 with psycopg2.connect(**TEST_DB) as conn:1569 with conn.cursor() as cur:1570 
cur.execute(get_count_sql('cats'))1571 version_0_count = cur.fetchone()[0]1572 cur.execute(get_count_sql('cats__adoption__immunizations'))1573 version_0_sub_count = cur.fetchone()[0]1574 assert_records(conn, stream.records, 'cats', 'id', match_pks=True)1575 assert version_0_count == 1101576 assert version_0_sub_count == 3301577 stream = CatStream(100, version=1, nested_count=3)1578 main(CONFIG, input_stream=stream)1579 with psycopg2.connect(**TEST_DB) as conn:1580 with conn.cursor() as cur:1581 cur.execute(get_count_sql('cats'))1582 version_1_count = cur.fetchone()[0]1583 cur.execute(get_count_sql('cats__adoption__immunizations'))1584 version_1_sub_count = cur.fetchone()[0]1585 assert_records(conn, stream.records, 'cats', 'id', match_pks=True)1586 assert version_1_count == 1001587 assert version_1_sub_count == 3001588 stream = CatStream(120, version=2, nested_count=2)1589 main(CONFIG, input_stream=stream)1590 with psycopg2.connect(**TEST_DB) as conn:1591 with conn.cursor() as cur:1592 cur.execute(get_count_sql('cats'))1593 version_2_count = cur.fetchone()[0]1594 cur.execute(get_count_sql('cats__adoption__immunizations'))1595 version_2_sub_count = cur.fetchone()[0]1596 assert_records(conn, stream.records, 'cats', 'id', match_pks=True)1597 assert version_2_count == 1201598 assert version_2_sub_count == 2401599 ## Test that an outdated version cannot overwrite1600 stream = CatStream(314, version=1, nested_count=2)1601 main(CONFIG, input_stream=stream)1602 with psycopg2.connect(**TEST_DB) as conn:1603 with conn.cursor() as cur:1604 cur.execute(get_count_sql('cats'))1605 older_version_count = cur.fetchone()[0]1606 assert older_version_count == version_2_count1607def test_deduplication_newer_rows(db_cleanup):1608 stream = CatStream(100, nested_count=3, duplicates=2)1609 main(CONFIG, input_stream=stream)1610 with psycopg2.connect(**TEST_DB) as conn:1611 with conn.cursor() as cur:1612 cur.execute(get_count_sql('cats'))1613 table_count = cur.fetchone()[0]1614 
cur.execute(get_count_sql('cats__adoption__immunizations'))1615 nested_table_count = cur.fetchone()[0]1616 cur.execute('SELECT _sdc_sequence FROM cats WHERE id in ({})'.format(1617 ','.join(map(str, stream.duplicate_pks_used))))1618 dup_cat_records = cur.fetchall()1619 assert stream.record_message_count == 1021620 assert table_count == 1001621 assert nested_table_count == 3001622 for record in dup_cat_records:1623 assert record[0] == stream.sequence + 2001624def test_deduplication_older_rows(db_cleanup):1625 stream = CatStream(100, nested_count=2, duplicates=2, duplicate_sequence_delta=-100)1626 main(CONFIG, input_stream=stream)1627 with psycopg2.connect(**TEST_DB) as conn:1628 with conn.cursor() as cur:1629 cur.execute(get_count_sql('cats'))1630 table_count = cur.fetchone()[0]1631 cur.execute(get_count_sql('cats__adoption__immunizations'))1632 nested_table_count = cur.fetchone()[0]1633 cur.execute('SELECT _sdc_sequence FROM cats WHERE id in ({})'.format(1634 ','.join(map(str, stream.duplicate_pks_used))))1635 dup_cat_records = cur.fetchall()1636 assert stream.record_message_count == 1021637 assert table_count == 1001638 assert nested_table_count == 2001639 for record in dup_cat_records:1640 assert record[0] == stream.sequence1641def test_deduplication_existing_new_rows(db_cleanup):1642 stream = CatStream(100, nested_count=2)1643 main(CONFIG, input_stream=stream)1644 original_sequence = stream.sequence1645 stream = CatStream(100,1646 nested_count=2,1647 sequence=original_sequence - 20)1648 main(CONFIG, input_stream=stream)1649 with psycopg2.connect(**TEST_DB) as conn:1650 with conn.cursor() as cur:1651 cur.execute(get_count_sql('cats'))1652 table_count = cur.fetchone()[0]1653 cur.execute(get_count_sql('cats__adoption__immunizations'))1654 nested_table_count = cur.fetchone()[0]1655 cur.execute('SELECT DISTINCT _sdc_sequence FROM cats')1656 sequences = cur.fetchall()1657 assert table_count == 1001658 assert nested_table_count == 2001659 assert len(sequences) == 11660 
assert sequences[0][0] == original_sequence1661def test_multiple_batches_upsert(db_cleanup):1662 config = CONFIG.copy()1663 config['max_batch_rows'] = 201664 config['batch_detection_threshold'] = 51665 stream = CatStream(100, nested_count=2)1666 main(config, input_stream=stream)1667 with psycopg2.connect(**TEST_DB) as conn:1668 with conn.cursor() as cur:1669 cur.execute(get_count_sql('cats'))1670 assert cur.fetchone()[0] == 1001671 cur.execute(get_count_sql('cats__adoption__immunizations'))1672 assert cur.fetchone()[0] == 2001673 assert_records(conn, stream.records, 'cats', 'id')1674 stream = CatStream(100, nested_count=3)1675 main(config, input_stream=stream)1676 with psycopg2.connect(**TEST_DB) as conn:1677 with conn.cursor() as cur:1678 cur.execute(get_count_sql('cats'))1679 assert cur.fetchone()[0] == 1001680 cur.execute(get_count_sql('cats__adoption__immunizations'))1681 assert cur.fetchone()[0] == 3001682 assert_records(conn, stream.records, 'cats', 'id')1683def test_multiple_batches_by_memory_upsert(db_cleanup):1684 config = CONFIG.copy()1685 config['max_batch_size'] = 10241686 config['batch_detection_threshold'] = 51687 stream = CatStream(100, nested_count=2)1688 main(config, input_stream=stream)1689 with psycopg2.connect(**TEST_DB) as conn:1690 with conn.cursor() as cur:1691 cur.execute(get_count_sql('cats'))1692 assert cur.fetchone()[0] == 1001693 cur.execute(get_count_sql('cats__adoption__immunizations'))1694 assert cur.fetchone()[0] == 2001695 assert_records(conn, stream.records, 'cats', 'id')1696 stream = CatStream(100, nested_count=3)1697 main(config, input_stream=stream)1698 with psycopg2.connect(**TEST_DB) as conn:1699 with conn.cursor() as cur:1700 cur.execute(get_count_sql('cats'))1701 assert cur.fetchone()[0] == 1001702 cur.execute(get_count_sql('cats__adoption__immunizations'))1703 assert cur.fetchone()[0] == 3001704 assert_records(conn, stream.records, 'cats', 'id')1705def test_loading__very_long_stream_name(db_cleanup):1706 stream_name = 
'extremely_______________long_cats'1707 class LongCatStream(CatStream):1708 stream = stream_name1709 schema = CatStream.schema.copy()1710 LongCatStream.schema['stream'] = stream_name1711 stream = LongCatStream(100)1712 main(CONFIG, input_stream=stream)1713 with psycopg2.connect(**TEST_DB) as conn:1714 with conn.cursor() as cur:1715 assert_columns_equal(cur,1716 stream_name,1717 {1718 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),1719 ('_sdc_received_at', 'timestamp with time zone', 'YES'),1720 ('_sdc_sequence', 'bigint', 'YES'),1721 ('_sdc_table_version', 'bigint', 'YES'),1722 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),1723 ('adoption__was_foster', 'boolean', 'YES'),1724 ('age', 'bigint', 'YES'),1725 ('id', 'bigint', 'NO'),1726 ('name', 'text', 'NO'),1727 ('bio', 'text', 'NO'),1728 ('paw_size', 'bigint', 'NO'),1729 ('paw_colour', 'text', 'NO'),1730 ('flea_check_complete', 'boolean', 'NO'),1731 ('pattern', 'text', 'YES')1732 })1733 assert_columns_equal(cur,1734 '{}__adoption__immunizations'.format(stream_name),1735 {1736 ('_sdc_level_0_id', 'bigint', 'NO'),1737 ('_sdc_sequence', 'bigint', 'YES'),1738 ('_sdc_source_key_id', 'bigint', 'NO'),1739 ('date_administered', 'timestamp with time zone', 'YES'),1740 ('type', 'text', 'YES')1741 })1742 cur.execute(get_count_sql(stream_name))1743 assert cur.fetchone()[0] == 1001744 for record in stream.records:1745 record['paw_size'] = 3141591746 record['paw_colour'] = ''1747 record['flea_check_complete'] = False1748 assert_records(conn, stream.records, stream_name, 'id')1749 assert_column_indexed(conn, stream_name, '_sdc_sequence')1750 assert_column_indexed(conn, stream_name, 'id')1751 assert_column_indexed(conn, '{}__adoption__immunizations'.format(stream_name), '_sdc_sequence')1752 assert_column_indexed(conn, '{}__adoption__immunizations'.format(stream_name), '_sdc_level_0_id')1753def test_before_run_sql_is_executed_upon_construction(db_cleanup):1754 config = CONFIG.copy()1755 config['before_run_sql'] 
= 'CREATE TABLE before_sql_test ( code char(5) CONSTRAINT firstkey PRIMARY KEY );'1756 config['after_run_sql'] = 'CREATE TABLE after_sql_test ( code char(5) CONSTRAINT secondkey PRIMARY KEY );'...

Full Screen

Full Screen

test_target_snowflake.py

Source:test_target_snowflake.py Github

copy

Full Screen

...19 columns = []20 for column in cursor.fetchall():21 columns.append((column[0], column[1], column[2]))22 assert set(columns) == expected_column_tuples23def get_count_sql(table_name):24 return '''25 SELECT COUNT(*) FROM {}.{}.{}26 '''.format(27 sql.identifier(CONFIG['snowflake_database']),28 sql.identifier(CONFIG['snowflake_schema']),29 sql.identifier(table_name))30def assert_count_equal(cursor, table_name, expected_count):31 cursor.execute(get_count_sql(table_name))32 assert cursor.fetchone()[0] == expected_count33def get_pk_key(pks, obj, subrecord=False):34 pk_parts = []35 for pk in pks:36 pk_parts.append(str(obj[pk]))37 if subrecord:38 for key, value in obj.items():39 if key[:11] == '_SDC_LEVEL_' or key[:11] == '_sdc_level_':40 pk_parts.append(str(value))41 return ':'.join(pk_parts)42def flatten_record(old_obj, subtables, subpks, new_obj=None, current_path=None, level=0):43 if not new_obj:44 new_obj = {}45 for prop, value in old_obj.items():46 if current_path:47 next_path = current_path + '__' + prop48 else:49 next_path = prop50 if isinstance(value, dict):51 flatten_record(value, subtables, subpks, new_obj=new_obj, current_path=next_path, level=level)52 elif isinstance(value, list):53 if next_path not in subtables:54 subtables[next_path] = []55 row_index = 056 for item in value:57 new_subobj = {}58 for key, value in subpks.items():59 new_subobj[key.lower()] = value60 new_subpks = subpks.copy()61 new_subobj[singer_stream.SINGER_LEVEL.format(level)] = row_index62 new_subpks[singer_stream.SINGER_LEVEL.format(level)] = row_index63 subtables[next_path].append(flatten_record(item,64 subtables,65 new_subpks,66 new_obj=new_subobj,67 level=level + 1))68 row_index += 169 else:70 new_obj[next_path] = value71 return new_obj72def assert_record(a, b, subtables, subpks):73 a_flat = flatten_record(a, subtables, subpks)74 for prop, value in a_flat.items():75 canoncialized_prop = prop.upper()76 if value is None:77 if canoncialized_prop in b:78 assert b[canoncialized_prop] == 
None79 elif isinstance(b[canoncialized_prop], datetime):80 assert value == b[canoncialized_prop].isoformat()[:19]81 else:82 assert value == b[canoncialized_prop]83def assert_records(conn, records, table_name, pks, match_pks=False):84 if not isinstance(pks, list):85 pks = [pks]86 with conn.cursor(True) as cur:87 cur.execute("set timezone='UTC';")88 cur.execute('''89 SELECT * FROM {}.{}.{}90 '''.format(91 sql.identifier(CONFIG['snowflake_database']),92 sql.identifier(CONFIG['snowflake_schema']),93 sql.identifier(table_name)))94 persisted_records_raw = cur.fetchall()95 persisted_records = {}96 for persisted_record in persisted_records_raw:97 pk = get_pk_key(pks, persisted_record)98 persisted_records[pk] = persisted_record99 subtables = {}100 records_pks = []101 pre_canonicalized_pks = [x.lower() for x in pks]102 for record in records:103 pk = get_pk_key(pre_canonicalized_pks, record)104 records_pks.append(pk)105 persisted_record = persisted_records[pk.upper()]106 subpks = {}107 for pk in pks:108 subpks[singer_stream.SINGER_SOURCE_PK_PREFIX + pk] = persisted_record[pk]109 assert_record(record, persisted_record, subtables, subpks)110 if match_pks:111 assert sorted(list(persisted_records.keys())) == sorted(records_pks)112 sub_pks = list(map(lambda pk: singer_stream.SINGER_SOURCE_PK_PREFIX.upper() + pk, pks))113 for subtable_name, items in subtables.items():114 cur.execute('''115 SELECT * FROM {}.{}.{}116 '''.format(117 sql.identifier(CONFIG['snowflake_database']),118 sql.identifier(CONFIG['snowflake_schema']),119 sql.identifier(table_name + '__' + subtable_name.upper())))120 persisted_records_raw = cur.fetchall()121 persisted_records = {}122 for persisted_record in persisted_records_raw:123 pk = get_pk_key(sub_pks, persisted_record, subrecord=True)124 persisted_records[pk] = persisted_record125 subtables = {}126 records_pks = []127 pre_canonicalized_sub_pks = [x.lower() for x in sub_pks]128 for record in items:129 pk = get_pk_key(pre_canonicalized_sub_pks, record, 
subrecord=True)130 records_pks.append(pk)131 persisted_record = persisted_records[pk]132 assert_record(record, persisted_record, subtables, subpks)133 assert len(subtables.values()) == 0134 if match_pks:135 assert sorted(list(persisted_records.keys())) == sorted(records_pks)136def test_connect(db_prep):137 with connect(**TEST_DB) as connection:138 with connection.cursor() as cur:139 assert cur.execute('select 1').fetchall()140def test_loading__empty(db_prep):141 stream = CatStream(0)142def test_loading__empty__enabled_config(db_prep):143 config = CONFIG.copy()144 config['persist_empty_tables'] = True145 stream = CatStream(0)146 main(config, input_stream=stream)147 with connect(**TEST_DB) as conn:148 with conn.cursor() as cur:149 assert_columns_equal(cur,150 'CATS',151 {152 ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),153 ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),154 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),155 ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),156 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),157 ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),158 ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),159 ('AGE', 'NUMBER', 'YES'),160 ('ID', 'NUMBER', 'NO'),161 ('NAME', 'TEXT', 'NO'),162 ('PAW_SIZE', 'NUMBER', 'NO'),163 ('PAW_COLOUR', 'TEXT', 'NO'),164 ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),165 ('PATTERN', 'TEXT', 'YES')166 })167 assert_columns_equal(cur,168 'CATS__ADOPTION__IMMUNIZATIONS',169 {170 ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),171 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),172 ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),173 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),174 ('DATE_ADMINISTERED', 'TIMESTAMP_TZ', 'YES'),175 ('TYPE', 'TEXT', 'YES')176 })177 assert_count_equal(cur, 'CATS', 0)178def test_loading__empty__enabled_config__repeatability(db_prep):179 config = CONFIG.copy()180 config['persist_empty_tables'] = True181 main(config, input_stream=CatStream(0))182 main(config, input_stream=CatStream(0))183 main(config, 
input_stream=CatStream(0))184def test_loading__simple(db_prep):185 stream = CatStream(100)186 main(CONFIG, input_stream=stream)187 with connect(**TEST_DB) as conn:188 with conn.cursor() as cur:189 assert_columns_equal(cur,190 'CATS',191 {192 ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),193 ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),194 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),195 ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),196 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),197 ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),198 ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),199 ('AGE', 'NUMBER', 'YES'),200 ('ID', 'NUMBER', 'NO'),201 ('NAME', 'TEXT', 'NO'),202 ('PAW_SIZE', 'NUMBER', 'NO'),203 ('PAW_COLOUR', 'TEXT', 'NO'),204 ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),205 ('PATTERN', 'TEXT', 'YES')206 })207 assert_columns_equal(cur,208 'CATS__ADOPTION__IMMUNIZATIONS',209 {210 ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),211 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),212 ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),213 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),214 ('DATE_ADMINISTERED', 'TIMESTAMP_TZ', 'YES'),215 ('TYPE', 'TEXT', 'YES')216 })217 assert_count_equal(cur, 'CATS', 100)218 for record in stream.records:219 record['paw_size'] = 314159220 record['paw_colour'] = ''221 record['flea_check_complete'] = False222 assert_records(conn, stream.records, 'CATS', 'ID')223def test_loading__simple__s3_staging(db_prep):224 stream = CatStream(100)225 main(S3_CONFIG, input_stream=stream)226 with connect(**TEST_DB) as conn:227 with conn.cursor() as cur:228 assert_columns_equal(cur,229 'CATS',230 {231 ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),232 ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),233 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),234 ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),235 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),236 ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),237 ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),238 ('AGE', 'NUMBER', 
'YES'),239 ('ID', 'NUMBER', 'NO'),240 ('NAME', 'TEXT', 'NO'),241 ('PAW_SIZE', 'NUMBER', 'NO'),242 ('PAW_COLOUR', 'TEXT', 'NO'),243 ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),244 ('PATTERN', 'TEXT', 'YES')245 })246 assert_columns_equal(cur,247 'CATS__ADOPTION__IMMUNIZATIONS',248 {249 ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),250 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),251 ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),252 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),253 ('DATE_ADMINISTERED', 'TIMESTAMP_TZ', 'YES'),254 ('TYPE', 'TEXT', 'YES')255 })256 assert_count_equal(cur, 'CATS', 100)257 for record in stream.records:258 record['paw_size'] = 314159259 record['paw_colour'] = ''260 record['flea_check_complete'] = False261 assert_records(conn, stream.records, 'CATS', 'ID')262def test_loading__nested_tables(db_prep):263 main(CONFIG, input_stream=NestedStream(10))264 with connect(**TEST_DB) as conn:265 with conn.cursor() as cur:266 assert_count_equal(cur, 'ROOT', 10)267 assert_count_equal(cur, 'ROOT__ARRAY_SCALAR', 50)268 assert_count_equal(269 cur,270 'ROOT__OBJECT_OF_OBJECT_0__OBJECT_OF_OBJECT_1__OBJECT_OF_OBJECT_2__ARRAY_SCALAR',271 50)272 assert_count_equal(cur, 'ROOT__ARRAY_OF_ARRAY', 20)273 assert_count_equal(cur, 'ROOT__ARRAY_OF_ARRAY___SDC_VALUE', 80)274 assert_count_equal(cur, 'ROOT__ARRAY_OF_ARRAY___SDC_VALUE___SDC_VALUE', 200)275 assert_columns_equal(cur,276 'ROOT',277 {278 ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),279 ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),280 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),281 ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),282 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),283 ('ID', 'NUMBER', 'NO'),284 ('NULL', 'NUMBER', 'YES'),285 ('NESTED_NULL__NULL', 'NUMBER', 'YES'),286 ('OBJECT_OF_OBJECT_0__OBJECT_OF_OBJECT_1__OBJECT_OF_OBJECT_2__A', 'NUMBER', 'NO'),287 ('OBJECT_OF_OBJECT_0__OBJECT_OF_OBJECT_1__OBJECT_OF_OBJECT_2__B', 'NUMBER', 'NO'),288 ('OBJECT_OF_OBJECT_0__OBJECT_OF_OBJECT_1__OBJECT_OF_OBJECT_2__C', 
'NUMBER', 'NO')289 })290 assert_columns_equal(cur,291 'ROOT__OBJECT_OF_OBJECT_0__OBJECT_OF_OBJECT_1__OBJECT_OF_OBJECT_2__ARRAY_SCALAR',292 {293 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),294 ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),295 ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),296 ('_SDC_VALUE', 'BOOLEAN', 'NO'),297 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),298 })299 assert_columns_equal(cur,300 'ROOT__ARRAY_OF_ARRAY',301 {302 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),303 ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),304 ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),305 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),306 })307 assert_columns_equal(cur,308 'ROOT__ARRAY_OF_ARRAY___SDC_VALUE',309 {310 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),311 ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),312 ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),313 ('_SDC_LEVEL_1_ID', 'NUMBER', 'NO'),314 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),315 })316 assert_columns_equal(cur,317 'ROOT__ARRAY_OF_ARRAY___SDC_VALUE___SDC_VALUE',318 {319 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),320 ('_SDC_SOURCE_KEY_ID', 'NUMBER', 'NO'),321 ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),322 ('_SDC_LEVEL_1_ID', 'NUMBER', 'NO'),323 ('_SDC_LEVEL_2_ID', 'NUMBER', 'NO'),324 ('_SDC_VALUE', 'NUMBER', 'NO'),325 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),326 })327def test_loading__new_non_null_column(db_prep):328 cat_count = 50329 main(CONFIG, input_stream=CatStream(cat_count))330 class NonNullStream(CatStream):331 def generate_record(self):332 record = CatStream.generate_record(self)333 record['id'] = record['id'] + cat_count334 return record335 non_null_stream = NonNullStream(cat_count)336 non_null_stream.schema = deepcopy(non_null_stream.schema)337 non_null_stream.schema['schema']['properties']['paw_toe_count'] = {'type': 'integer',338 'default': 5}339 main(CONFIG, input_stream=non_null_stream)340 with connect(**TEST_DB) as conn:341 with conn.cursor() as cur:342 assert_columns_equal(cur,343 
'CATS',344 {345 ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),346 ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),347 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),348 ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),349 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),350 ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),351 ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),352 ('AGE', 'NUMBER', 'YES'),353 ('ID', 'NUMBER', 'NO'),354 ('NAME', 'TEXT', 'NO'),355 ('PAW_SIZE', 'NUMBER', 'NO'),356 ('PAW_COLOUR', 'TEXT', 'NO'),357 ('PAW_TOE_COUNT', 'NUMBER', 'YES'),358 ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),359 ('PATTERN', 'TEXT', 'YES')360 })361 cur.execute('''362 SELECT {}, {} FROM {}.{}.{}363 '''.format(364 sql.identifier('ID'),365 sql.identifier('PAW_TOE_COUNT'),366 sql.identifier(CONFIG['snowflake_database']),367 sql.identifier(CONFIG['snowflake_schema']),368 sql.identifier('CATS')369 ))370 persisted_records = cur.fetchall()371 ## Assert that the split columns before/after new non-null data372 assert 2 * cat_count == len(persisted_records)373 assert cat_count == len([x for x in persisted_records if x[1] is None])374 assert cat_count == len([x for x in persisted_records if x[1] is not None])375def test_loading__column_type_change(db_prep):376 cat_count = 20377 main(CONFIG, input_stream=CatStream(cat_count))378 with connect(**TEST_DB) as conn:379 with conn.cursor() as cur:380 assert_columns_equal(cur,381 'CATS',382 {383 ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),384 ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),385 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),386 ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),387 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),388 ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),389 ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),390 ('AGE', 'NUMBER', 'YES'),391 ('ID', 'NUMBER', 'NO'),392 ('NAME', 'TEXT', 'NO'),393 ('PAW_SIZE', 'NUMBER', 'NO'),394 ('PAW_COLOUR', 'TEXT', 'NO'),395 ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),396 ('PATTERN', 'TEXT', 
'YES')397 })398 cur.execute('''399 SELECT {} FROM {}.{}.{}400 '''.format(401 sql.identifier('NAME'),402 sql.identifier(CONFIG['snowflake_database']),403 sql.identifier(CONFIG['snowflake_schema']),404 sql.identifier('CATS')405 ))406 persisted_records = cur.fetchall()407 ## Assert that the original data is present408 assert cat_count == len(persisted_records)409 assert cat_count == len([x for x in persisted_records if x[0] is not None])410 class NameBooleanCatStream(CatStream):411 def generate_record(self):412 record = CatStream.generate_record(self)413 record['id'] = record['id'] + cat_count414 record['name'] = False415 return record416 stream = NameBooleanCatStream(cat_count)417 stream.schema = deepcopy(stream.schema)418 stream.schema['schema']['properties']['name'] = {'type': 'boolean'}419 main(CONFIG, input_stream=stream)420 with connect(**TEST_DB) as conn:421 with conn.cursor() as cur:422 assert_columns_equal(cur,423 'CATS',424 {425 ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),426 ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),427 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),428 ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),429 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),430 ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),431 ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),432 ('AGE', 'NUMBER', 'YES'),433 ('ID', 'NUMBER', 'NO'),434 ('NAME__S', 'TEXT', 'YES'),435 ('NAME__B', 'BOOLEAN', 'YES'),436 ('PAW_SIZE', 'NUMBER', 'NO'),437 ('PAW_COLOUR', 'TEXT', 'NO'),438 ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),439 ('PATTERN', 'TEXT', 'YES')440 })441 cur.execute('''442 SELECT {}, {} FROM {}.{}.{}443 '''.format(444 sql.identifier('NAME__S'),445 sql.identifier('NAME__B'),446 sql.identifier(CONFIG['snowflake_database']),447 sql.identifier(CONFIG['snowflake_schema']),448 sql.identifier('CATS')449 ))450 persisted_records = cur.fetchall()451 ## Assert that the split columns migrated data/persisted new data452 assert 2 * cat_count == len(persisted_records)453 assert cat_count == 
len([x for x in persisted_records if x[0] is not None])454 assert cat_count == len([x for x in persisted_records if x[1] is not None])455 assert 0 == len([x for x in persisted_records if x[0] is not None and x[1] is not None])456 class NameIntegerCatStream(CatStream):457 def generate_record(self):458 record = CatStream.generate_record(self)459 record['id'] = record['id'] + (2 * cat_count)460 record['name'] = 314461 return record462 stream = NameIntegerCatStream(cat_count)463 stream.schema = deepcopy(stream.schema)464 stream.schema['schema']['properties']['name'] = {'type': 'integer'}465 main(CONFIG, input_stream=stream)466 with connect(**TEST_DB) as conn:467 with conn.cursor() as cur:468 assert_columns_equal(cur,469 'CATS',470 {471 ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),472 ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),473 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),474 ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),475 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),476 ('ADOPTION__ADOPTED_ON', 'TIMESTAMP_TZ', 'YES'),477 ('ADOPTION__WAS_FOSTER', 'BOOLEAN', 'YES'),478 ('AGE', 'NUMBER', 'YES'),479 ('ID', 'NUMBER', 'NO'),480 ('NAME__S', 'TEXT', 'YES'),481 ('NAME__B', 'BOOLEAN', 'YES'),482 ('NAME__I', 'NUMBER', 'YES'),483 ('PAW_SIZE', 'NUMBER', 'NO'),484 ('PAW_COLOUR', 'TEXT', 'NO'),485 ('FLEA_CHECK_COMPLETE', 'BOOLEAN', 'NO'),486 ('PATTERN', 'TEXT', 'YES')487 })488 cur.execute('''489 SELECT {}, {}, {} FROM {}.{}.{}490 '''.format(491 sql.identifier('NAME__S'),492 sql.identifier('NAME__B'),493 sql.identifier('NAME__I'),494 sql.identifier(CONFIG['snowflake_database']),495 sql.identifier(CONFIG['snowflake_schema']),496 sql.identifier('CATS')497 ))498 persisted_records = cur.fetchall()499 ## Assert that the split columns migrated data/persisted new data500 assert 3 * cat_count == len(persisted_records)501 assert cat_count == len([x for x in persisted_records if x[0] is not None])502 assert cat_count == len([x for x in persisted_records if x[1] is not None])503 
assert cat_count == len([x for x in persisted_records if x[2] is not None])504 assert 0 == len(505 [x for x in persisted_records if x[0] is not None and x[1] is not None and x[2] is not None])506 assert 0 == len([x for x in persisted_records if x[0] is None and x[1] is None and x[2] is None])507def test_loading__multi_types_columns(db_prep):508 stream_count = 50509 main(CONFIG, input_stream=MultiTypeStream(stream_count))510 with connect(**TEST_DB) as conn:511 with conn.cursor() as cur:512 assert_columns_equal(cur,513 'ROOT',514 {515 ('_SDC_PRIMARY_KEY', 'TEXT', 'NO'),516 ('_SDC_BATCHED_AT', 'TIMESTAMP_TZ', 'YES'),517 ('_SDC_RECEIVED_AT', 'TIMESTAMP_TZ', 'YES'),518 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),519 ('_SDC_TABLE_VERSION', 'NUMBER', 'YES'),520 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),521 ('EVERY_TYPE__I', 'NUMBER', 'YES'),522 ('EVERY_TYPE__F', 'FLOAT', 'YES'),523 ('EVERY_TYPE__B', 'BOOLEAN', 'YES'),524 ('EVERY_TYPE__T', 'TIMESTAMP_TZ', 'YES'),525 ('EVERY_TYPE__I__1', 'NUMBER', 'YES'),526 ('EVERY_TYPE__F__1', 'FLOAT', 'YES'),527 ('EVERY_TYPE__B__1', 'BOOLEAN', 'YES'),528 ('NUMBER_WHICH_ONLY_COMES_AS_INTEGER', 'FLOAT', 'NO')529 })530 assert_columns_equal(cur,531 'ROOT__EVERY_TYPE',532 {533 ('_SDC_SOURCE_KEY__SDC_PRIMARY_KEY', 'TEXT', 'NO'),534 ('_SDC_LEVEL_0_ID', 'NUMBER', 'NO'),535 ('_SDC_SEQUENCE', 'NUMBER', 'YES'),536 ('_SDC_VALUE', 'NUMBER', 'NO'),537 ('_SDC_TARGET_SNOWFLAKE_CREATE_TABLE_PLACEHOLDER', 'BOOLEAN', 'YES'),538 })539 cur.execute('''540 SELECT {} FROM {}.{}.{}541 '''.format(542 sql.identifier('NUMBER_WHICH_ONLY_COMES_AS_INTEGER'),543 sql.identifier(CONFIG['snowflake_database']),544 sql.identifier(CONFIG['snowflake_schema']),545 sql.identifier('ROOT')546 ))547 persisted_records = cur.fetchall()548 ## Assert that the column is has migrated data549 assert stream_count == len(persisted_records)550 assert stream_count == len([x for x in persisted_records if isinstance(x[0], float)])551def test_upsert(db_prep):552 stream = 
CatStream(100)553 main(CONFIG, input_stream=stream)554 with connect(**TEST_DB) as conn:555 with conn.cursor() as cur:556 assert_count_equal(cur, 'CATS', 100)557 assert_records(conn, stream.records, 'CATS', 'ID')558 stream = CatStream(100)559 main(CONFIG, input_stream=stream)560 with connect(**TEST_DB) as conn:561 with conn.cursor() as cur:562 assert_count_equal(cur, 'CATS', 100)563 assert_records(conn, stream.records, 'CATS', 'ID')564 stream = CatStream(200)565 main(CONFIG, input_stream=stream)566 with connect(**TEST_DB) as conn:567 with conn.cursor() as cur:568 assert_count_equal(cur, 'CATS', 200)569 assert_records(conn, stream.records, 'CATS', 'ID')570def test_nested_delete_on_parent(db_prep):571 stream = CatStream(100, nested_count=3)572 main(CONFIG, input_stream=stream)573 with connect(**TEST_DB) as conn:574 with conn.cursor() as cur:575 cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))576 high_nested = cur.fetchone()[0]577 assert_records(conn, stream.records, 'CATS', 'ID')578 stream = CatStream(100, nested_count=2)579 main(CONFIG, input_stream=stream)580 with connect(**TEST_DB) as conn:581 with conn.cursor() as cur:582 cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))583 low_nested = cur.fetchone()[0]584 assert_records(conn, stream.records, 'CATS', 'ID')585 assert low_nested < high_nested586def test_full_table_replication(db_prep):587 stream = CatStream(110, version=0, nested_count=3)588 main(CONFIG, input_stream=stream)589 with connect(**TEST_DB) as conn:590 with conn.cursor() as cur:591 cur.execute(get_count_sql('CATS'))592 version_0_count = cur.fetchone()[0]593 cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))594 version_0_sub_count = cur.fetchone()[0]595 assert_records(conn, stream.records, 'CATS', 'ID', match_pks=True)596 assert version_0_count == 110597 assert version_0_sub_count == 330598 stream = CatStream(100, version=1, nested_count=3)599 main(CONFIG, input_stream=stream)600 with connect(**TEST_DB) as conn:601 with 
conn.cursor() as cur:602 cur.execute(get_count_sql('CATS'))603 version_1_count = cur.fetchone()[0]604 cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))605 version_1_sub_count = cur.fetchone()[0]606 assert_records(conn, stream.records, 'CATS', 'ID', match_pks=True)607 assert version_1_count == 100608 assert version_1_sub_count == 300609 stream = CatStream(120, version=2, nested_count=2)610 main(CONFIG, input_stream=stream)611 with connect(**TEST_DB) as conn:612 with conn.cursor() as cur:613 cur.execute(get_count_sql('CATS'))614 version_2_count = cur.fetchone()[0]615 cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))616 version_2_sub_count = cur.fetchone()[0]617 assert_records(conn, stream.records, 'CATS', 'ID', match_pks=True)618 assert version_2_count == 120619 assert version_2_sub_count == 240620 ## Test that an outdated version cannot overwrite621 stream = CatStream(314, version=1, nested_count=2)622 main(CONFIG, input_stream=stream)623 with connect(**TEST_DB) as conn:624 with conn.cursor() as cur:625 cur.execute(get_count_sql('CATS'))626 older_version_count = cur.fetchone()[0]627 assert older_version_count == version_2_count628def test_deduplication_newer_rows(db_prep):629 stream = CatStream(100, nested_count=3, duplicates=2)630 main(CONFIG, input_stream=stream)631 with connect(**TEST_DB) as conn:632 with conn.cursor() as cur:633 cur.execute(get_count_sql('CATS'))634 table_count = cur.fetchone()[0]635 cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))636 nested_table_count = cur.fetchone()[0]637 cur.execute('''638 SELECT "_SDC_SEQUENCE"639 FROM {}.{}.{}640 WHERE "ID" in ({})641 '''.format(642 sql.identifier(CONFIG['snowflake_database']),643 sql.identifier(CONFIG['snowflake_schema']),644 sql.identifier('CATS'),645 ','.join(["'{}'".format(x) for x in stream.duplicate_pks_used])646 ))647 dup_cat_records = cur.fetchall()648 assert stream.record_message_count == 102649 assert table_count == 100650 assert nested_table_count == 300651 for 
record in dup_cat_records:652 assert record[0] == stream.sequence + 200653def test_deduplication_older_rows(db_prep):654 stream = CatStream(100, nested_count=2, duplicates=2, duplicate_sequence_delta=-100)655 main(CONFIG, input_stream=stream)656 with connect(**TEST_DB) as conn:657 with conn.cursor() as cur:658 cur.execute(get_count_sql('CATS'))659 table_count = cur.fetchone()[0]660 cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))661 nested_table_count = cur.fetchone()[0]662 663 cur.execute('''664 SELECT "_SDC_SEQUENCE"665 FROM {}.{}.{}666 WHERE "ID" in ({})667 '''.format(668 sql.identifier(CONFIG['snowflake_database']),669 sql.identifier(CONFIG['snowflake_schema']),670 sql.identifier('CATS'),671 ','.join(["'{}'".format(x) for x in stream.duplicate_pks_used])672 ))673 dup_cat_records = cur.fetchall()674 assert stream.record_message_count == 102675 assert table_count == 100676 assert nested_table_count == 200677 for record in dup_cat_records:678 assert record[0] == stream.sequence679def test_deduplication_existing_new_rows(db_prep):680 stream = CatStream(100, nested_count=2)681 main(CONFIG, input_stream=stream)682 original_sequence = stream.sequence683 stream = CatStream(100,684 nested_count=2,685 sequence=original_sequence - 20)686 main(CONFIG, input_stream=stream)687 with connect(**TEST_DB) as conn:688 with conn.cursor() as cur:689 cur.execute(get_count_sql('CATS'))690 table_count = cur.fetchone()[0]691 cur.execute(get_count_sql('CATS__ADOPTION__IMMUNIZATIONS'))692 nested_table_count = cur.fetchone()[0]693 cur.execute('''694 SELECT DISTINCT "_SDC_SEQUENCE"695 FROM {}.{}.{}696 '''.format(697 sql.identifier(CONFIG['snowflake_database']),698 sql.identifier(CONFIG['snowflake_schema']),699 sql.identifier('CATS')700 ))701 sequences = cur.fetchall()702 assert table_count == 100703 assert nested_table_count == 200704 assert len(sequences) == 1705 assert sequences[0][0] == original_sequence

Full Screen

Full Screen

test_target_redshift.py

Source:test_target_redshift.py Github

copy

Full Screen

...19 expected_column_tuples.add(20 ('_sdc_target_redshift_create_table_placeholder', 'boolean', 'YES')21 )22 assert set(columns) == expected_column_tuples23def get_count_sql(table_name):24 return sql.SQL(25 'SELECT count(*) FROM {}.{}'26 ).format(27 sql.Identifier(CONFIG['redshift_schema']),28 sql.Identifier(table_name))29def get_pk_key(pks, obj, subrecord=False):30 pk_parts = []31 for pk in pks:32 pk_parts.append(str(obj[pk]))33 if subrecord:34 for key, value in obj.items():35 if key[:11] == '_sdc_level_':36 pk_parts.append(str(value))37 return ':'.join(pk_parts)38def flatten_record(old_obj, subtables, subpks, new_obj=None, current_path=None, level=0):39 if not new_obj:40 new_obj = {}41 for prop, value in old_obj.items():42 if current_path:43 next_path = current_path + '__' + prop44 else:45 next_path = prop46 if isinstance(value, dict):47 flatten_record(value, subtables, subpks, new_obj=new_obj, current_path=next_path, level=level)48 elif isinstance(value, list):49 if next_path not in subtables:50 subtables[next_path] = []51 row_index = 052 for item in value:53 new_subobj = {}54 for key, value in subpks.items():55 new_subobj[key] = value56 new_subpks = subpks.copy()57 new_subobj[singer_stream.SINGER_LEVEL.format(level)] = row_index58 new_subpks[singer_stream.SINGER_LEVEL.format(level)] = row_index59 subtables[next_path].append(flatten_record(item,60 subtables,61 new_subpks,62 new_obj=new_subobj,63 level=level + 1))64 row_index += 165 else:66 new_obj[next_path] = value67 return new_obj68def assert_record(a, b, subtables, subpks):69 a_flat = flatten_record(a, subtables, subpks)70 for prop, value in a_flat.items():71 if value is None:72 if prop in b:73 assert b[prop] == None74 elif isinstance(b[prop], datetime):75 assert value == b[prop].isoformat()[:19]76 else:77 assert value == b[prop]78def assert_records(conn, records, table_name, pks, match_pks=False):79 if not isinstance(pks, list):80 pks = [pks]81 with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as 
cur:82 cur.execute("set timezone='UTC';")83 cur.execute(sql.SQL(84 'SELECT * FROM {}.{}'85 ).format(86 sql.Identifier(CONFIG['redshift_schema']),87 sql.Identifier(table_name)))88 persisted_records_raw = cur.fetchall()89 persisted_records = {}90 for persisted_record in persisted_records_raw:91 pk = get_pk_key(pks, persisted_record)92 persisted_records[pk] = persisted_record93 subtables = {}94 records_pks = []95 for record in records:96 pk = get_pk_key(pks, record)97 records_pks.append(pk)98 persisted_record = persisted_records[pk]99 subpks = {}100 for pk in pks:101 subpks[singer_stream.SINGER_SOURCE_PK_PREFIX + pk] = persisted_record[pk]102 assert_record(record, persisted_record, subtables, subpks)103 if match_pks:104 assert sorted(list(persisted_records.keys())) == sorted(records_pks)105 sub_pks = list(map(lambda pk: singer_stream.SINGER_SOURCE_PK_PREFIX + pk, pks))106 for subtable_name, items in subtables.items():107 cur.execute(sql.SQL(108 'SELECT * FROM {}.{}'109 ).format(110 sql.Identifier(CONFIG['redshift_schema']),111 sql.Identifier(table_name + '__' + subtable_name)))112 persisted_records_raw = cur.fetchall()113 persisted_records = {}114 for persisted_record in persisted_records_raw:115 pk = get_pk_key(sub_pks, persisted_record, subrecord=True)116 persisted_records[pk] = persisted_record117 subtables = {}118 records_pks = []119 for record in items:120 pk = get_pk_key(sub_pks, record, subrecord=True)121 records_pks.append(pk)122 persisted_record = persisted_records[pk]123 assert_record(record, persisted_record, subtables, subpks)124 assert len(subtables.values()) == 0125 if match_pks:126 assert sorted(list(persisted_records.keys())) == sorted(records_pks)127def test_loading__invalid__configuration__schema(db_prep):128 stream = CatStream(1)129 stream.schema = deepcopy(stream.schema)130 stream.schema['schema']['type'] = 'invalid type for a JSON Schema'131 with pytest.raises(TargetError, match=r'.*invalid JSON Schema instance.*'):132 main(CONFIG, 
input_stream=stream)133def test_loading__simple(db_prep):134 stream = CatStream(100)135 main(CONFIG, input_stream=stream)136 with psycopg2.connect(**TEST_DB) as conn:137 with conn.cursor() as cur:138 assert_columns_equal(cur,139 'cats',140 {141 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),142 ('_sdc_received_at', 'timestamp with time zone', 'YES'),143 ('_sdc_sequence', 'bigint', 'YES'),144 ('_sdc_table_version', 'bigint', 'YES'),145 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),146 ('adoption__was_foster', 'boolean', 'YES'),147 ('age', 'bigint', 'YES'),148 ('id', 'bigint', 'YES'),149 ('name', 'character varying', 'YES'),150 ('paw_size', 'bigint', 'YES'),151 ('paw_colour', 'character varying', 'YES'),152 ('flea_check_complete', 'boolean', 'YES'),153 ('pattern', 'character varying', 'YES')154 })155 assert_columns_equal(cur,156 'cats__adoption__immunizations',157 {158 ('_sdc_level_0_id', 'bigint', 'YES'),159 ('_sdc_sequence', 'bigint', 'YES'),160 ('_sdc_source_key_id', 'bigint', 'YES'),161 ('date_administered', 'timestamp with time zone', 'YES'),162 ('type', 'character varying', 'YES')163 })164 cur.execute(get_count_sql('cats'))165 assert cur.fetchone()[0] == 100166 for record in stream.records:167 record['paw_size'] = 314159168 record['paw_colour'] = ''169 record['flea_check_complete'] = False170 assert_records(conn, stream.records, 'cats', 'id')171def test_loading__nested_tables(db_prep):172 main(CONFIG, input_stream=NestedStream(10))173 with psycopg2.connect(**TEST_DB) as conn:174 with conn.cursor() as cur:175 cur.execute(get_count_sql('root'))176 assert 10 == cur.fetchone()[0]177 cur.execute(get_count_sql('root__array_scalar'))178 assert 50 == cur.fetchone()[0]179 cur.execute(180 get_count_sql('root__object_of_object_0__object_of_object_1__object_of_object_2__array_scalar'))181 assert 50 == cur.fetchone()[0]182 cur.execute(get_count_sql('root__array_of_array'))183 assert 20 == cur.fetchone()[0]184 
cur.execute(get_count_sql('root__array_of_array___sdc_value'))185 assert 80 == cur.fetchone()[0]186 cur.execute(get_count_sql('root__array_of_array___sdc_value___sdc_value'))187 assert 200 == cur.fetchone()[0]188 assert_columns_equal(cur,189 'root',190 {191 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),192 ('_sdc_received_at', 'timestamp with time zone', 'YES'),193 ('_sdc_sequence', 'bigint', 'YES'),194 ('_sdc_table_version', 'bigint', 'YES'),195 ('id', 'bigint', 'YES'),196 ('null', 'bigint', 'YES'),197 ('nested_null__null', 'bigint', 'YES'),198 ('object_of_object_0__object_of_object_1__object_of_object_2__a', 'bigint', 'YES'),199 ('object_of_object_0__object_of_object_1__object_of_object_2__b', 'bigint', 'YES'),200 ('object_of_object_0__object_of_object_1__object_of_object_2__c', 'bigint', 'YES')201 })202 assert_columns_equal(cur,203 'root__object_of_object_0__object_of_object_1__object_of_object_2__array_scalar',204 {205 ('_sdc_sequence', 'bigint', 'YES'),206 ('_sdc_source_key_id', 'bigint', 'YES'),207 ('_sdc_level_0_id', 'bigint', 'YES'),208 ('_sdc_value', 'boolean', 'YES')209 })210 assert_columns_equal(cur,211 'root__array_of_array',212 {213 ('_sdc_sequence', 'bigint', 'YES'),214 ('_sdc_source_key_id', 'bigint', 'YES'),215 ('_sdc_level_0_id', 'bigint', 'YES')216 })217 assert_columns_equal(cur,218 'root__array_of_array___sdc_value',219 {220 ('_sdc_sequence', 'bigint', 'YES'),221 ('_sdc_source_key_id', 'bigint', 'YES'),222 ('_sdc_level_0_id', 'bigint', 'YES'),223 ('_sdc_level_1_id', 'bigint', 'YES')224 })225 assert_columns_equal(cur,226 'root__array_of_array___sdc_value___sdc_value',227 {228 ('_sdc_sequence', 'bigint', 'YES'),229 ('_sdc_source_key_id', 'bigint', 'YES'),230 ('_sdc_level_0_id', 'bigint', 'YES'),231 ('_sdc_level_1_id', 'bigint', 'YES'),232 ('_sdc_level_2_id', 'bigint', 'YES'),233 ('_sdc_value', 'bigint', 'YES')234 })235def test_loading__new_non_null_column(db_prep):236 cat_count = 50237 main(CONFIG, input_stream=CatStream(cat_count))238 
class NonNullStream(CatStream):239 def generate_record(self):240 record = CatStream.generate_record(self)241 record['id'] = record['id'] + cat_count242 return record243 non_null_stream = NonNullStream(cat_count)244 non_null_stream.schema = deepcopy(non_null_stream.schema)245 non_null_stream.schema['schema']['properties']['paw_toe_count'] = {'type': 'integer',246 'default': 5}247 main(CONFIG, input_stream=non_null_stream)248 with psycopg2.connect(**TEST_DB) as conn:249 with conn.cursor() as cur:250 assert_columns_equal(cur,251 'cats',252 {253 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),254 ('_sdc_received_at', 'timestamp with time zone', 'YES'),255 ('_sdc_sequence', 'bigint', 'YES'),256 ('_sdc_table_version', 'bigint', 'YES'),257 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),258 ('adoption__was_foster', 'boolean', 'YES'),259 ('age', 'bigint', 'YES'),260 ('id', 'bigint', 'YES'),261 ('name', 'character varying', 'YES'),262 ('paw_size', 'bigint', 'YES'),263 ('paw_colour', 'character varying', 'YES'),264 ('paw_toe_count', 'bigint', 'YES'),265 ('flea_check_complete', 'boolean', 'YES'),266 ('pattern', 'character varying', 'YES')267 })268 cur.execute(sql.SQL('SELECT {}, {} FROM {}.{}').format(269 sql.Identifier('id'),270 sql.Identifier('paw_toe_count'),271 sql.Identifier(CONFIG['redshift_schema']),272 sql.Identifier('cats')273 ))274 persisted_records = cur.fetchall()275 ## Assert that the split columns before/after new non-null data276 assert 2 * cat_count == len(persisted_records)277 assert cat_count == len([x for x in persisted_records if x[1] is None])278 assert cat_count == len([x for x in persisted_records if x[1] is not None])279def test_loading__column_type_change(db_prep):280 cat_count = 20281 main(CONFIG, input_stream=CatStream(cat_count))282 with psycopg2.connect(**TEST_DB) as conn:283 with conn.cursor() as cur:284 assert_columns_equal(cur,285 'cats',286 {287 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),288 ('_sdc_received_at', 
'timestamp with time zone', 'YES'),289 ('_sdc_sequence', 'bigint', 'YES'),290 ('_sdc_table_version', 'bigint', 'YES'),291 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),292 ('adoption__was_foster', 'boolean', 'YES'),293 ('age', 'bigint', 'YES'),294 ('id', 'bigint', 'YES'),295 ('name', 'character varying', 'YES'),296 ('paw_size', 'bigint', 'YES'),297 ('paw_colour', 'character varying', 'YES'),298 ('flea_check_complete', 'boolean', 'YES'),299 ('pattern', 'character varying', 'YES')300 })301 cur.execute(sql.SQL('SELECT {} FROM {}.{}').format(302 sql.Identifier('name'),303 sql.Identifier(CONFIG['redshift_schema']),304 sql.Identifier('cats')305 ))306 persisted_records = cur.fetchall()307 ## Assert that the original data is present308 assert cat_count == len(persisted_records)309 assert cat_count == len([x for x in persisted_records if x[0] is not None])310 class NameBooleanCatStream(CatStream):311 def generate_record(self):312 record = CatStream.generate_record(self)313 record['id'] = record['id'] + cat_count314 record['name'] = False315 return record316 stream = NameBooleanCatStream(cat_count)317 stream.schema = deepcopy(stream.schema)318 stream.schema['schema']['properties']['name'] = {'type': 'boolean'}319 main(CONFIG, input_stream=stream)320 with psycopg2.connect(**TEST_DB) as conn:321 with conn.cursor() as cur:322 assert_columns_equal(cur,323 'cats',324 {325 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),326 ('_sdc_received_at', 'timestamp with time zone', 'YES'),327 ('_sdc_sequence', 'bigint', 'YES'),328 ('_sdc_table_version', 'bigint', 'YES'),329 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),330 ('adoption__was_foster', 'boolean', 'YES'),331 ('age', 'bigint', 'YES'),332 ('id', 'bigint', 'YES'),333 ('name__s', 'character varying', 'YES'),334 ('name__b', 'boolean', 'YES'),335 ('paw_size', 'bigint', 'YES'),336 ('paw_colour', 'character varying', 'YES'),337 ('flea_check_complete', 'boolean', 'YES'),338 ('pattern', 'character 
varying', 'YES')339 })340 cur.execute(sql.SQL('SELECT {}, {} FROM {}.{}').format(341 sql.Identifier('name__s'),342 sql.Identifier('name__b'),343 sql.Identifier(CONFIG['redshift_schema']),344 sql.Identifier('cats')345 ))346 persisted_records = cur.fetchall()347 ## Assert that the split columns migrated data/persisted new data348 assert 2 * cat_count == len(persisted_records)349 assert cat_count == len([x for x in persisted_records if x[0] is not None])350 assert cat_count == len([x for x in persisted_records if x[1] is not None])351 assert 0 == len([x for x in persisted_records if x[0] is not None and x[1] is not None])352 class NameIntegerCatStream(CatStream):353 def generate_record(self):354 record = CatStream.generate_record(self)355 record['id'] = record['id'] + (2 * cat_count)356 record['name'] = 314357 return record358 stream = NameIntegerCatStream(cat_count)359 stream.schema = deepcopy(stream.schema)360 stream.schema['schema']['properties']['name'] = {'type': 'integer'}361 main(CONFIG, input_stream=stream)362 with psycopg2.connect(**TEST_DB) as conn:363 with conn.cursor() as cur:364 assert_columns_equal(cur,365 'cats',366 {367 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),368 ('_sdc_received_at', 'timestamp with time zone', 'YES'),369 ('_sdc_sequence', 'bigint', 'YES'),370 ('_sdc_table_version', 'bigint', 'YES'),371 ('adoption__adopted_on', 'timestamp with time zone', 'YES'),372 ('adoption__was_foster', 'boolean', 'YES'),373 ('age', 'bigint', 'YES'),374 ('id', 'bigint', 'YES'),375 ('name__s', 'character varying', 'YES'),376 ('name__b', 'boolean', 'YES'),377 ('name__i', 'bigint', 'YES'),378 ('paw_size', 'bigint', 'YES'),379 ('paw_colour', 'character varying', 'YES'),380 ('flea_check_complete', 'boolean', 'YES'),381 ('pattern', 'character varying', 'YES')382 })383 cur.execute(sql.SQL('SELECT {}, {}, {} FROM {}.{}').format(384 sql.Identifier('name__s'),385 sql.Identifier('name__b'),386 sql.Identifier('name__i'),387 
sql.Identifier(CONFIG['redshift_schema']),388 sql.Identifier('cats')389 ))390 persisted_records = cur.fetchall()391 ## Assert that the split columns migrated data/persisted new data392 assert 3 * cat_count == len(persisted_records)393 assert cat_count == len([x for x in persisted_records if x[0] is not None])394 assert cat_count == len([x for x in persisted_records if x[1] is not None])395 assert cat_count == len([x for x in persisted_records if x[2] is not None])396 assert 0 == len(397 [x for x in persisted_records if x[0] is not None and x[1] is not None and x[2] is not None])398 assert 0 == len([x for x in persisted_records if x[0] is None and x[1] is None and x[2] is None])399def test_loading__multi_types_columns(db_prep):400 stream_count = 50401 main(CONFIG, input_stream=MultiTypeStream(stream_count))402 with psycopg2.connect(**TEST_DB) as conn:403 with conn.cursor() as cur:404 assert_columns_equal(cur,405 'root',406 {407 ('_sdc_primary_key', 'character varying', 'YES'),408 ('_sdc_batched_at', 'timestamp with time zone', 'YES'),409 ('_sdc_received_at', 'timestamp with time zone', 'YES'),410 ('_sdc_sequence', 'bigint', 'YES'),411 ('_sdc_table_version', 'bigint', 'YES'),412 ('every_type__i', 'bigint', 'YES'),413 ('every_type__f', 'double precision', 'YES'),414 ('every_type__b', 'boolean', 'YES'),415 ('every_type__t', 'timestamp with time zone', 'YES'),416 ('every_type__i__1', 'bigint', 'YES'),417 ('every_type__f__1', 'double precision', 'YES'),418 ('every_type__b__1', 'boolean', 'YES'),419 ('number_which_only_comes_as_integer', 'double precision', 'YES')420 })421 assert_columns_equal(cur,422 'root__every_type',423 {424 ('_sdc_source_key__sdc_primary_key', 'character varying', 'YES'),425 ('_sdc_level_0_id', 'bigint', 'YES'),426 ('_sdc_sequence', 'bigint', 'YES'),427 ('_sdc_value', 'bigint', 'YES'),428 })429 cur.execute(sql.SQL('SELECT {} FROM {}.{}').format(430 sql.Identifier('number_which_only_comes_as_integer'),431 sql.Identifier(CONFIG['redshift_schema']),432 
sql.Identifier('root')433 ))434 persisted_records = cur.fetchall()435 ## Assert that the column is has migrated data436 assert stream_count == len(persisted_records)437 assert stream_count == len([x for x in persisted_records if isinstance(x[0], float)])438def test_upsert(db_prep):439 stream = CatStream(100)440 main(CONFIG, input_stream=stream)441 with psycopg2.connect(**TEST_DB) as conn:442 with conn.cursor() as cur:443 cur.execute(get_count_sql('cats'))444 assert cur.fetchone()[0] == 100445 assert_records(conn, stream.records, 'cats', 'id')446 stream = CatStream(100)447 main(CONFIG, input_stream=stream)448 with psycopg2.connect(**TEST_DB) as conn:449 with conn.cursor() as cur:450 cur.execute(get_count_sql('cats'))451 assert cur.fetchone()[0] == 100452 assert_records(conn, stream.records, 'cats', 'id')453 stream = CatStream(200)454 main(CONFIG, input_stream=stream)455 with psycopg2.connect(**TEST_DB) as conn:456 with conn.cursor() as cur:457 cur.execute(get_count_sql('cats'))458 assert cur.fetchone()[0] == 200459 assert_records(conn, stream.records, 'cats', 'id')460def test_nested_delete_on_parent(db_prep):461 stream = CatStream(100, nested_count=3)462 main(CONFIG, input_stream=stream)463 with psycopg2.connect(**TEST_DB) as conn:464 with conn.cursor() as cur:465 cur.execute(get_count_sql('cats__adoption__immunizations'))466 high_nested = cur.fetchone()[0]467 assert_records(conn, stream.records, 'cats', 'id')468 stream = CatStream(100, nested_count=2)469 main(CONFIG, input_stream=stream)470 with psycopg2.connect(**TEST_DB) as conn:471 with conn.cursor() as cur:472 cur.execute(get_count_sql('cats__adoption__immunizations'))473 low_nested = cur.fetchone()[0]474 assert_records(conn, stream.records, 'cats', 'id')475 assert low_nested < high_nested476def test_full_table_replication(db_prep):477 stream = CatStream(110, version=0, nested_count=3)478 main(CONFIG, input_stream=stream)479 with psycopg2.connect(**TEST_DB) as conn:480 with conn.cursor() as cur:481 
cur.execute(get_count_sql('cats'))482 version_0_count = cur.fetchone()[0]483 cur.execute(get_count_sql('cats__adoption__immunizations'))484 version_0_sub_count = cur.fetchone()[0]485 assert_records(conn, stream.records, 'cats', 'id', match_pks=True)486 assert version_0_count == 110487 assert version_0_sub_count == 330488 stream = CatStream(100, version=1, nested_count=3)489 main(CONFIG, input_stream=stream)490 with psycopg2.connect(**TEST_DB) as conn:491 with conn.cursor() as cur:492 cur.execute(get_count_sql('cats'))493 version_1_count = cur.fetchone()[0]494 cur.execute(get_count_sql('cats__adoption__immunizations'))495 version_1_sub_count = cur.fetchone()[0]496 assert_records(conn, stream.records, 'cats', 'id', match_pks=True)497 assert version_1_count == 100498 assert version_1_sub_count == 300499 stream = CatStream(120, version=2, nested_count=2)500 main(CONFIG, input_stream=stream)501 with psycopg2.connect(**TEST_DB) as conn:502 with conn.cursor() as cur:503 cur.execute(get_count_sql('cats'))504 version_2_count = cur.fetchone()[0]505 cur.execute(get_count_sql('cats__adoption__immunizations'))506 version_2_sub_count = cur.fetchone()[0]507 assert_records(conn, stream.records, 'cats', 'id', match_pks=True)508 assert version_2_count == 120509 assert version_2_sub_count == 240510 ## Test that an outdated version cannot overwrite511 stream = CatStream(314, version=1, nested_count=2)512 main(CONFIG, input_stream=stream)513 with psycopg2.connect(**TEST_DB) as conn:514 with conn.cursor() as cur:515 cur.execute(get_count_sql('cats'))516 older_version_count = cur.fetchone()[0]517 assert older_version_count == version_2_count518def test_deduplication_newer_rows(db_prep):519 stream = CatStream(100, nested_count=3, duplicates=2)520 main(CONFIG, input_stream=stream)521 with psycopg2.connect(**TEST_DB) as conn:522 with conn.cursor() as cur:523 cur.execute(get_count_sql('cats'))524 table_count = cur.fetchone()[0]525 
cur.execute(get_count_sql('cats__adoption__immunizations'))526 nested_table_count = cur.fetchone()[0]527 cur.execute(sql.SQL(528 'SELECT _sdc_sequence FROM {}.{} WHERE id in '529 + '({})'.format(','.join(map(str, stream.duplicate_pks_used)))530 ).format(531 sql.Identifier(CONFIG['redshift_schema']),532 sql.Identifier('cats'),533 sql.Literal(','.join(map(str, stream.duplicate_pks_used)))))534 dup_cat_records = cur.fetchall()535 assert stream.record_message_count == 102536 assert table_count == 100537 assert nested_table_count == 300538 for record in dup_cat_records:539 assert record[0] == stream.sequence + 200540def test_deduplication_older_rows(db_prep):541 stream = CatStream(100, nested_count=2, duplicates=2, duplicate_sequence_delta=-100)542 main(CONFIG, input_stream=stream)543 with psycopg2.connect(**TEST_DB) as conn:544 with conn.cursor() as cur:545 cur.execute(get_count_sql('cats'))546 table_count = cur.fetchone()[0]547 cur.execute(get_count_sql('cats__adoption__immunizations'))548 nested_table_count = cur.fetchone()[0]549 cur.execute(sql.SQL(550 'SELECT _sdc_sequence FROM {}.{} WHERE id in '551 + '({})'.format(','.join(map(str, stream.duplicate_pks_used)))552 ).format(553 sql.Identifier(CONFIG['redshift_schema']),554 sql.Identifier('cats')))555 dup_cat_records = cur.fetchall()556 assert stream.record_message_count == 102557 assert table_count == 100558 assert nested_table_count == 200559 for record in dup_cat_records:560 assert record[0] == stream.sequence561def test_deduplication_existing_new_rows(db_prep):562 stream = CatStream(100, nested_count=2)563 main(CONFIG, input_stream=stream)564 original_sequence = stream.sequence565 stream = CatStream(100,566 nested_count=2,567 sequence=original_sequence - 20)568 main(CONFIG, input_stream=stream)569 with psycopg2.connect(**TEST_DB) as conn:570 with conn.cursor() as cur:571 cur.execute(get_count_sql('cats'))572 table_count = cur.fetchone()[0]573 cur.execute(get_count_sql('cats__adoption__immunizations'))574 
nested_table_count = cur.fetchone()[0]575 cur.execute(sql.SQL(576 'SELECT DISTINCT _sdc_sequence FROM {}.{}'577 ).format(578 sql.Identifier(CONFIG['redshift_schema']),579 sql.Identifier('cats')))580 sequences = cur.fetchall()581 assert table_count == 100582 assert nested_table_count == 200583 assert len(sequences) == 1...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You could also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful