How to use QuestDBContainer class of org.testcontainers.containers package

Best Testcontainers-java code snippet using org.testcontainers.containers.QuestDBContainer

Source:DebeziumIT.java Github

copy

Full Screen

1package kafka;2import io.debezium.testing.testcontainers.ConnectorConfiguration;3import io.debezium.testing.testcontainers.DebeziumContainer;4import io.questdb.client.Sender;5import io.questdb.kafka.QuestDBSinkConnector;6import io.questdb.kafka.QuestDBSinkConnectorConfig;7import io.questdb.kafka.QuestDBSinkTask;8import io.questdb.kafka.QuestDBUtils;9import io.questdb.kafka.JarResolverExtension;10import org.apache.kafka.connect.json.JsonConverter;11import org.junit.jupiter.api.AfterEach;12import org.junit.jupiter.api.Assertions;13import org.junit.jupiter.api.BeforeAll;14import org.junit.jupiter.api.Test;15import org.junit.jupiter.api.extension.RegisterExtension;16import org.slf4j.LoggerFactory;17import org.testcontainers.containers.GenericContainer;18import org.testcontainers.containers.KafkaContainer;19import org.testcontainers.containers.Network;20import org.testcontainers.containers.PostgreSQLContainer;21import org.testcontainers.containers.output.Slf4jLogConsumer;22import org.testcontainers.junit.jupiter.Container;23import org.testcontainers.junit.jupiter.Testcontainers;24import org.testcontainers.lifecycle.Startables;25import org.testcontainers.utility.DockerImageName;26import org.testcontainers.utility.MountableFile;27import java.sql.Connection;28import java.sql.DriverManager;29import java.sql.PreparedStatement;30import java.sql.ResultSet;31import java.sql.SQLException;32import java.sql.Statement;33import java.util.concurrent.ThreadLocalRandom;34import java.util.stream.Stream;35import static org.hamcrest.MatcherAssert.assertThat;36import static org.junit.Assert.assertEquals;37@Testcontainers38public class DebeziumIT {39 private static final String PG_SCHEMA_NAME = "test";40 private static final String PG_TABLE_NAME = "test";41 private static final String PG_SERVER_NAME = "dbserver1";42 private static final String DEBEZIUM_CONNECTOR_NAME = "debezium_source";43 private static final String QUESTDB_CONNECTOR_NAME = "questdb_sink";44 // we need to locate JARs with QuestDB client and Kafka Connect Connector,45 // this is later used to copy to the Kafka Connect container46 @RegisterExtension47 public static JarResolverExtension connectorJarResolver = JarResolverExtension.forClass(QuestDBSinkTask.class);48 @RegisterExtension49 public static JarResolverExtension questdbJarResolver = JarResolverExtension.forClass(Sender.class);50 private static Network network = Network.newNetwork();51 private static KafkaContainer kafkaContainer = new KafkaContainer()52 .withNetwork(network);53 public static PostgreSQLContainer<?> postgresContainer =54 new PostgreSQLContainer<>(DockerImageName.parse("debezium/postgres:11").asCompatibleSubstituteFor("postgres"))55 .withNetwork(network)56 .withNetworkAliases("postgres");57 public static DebeziumContainer debeziumContainer =58 new DebeziumContainer("debezium/connect:1.9.6.Final")59 .withNetwork(network)60 .withKafka(kafkaContainer)61 .withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/kafka/connect/questdb-connector/questdb-connector.jar")62 .withCopyFileToContainer(MountableFile.forHostPath(questdbJarResolver.getJarPath()), "/kafka/connect/questdb-connector/questdb.jar")63 .dependsOn(kafkaContainer)64 .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("debezium")))65 .withEnv("CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE", "true")66 .withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "true");67 @Container68 private static final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:6.5.3")69 .withNetwork(network)70 .withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT)71 .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))72 .withEnv("QDB_CAIRO_COMMIT_LAG", "100")73 .withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI");74 @BeforeAll75 public static void startContainers() {76 Startables.deepStart(Stream.of(77 kafkaContainer, postgresContainer, debeziumContainer))78 .join();79 }80 @AfterEach81 public void cleanup() throws SQLException {82 debeziumContainer.deleteAllConnectors();83 try (Connection connection = getConnection(postgresContainer);84 Statement statement = connection.createStatement()) {85 statement.execute("drop schema " + PG_SCHEMA_NAME + " CASCADE");86 }87 }88 private static ConnectorConfiguration newQuestSinkBaseConfig(String questTableName) {89 ConnectorConfiguration questSink = ConnectorConfiguration.create()90 .with("connector.class", QuestDBSinkConnector.class.getName())91 .with("host", questDBContainer.getNetworkAliases().get(0))92 .with("tasks.max", "1")93 .with("topics", PG_SERVER_NAME + "."+ PG_SCHEMA_NAME + "." + PG_TABLE_NAME)94 .with(QuestDBSinkConnectorConfig.TABLE_CONFIG, questTableName)95 .with("key.converter", JsonConverter.class.getName())96 .with("value.converter", JsonConverter.class.getName())97 .with("transforms", "unwrap")98 .with("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState")99 .with(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");100 return questSink;101 }102 @Test103 public void testSmoke() throws Exception {104 String questTableName = "test_smoke";105 try (Connection connection = getConnection(postgresContainer);106 Statement statement = connection.createStatement()) {107 statement.execute("create schema " + PG_SCHEMA_NAME);108 statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), primary key (id))");109 statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " replica identity full");110 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC')");111 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'Learn Debezium')");112 startDebeziumConnector();113 ConnectorConfiguration questSinkConfig = newQuestSinkBaseConfig(questTableName);114 debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSinkConfig);115 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"116 + "1,\"Learn CDC\"\r\n"117 + "2,\"Learn Debezium\"\r\n",118 "select id, title from " + questTableName);119 }120 }121 @Test122 public void testManyUpdates() throws Exception {123 String questTableName = "test_many_updates";124 try (Connection connection = getConnection(postgresContainer);125 Statement statement = connection.createStatement()) {126 startDebeziumConnector();127 ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);128 questSink = questSink.with("transforms.unwrap.add.fields", "source.ts_ms")129 .with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "__source_ts_ms");130 questSink.with(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "symbol");131 debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);132 statement.execute("create schema " + PG_SCHEMA_NAME);133 statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, symbol varchar(255), price double precision, primary key (id))");134 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (0, 'TDB', 1.0)");135 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'QDB', 1.0)");136 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'IDB', 1.0)");137 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (3, 'PDB', 1.0)");138 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (4, 'KDB', 1.0)");139 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"symbol\",\"price\"\r\n"140 + "0,\"TDB\",1.0\r\n"141 + "1,\"QDB\",1.0\r\n"142 + "2,\"IDB\",1.0\r\n"143 + "3,\"PDB\",1.0\r\n"144 + "4,\"KDB\",1.0\r\n",145 "select id, symbol, price from " + questTableName);146 try (PreparedStatement preparedStatement = connection.prepareStatement("update " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " set price = ? where id = ?")) {147 //a bunch of updates148 for (int i = 0; i < 200_000; i++) {149 int id = ThreadLocalRandom.current().nextInt(5);150 double newPrice = ThreadLocalRandom.current().nextDouble(100);151 preparedStatement.setDouble(1, newPrice);152 preparedStatement.setInt(2, id);153 preparedStatement.addBatch();154 }155 preparedStatement.executeBatch();156 // set all prices to a known value, this will be useful in asserting the final state157 for (int i = 0; i < 5; i++) {158 preparedStatement.setDouble(1, 42.0);159 preparedStatement.setInt(2, i);160 Assertions.assertEquals(1, preparedStatement.executeUpdate());161 }162 }163 // all symbols have the last well-known price164 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"symbol\",\"last_price\"\r\n"165 + "0,\"TDB\",42.0\r\n"166 + "1,\"QDB\",42.0\r\n"167 + "2,\"IDB\",42.0\r\n"168 + "3,\"PDB\",42.0\r\n"169 + "4,\"KDB\",42.0\r\n",170 "select id, symbol, last(price) as last_price from " + questTableName);171 // total number of rows is equal to the number of updates and inserts172 QuestDBUtils.assertSqlEventually(questDBContainer, "\"count\"\r\n"173 + "200010\r\n",174 "select count() from " + questTableName);175 }176 }177 @Test178 public void testSchemaChange() throws Exception {179 String questTableName = "test_schema_change";180 try (Connection connection = getConnection(postgresContainer);181 Statement statement = connection.createStatement()) {182 statement.execute("create schema " + PG_SCHEMA_NAME);183 statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), primary key (id))");184 statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " replica identity full");185 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC')");186 startDebeziumConnector();187 188 ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);189 debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);190 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"191 + "1,\"Learn CDC\"\r\n",192 "select id, title from " + questTableName);193 statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " add column description varchar(255)");194 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'Learn Debezium', 'Best book ever')");195 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"description\"\r\n"196 + "1,\"Learn CDC\",\r\n"197 + "2,\"Learn Debezium\",\"Best book ever\"\r\n",198 "select id, title, description from " + questTableName);199 }200 }201 @Test202 public void testUpdatesChange() throws Exception {203 String questTableName = "test_updates_change";204 try (Connection connection = getConnection(postgresContainer);205 Statement statement = connection.createStatement()) {206 statement.execute("create schema " + PG_SCHEMA_NAME);207 statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), primary key (id))");208 statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " replica identity full");209 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC')");210 startDebeziumConnector();211 ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);212 debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);213 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"214 + "1,\"Learn CDC\"\r\n",215 "select id, title from " + questTableName);216 statement.executeUpdate("update " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " set title = 'Learn Debezium' where id = 1");217 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"218 + "1,\"Learn CDC\"\r\n"219 + "1,\"Learn Debezium\"\r\n",220 "select id, title from " + questTableName);221 }222 }223 @Test224 public void testInsertThenDeleteThenInsertAgain() throws Exception {225 String questTableName = "test_insert_then_delete_then_insert_again";226 try (Connection connection = getConnection(postgresContainer);227 Statement statement = connection.createStatement()) {228 statement.execute("create schema " + PG_SCHEMA_NAME);229 statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), primary key (id))");230 statement.execute("alter table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " replica identity full");231 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC')");232 startDebeziumConnector();233 ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);234 debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);235 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"236 + "1,\"Learn CDC\"\r\n",237 "select id, title from " + questTableName);238 statement.execute("delete from " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " where id = 1");239 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn Debezium')");240 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\"\r\n"241 + "1,\"Learn CDC\"\r\n"242 + "1,\"Learn Debezium\"\r\n",243 "select id, title from " + questTableName);244 }245 }246 @Test247 public void testEventTime() throws SQLException {248 String questTableName = "test_event_time";249 try (Connection connection = getConnection(postgresContainer);250 Statement statement = connection.createStatement()) {251 statement.execute("create schema " + PG_SCHEMA_NAME);252 statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at timestamp, primary key (id))");253 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02T01:02:03.456Z')");254 startDebeziumConnector();255 ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);256 questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at");257 debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);258 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n"259 + "1,\"Learn CDC\",\"2021-01-02T01:02:03.456000Z\"\r\n",260 "select id, title, timestamp from " + questTableName);261 }262 }263 @Test264 public void testEventTimeMicros() throws SQLException {265 String questTableName = "test_event_time_micros";266 try (Connection connection = getConnection(postgresContainer);267 Statement statement = connection.createStatement()) {268 statement.execute("create schema " + PG_SCHEMA_NAME);269 statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at timestamp(6), primary key (id))");270 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02T01:02:03.123456Z')");271 startDebeziumConnector();272 ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);273 questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at");274 debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);275 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n"276 + "1,\"Learn CDC\",\"2021-01-02T01:02:03.123456Z\"\r\n",277 "select id, title, timestamp from " + questTableName);278 }279 }280 @Test281 public void testEventTimeNanos() throws SQLException {282 String questTableName = "test_event_time_nanos";283 try (Connection connection = getConnection(postgresContainer);284 Statement statement = connection.createStatement()) {285 statement.execute("create schema " + PG_SCHEMA_NAME);286 statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at timestamp(9), primary key (id))");287 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02T01:02:03.123456789Z')");288 startDebeziumConnector();289 ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);290 questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at");291 debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);292 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n"293 + "1,\"Learn CDC\",\"2021-01-02T01:02:03.123457Z\"\r\n",294 "select id, title, timestamp from " + questTableName);295 }296 }297 @Test298 public void testNonDesignatedTimestamp() throws SQLException {299 String questTableName = "test_non_designated_timestamp";300 try (Connection connection = getConnection(postgresContainer);301 Statement statement = connection.createStatement()) {302 statement.execute("create schema " + PG_SCHEMA_NAME);303 statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at timestamp, primary key (id))");304 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02T01:02:03.456Z')");305 startDebeziumConnector();306 ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);307 debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);308 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"created_at\"\r\n"309 + "1,\"Learn CDC\",\"2021-01-02T01:02:03.456000Z\"\r\n",310 "select id, title, created_at from " + questTableName);311 }312 }313 @Test314 public void testDate() throws SQLException {315 String questTableName = "test_date";316 try (Connection connection = getConnection(postgresContainer);317 Statement statement = connection.createStatement()) {318 statement.execute("create schema " + PG_SCHEMA_NAME);319 statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at date, primary key (id))");320 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02')");321 startDebeziumConnector();322 ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);323 debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);324 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"created_at\"\r\n"325 + "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n",326 "select id, title, created_at from " + questTableName);327 }328 }329 @Test330 public void testDelete() throws SQLException {331 String questTableName = "test_delete";332 try (Connection connection = getConnection(postgresContainer);333 Statement statement = connection.createStatement()) {334 startDebeziumConnector();335 statement.execute("create schema " + PG_SCHEMA_NAME);336 statement.execute("create table " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " (id int8 not null, title varchar(255), created_at timestamp, primary key (id))");337 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (1, 'Learn CDC', '2021-01-02')");338 ConnectorConfiguration questSink = newQuestSinkBaseConfig(questTableName);339 questSink.with(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "created_at");340 debeziumContainer.registerConnector(QUESTDB_CONNECTOR_NAME, questSink);341 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n"342 + "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n",343 "select * from " + questTableName);344 // delete should be ignored by QuestDB345 statement.execute("delete from " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " where id = 1");346 statement.execute("insert into " + PG_SCHEMA_NAME + "." + PG_TABLE_NAME + " values (2, 'Learn Debezium', '2021-01-03')");347 QuestDBUtils.assertSqlEventually(questDBContainer, "\"id\",\"title\",\"timestamp\"\r\n"348 + "1,\"Learn CDC\",\"2021-01-02T00:00:00.000000Z\"\r\n"349 + "2,\"Learn Debezium\",\"2021-01-03T00:00:00.000000Z\"\r\n",350 "select * from " + questTableName);351 }352 }353 private static void startDebeziumConnector() {354 ConnectorConfiguration connector = ConnectorConfiguration355 .forJdbcContainer(postgresContainer)356 .with("database.server.name", PG_SERVER_NAME);357 debeziumContainer.registerConnector(DEBEZIUM_CONNECTOR_NAME, connector);358 }359 private static Connection getConnection(360 PostgreSQLContainer<?> postgresContainer)361 throws SQLException {362 return DriverManager.getConnection(postgresContainer.getJdbcUrl(),363 postgresContainer.getUsername(),364 postgresContainer.getPassword());365 }366}...

Full Screen

Full Screen

Source:QuestDBSinkConnectorIT.java Github

copy

Full Screen

1package io.questdb.kafka;2import io.debezium.testing.testcontainers.ConnectorConfiguration;3import io.debezium.testing.testcontainers.DebeziumContainer;4import io.questdb.client.Sender;5import org.apache.kafka.clients.producer.KafkaProducer;6import org.apache.kafka.clients.producer.Producer;7import org.apache.kafka.clients.producer.ProducerConfig;8import org.apache.kafka.clients.producer.ProducerRecord;9import org.apache.kafka.common.serialization.StringSerializer;10import org.junit.jupiter.api.Test;11import org.junit.jupiter.api.extension.RegisterExtension;12import org.slf4j.LoggerFactory;13import org.testcontainers.containers.GenericContainer;14import org.testcontainers.containers.KafkaContainer;15import org.testcontainers.containers.Network;16import org.testcontainers.containers.output.Slf4jLogConsumer;17import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;18import org.testcontainers.junit.jupiter.Container;19import org.testcontainers.junit.jupiter.Testcontainers;20import org.testcontainers.utility.DockerImageName;21import org.testcontainers.utility.MountableFile;22import java.util.Properties;23import static java.time.Duration.ofMinutes;24@Testcontainers25public class QuestDBSinkConnectorIT {26 // we need to locate JARs with QuestDB client and Kafka Connect Connector,27 // this is later used to copy to the Kafka Connect container28 @RegisterExtension29 public static JarResolverExtension connectorJarResolver = JarResolverExtension.forClass(QuestDBSinkTask.class);30 @RegisterExtension31 public static JarResolverExtension questdbJarResolver = JarResolverExtension.forClass(Sender.class);32 private final static Network network = Network.newNetwork();33 @Container34 private static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.2.0"))35 .withNetwork(network);36 @Container37 private static final GenericContainer<?> questDBContainer = new GenericContainer<>("questdb/questdb:6.5.3")38 .withNetwork(network)39 .withExposedPorts(QuestDBUtils.QUESTDB_HTTP_PORT)40 .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("questdb")))41 .withEnv("QDB_CAIRO_COMMIT_LAG", "100")42 .withEnv("JAVA_OPTS", "-Djava.locale.providers=JRE,SPI");43 @Container44 private static final DebeziumContainer connectContainer = new DebeziumContainer("confluentinc/cp-kafka-connect:7.2.1")45 .withEnv("CONNECT_BOOTSTRAP_SERVERS", kafkaContainer.getNetworkAliases().get(0) + ":9092")46 .withEnv("CONNECT_GROUP_ID", "test")47 .withEnv("CONNECT_OFFSET_STORAGE_TOPIC", "connect-storage-topic")48 .withEnv("CONNECT_CONFIG_STORAGE_TOPIC", "connect-config-topic")49 .withEnv("CONNECT_STATUS_STORAGE_TOPIC", "connect-status-topic")50 .withEnv("CONNECT_KEY_CONVERTER", "org.apache.kafka.connect.storage.StringConverter")51 .withEnv("CONNECT_VALUE_CONVERTER", "org.apache.kafka.connect.json.JsonConverter")52 .withEnv("CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE", "false")53 .withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", "connect")54 .withEnv("CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR", "1")55 .withEnv("CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR", "1")56 .withEnv("CONNECT_STATUS_STORAGE_REPLICATION_FACTOR", "1")57 .withNetwork(network)58 .withExposedPorts(8083)59 .withCopyFileToContainer(MountableFile.forHostPath(connectorJarResolver.getJarPath()), "/usr/share/java/kafka/questdb-connector.jar")60 .withCopyFileToContainer(MountableFile.forHostPath(questdbJarResolver.getJarPath()), "/usr/share/java/kafka/questdb.jar")61 .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("connect")))62 .dependsOn(kafkaContainer, questDBContainer)63 .waitingFor(new HttpWaitStrategy()64 .forPath("/connectors")65 .forStatusCode(200)66 .forPort(8083)67 .withStartupTimeout(ofMinutes(5)));68 @Test69 public void test() throws Exception {70 String topicName = "mytopic";71 Properties props = new Properties();72 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());73 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);74 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);75 try (Producer<String, String> producer = new KafkaProducer<>(props)) {76 producer.send(new ProducerRecord<>(topicName, "foo", "bar")).get();77 }78 ConnectorConfiguration connector = ConnectorConfiguration.create()79 .with("connector.class", QuestDBSinkConnector.class.getName())80 .with("tasks.max", "1")81 .with("key.converter", "org.apache.kafka.connect.storage.StringConverter")82 .with("value.converter", "org.apache.kafka.connect.storage.StringConverter")83 .with("topics", topicName)84 .with("host", questDBContainer.getNetworkAliases().get(0) + ":" + QuestDBUtils.QUESTDB_ILP_PORT);85 connectContainer.registerConnector("my-connector", connector);86 QuestDBUtils.assertSqlEventually(questDBContainer, "\"key\",\"value\"\r\n"87 + "\"foo\",\"bar\"\r\n", "select key, value from " + topicName);88 }89}...

Full Screen

Full Screen

Source:QuestDbContainer.java Github

copy

Full Screen

1package io.questdb.testcontainers;2import org.jetbrains.annotations.NotNull;3import org.testcontainers.containers.JdbcDatabaseContainer;4import org.testcontainers.containers.wait.LogMessageWaitStrategy;5import java.time.Duration;6import java.util.HashSet;7import java.util.Set;8import static java.time.temporal.ChronoUnit.SECONDS;9public class QuestDbContainer <SELF extends QuestDbContainer<SELF>> extends JdbcDatabaseContainer<SELF> {10 private String username = "admin";11 private String password = "quest";12 private static final int POSTGRESQL_PORT = 8812;13 QuestDbContainer(String dockerImageName) {14 super(dockerImageName);15 this.waitStrategy = new LogMessageWaitStrategy()16 .withRegEx(".*server-main started.*")17 .withTimes(1)18 .withStartupTimeout(Duration.of(30, SECONDS));19 }20 @NotNull21 @Override22 protected Set<Integer> getLivenessCheckPorts() {23 return new HashSet<>(getMappedPort(POSTGRESQL_PORT));24 }25 @Override26 protected void configure() {27 addExposedPort(POSTGRESQL_PORT);28 addEnv("POSTGRES_USER", username);29 addEnv("POSTGRES_PASSWORD", password);30 }31 @Override32 public String getDriverClassName() {33 return "org.postgresql.Driver";34 }35 @Override36 public String getJdbcUrl() {37 // Disable Postgres driver use of java.util.logging to reduce noise at startup time38 return "jdbc:postgresql://"+getContainerIpAddress() + ":" + getMappedPort(POSTGRESQL_PORT) + "/";39 }40 @Override41 public SELF withDatabaseName(String dbName) {42 return self();43 }44 @Override45 public String getUsername() {46 return username;47 }48 @Override49 public String getPassword() {50 return password;51 }52 @Override53 public String getTestQueryString() {54 return "select 1 from long_sequence(1)";55 }56 @Override57 public SELF withUsername(final String username) {58 this.username = username;59 return self();60 }61 @Override62 public SELF withPassword(final String password) {63 this.password = password;64 return self();65 }66 @Override67 protected void waitUntilContainerStarted() {68 getWaitStrategy().waitUntilReady(this);69 }70}...

Full Screen

Full Screen

QuestDBContainer

Using AI Code Generation

copy

Full Screen

1package com.questdb.testcontainers;2import java.sql.Connection;3import java.sql.DriverManager;4import java.sql.ResultSet;5import java.sql.SQLException;6import java.sql.Statement;7import org.testcontainers.containers.JdbcDatabaseContainer;8import org.testcontainers.containers.QuestDBContainer;9public class TestContainer {10 public static void main(String[] args) throws SQLException {11 JdbcDatabaseContainer<?> container = new QuestDBContainer<>("latest");12 container.start();13 String jdbcUrl = container.getJdbcUrl();14 String username = container.getUsername();15 String password = container.getPassword();16 Connection connection = DriverManager.getConnection(jdbcUrl, username, password);17 Statement statement = connection.createStatement();18 ResultSet resultSet = statement.executeQuery("SELECT 1");19 resultSet.next();20 int result = resultSet.getInt(1);21 System.out.println("Result: " + result);22 connection.close();23 container.stop();24 }25}

Full Screen

Full Screen

QuestDBContainer

Using AI Code Generation

copy

Full Screen

1package com.questdb.testcontainers;2import java.sql.Connection;3import java.sql.DriverManager;4import java.sql.ResultSet;5import java.sql.SQLException;6import java.sql.Statement;7import org.testcontainers.containers.JdbcDatabaseContainer;8import org.testcontainers.containers.PostgreSQLContainerProvider;9import org.testcontainers.containers.output.Slf4jLogConsumer;10import org.testcontainers.utility.DockerImageName;11public class QuestDBContainer extends JdbcDatabaseContainer<QuestDBContainer> {12 private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("questdb/questdb");13 private static final String DEFAULT_TAG = "latest";14 private static final DockerImageName DEFAULT_IMAGE = DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG);15 private static final String IMAGE = "image";16 private static final String USERNAME = "quest";17 private static final String PASSWORD = "quest";18 public QuestDBContainer() {19 this(DEFAULT_IMAGE);20 }21 public QuestDBContainer(final DockerImageName dockerImageName) {22 super(dockerImageName);23 dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);24 this.withLogConsumer(new Slf4jLogConsumer(logger));25 }26 public String getDriverClassName() {27 return "io.questdb.jdbc.JDBCDriver";28 }29 public String getJdbcUrl() {30 }31 public String getUsername() {32 return USERNAME;33 }34 public String getPassword() {35 return PASSWORD;36 }37 public String getTestQueryString() {38 return "SELECT 1";39 }40 protected void configure() {41 addExposedPort(8812);42 }43 protected void waitUntilContainerStarted() {44 getWaitStrategy().waitUntilReady(this);45 }46 public static void main(String[] args) {47 try (QuestDBContainer questDB = new QuestDBContainer()) {48 questDB.start();49 Connection connection = DriverManager.getConnection(questDB.getJdbcUrl(), questDB.getUsername(), questDB.getPassword());50 Statement statement = connection.createStatement();51 statement.execute("create table abc as (select * from cpu)");52 ResultSet resultSet = statement.executeQuery("select * from abc");53 while (resultSet.next()) {54 System.out.println(resultSet.getString(1

Full Screen

Full Screen

QuestDBContainer

Using AI Code Generation

copy

Full Screen

1package org.testcontainers.containers;2import org.junit.Test;3import org.testcontainers.containers.output.Slf4jLogConsumer;4import java.sql.Connection;5import java.sql.DriverManager;6import java.sql.ResultSet;7import java.sql.Statement;8public class QuestDBContainerTest {9 public void test() throws Exception {10 QuestDBContainer questDBContainer = new QuestDBContainer();11 questDBContainer.withLogConsumer(new Slf4jLogConsumer(QuestDBContainerTest.class).withPrefix("quest-db"));12 questDBContainer.start();13 String jdbcUrl = questDBContainer.getJdbcUrl();14 Connection connection = DriverManager.getConnection(jdbcUrl);15 Statement statement = connection.createStatement();16 statement.executeUpdate("create table tbl (x int, y int)");17 statement.executeUpdate("insert into tbl values (1, 1), (2, 2)");18 ResultSet resultSet = statement.executeQuery("select x, y from tbl");19 while (resultSet.next()) {20 System.out.println("x=" + resultSet.getInt(1) + ", y=" + resultSet.getInt(2));21 }22 resultSet.close();23 statement.close();24 connection.close();25 questDBContainer.stop();26 }27}28package org.testcontainers;29import org.junit.Test;30import org.testcontainers.containers.output.Slf4jLogConsumer;31import java.sql.Connection;32import java.sql.DriverManager;33import java.sql.ResultSet;34import java.sql.Statement;35public class QuestDBContainerTest {36 public void test() throws Exception {37 QuestDBContainer questDBContainer = new QuestDBContainer();38 questDBContainer.withLogConsumer(new Slf4jLogConsumer(QuestDBContainerTest.class).withPrefix("quest-db"));39 questDBContainer.start();40 String jdbcUrl = questDBContainer.getJdbcUrl();41 Connection connection = DriverManager.getConnection(jdbcUrl);42 Statement statement = connection.createStatement();43 statement.executeUpdate("create table tbl (x int, y int)");44 statement.executeUpdate("insert into tbl values (1, 1), (2, 2)");45 ResultSet resultSet = statement.executeQuery("select x, y from tbl");46 while (resultSet.next()) {47 System.out.println("x=" + resultSet.getInt(1) + ", y=" + resultSet.getInt(2));48 }49 resultSet.close();50 statement.close();51 connection.close();52 questDBContainer.stop();

Full Screen

Full Screen

QuestDBContainer

Using AI Code Generation

copy

Full Screen

1import org.testcontainers.containers.QuestDBContainer;2import org.testcontainers.utility.DockerImageName;3public class QuestDBContainerTest {4 public static void main(String[] args) {5 try (QuestDBContainer questDBContainer = new QuestDBContainer(DockerImageName.parse("questdb/questdb:latest"))) {6 questDBContainer.start();7 }8 }9}10import org.testcontainers.containers.QuestDBContainer;11import org.testcontainers.utility.DockerImageName;12public class QuestDBContainerTest {13 public static void main(String[] args) {14 try (QuestDBContainer questDBContainer = new QuestDBContainer(DockerImageName.parse("questdb/questdb:5.0.5"))) {15 questDBContainer.start();16 }17 }18}19import org.testcontainers.containers.QuestDBContainer;20import org.testcontainers.utility.DockerImageName;21public class QuestDBContainerTest {22 public static void main(String[] args) {23 try (QuestDBContainer questDBContainer = new QuestDBContainer(DockerImageName.parse("questdb/questdb:5.0.4"))) {24 questDBContainer.start();25 }26 }27}28import org.testcontainers.containers.QuestDBContainer;29import org.testcontainers.utility.DockerImageName;30public class QuestDBContainerTest {31 public static void main(String[] args) {32 try (QuestDBContainer questDBContainer = new QuestDBContainer(DockerImageName.parse("questdb/questdb:5.0.3"))) {33 questDBContainer.start();34 }35 }36}37import org.testcontainers.containers.QuestDBContainer;38import org.testcontainers.utility.DockerImageName;39public class QuestDBContainerTest {40 public static void main(String[] args) {41 try (QuestDBContainer questDBContainer = new QuestDBContainer(DockerImageName.parse("questdb/questdb:5.0.2"))) {42 questDBContainer.start();43 }44 }45}

Full Screen

Full Screen

QuestDBContainer

Using AI Code Generation

copy

Full Screen

1import org.junit.jupiter.api.Test;2import org.testcontainers.containers.QuestDBContainer;3public class QuestDBTest {4 public void testQuestDB() {5 QuestDBContainer questDBContainer = new QuestDBContainer();6 questDBContainer.start();7 String url = questDBContainer.getJdbcUrl();8 System.out.println("JDBC url : " + url);9 questDBContainer.stop();10 }11}12import org.junit.jupiter.api.Test;13import org.testcontainers.containers.QuestDBContainer;14public class QuestDBTest {15 public void testQuestDB() {16 QuestDBContainer questDBContainer = new QuestDBContainer("questdb/questdb:6.0.3");17 questDBContainer.start();18 String url = questDBContainer.getJdbcUrl();19 System.out.println("JDBC url : " + url);20 questDBContainer.stop();21 }22}23import org.junit.jupiter.api.Test;24import org.testcontainers.containers.QuestDBContainer;25public class QuestDBTest {26 public void testQuestDB() {27 QuestDBContainer questDBContainer = new QuestDBContainer("questdb/questdb:6.0.3")28 .withExposedPorts(9000);29 questDBContainer.start();30 String url = questDBContainer.getJdbcUrl();31 System.out.println("JDBC url : " + url);32 questDBContainer.stop();33 }34}35import org.junit.jupiter.api.Test;36import org.testcontainers.containers.QuestDBContainer;37public class QuestDBTest {38 public void testQuestDB() {39 QuestDBContainer questDBContainer = new QuestDBContainer("questdb/questdb:6.0.3")40 .withExposedPorts(9000)41 .withEnv("QUESTDB_CONF

Full Screen

Full Screen

QuestDBContainer

Using AI Code Generation

copy

Full Screen

1import org.testcontainers.containers.QuestDBContainer;2import java.sql.Connection;3import java.sql.DriverManager;4import java.sql.ResultSet;5import java.sql.Statement;6import java.sql.SQLException;7public class QuestDBContainerTest {8 public static void main(String[] args) throws Exception {9 QuestDBContainer questDBContainer = new QuestDBContainer();10 questDBContainer.start();11 System.out.println("QuestDBContainer started.");12 String jdbcUrl = questDBContainer.getJdbcUrl();13 Connection conn = DriverManager.getConnection(jdbcUrl);14 Statement stmt = conn.createStatement();15 stmt.execute("CREATE TABLE test (x INT, y INT)");16 stmt.execute("INSERT INTO test VALUES (1, 1), (2, 2), (3, 3)");17 ResultSet rs = stmt.executeQuery("SELECT * FROM test");18 while (rs.next()) {19 System.out.println(rs.getInt(1) + ", " + rs.getInt(2));20 }21 stmt.close();22 conn.close();23 questDBContainer.stop();24 }25}26import org.testcontainers.QuestDBContainer;27import java.sql.Connection;28import java.sql.DriverManager;29import java.sql.ResultSet;30import java.sql.Statement;31import java.sql.SQLException;32public class QuestDBContainerTest {33 public static void main(String[] args) throws Exception {34 QuestDBContainer questDBContainer = new QuestDBContainer();35 questDBContainer.start();36 System.out.println("QuestDBContainer started.");37 String jdbcUrl = questDBContainer.getJdbcUrl();38 Connection conn = DriverManager.getConnection(jdbcUrl);39 Statement stmt = conn.createStatement();40 stmt.execute("CREATE TABLE test (x INT, y INT)");41 stmt.execute("INSERT INTO test VALUES (1, 1), (2, 2), (3, 3)");42 ResultSet rs = stmt.executeQuery("SELECT * FROM test");43 while (rs.next()) {44 System.out.println(rs.getInt(1) + ", " + rs.getInt(2));45 }46 stmt.close();47 conn.close();48 questDBContainer.stop();49 }50}

Full Screen

Full Screen

QuestDBContainer

Using AI Code Generation

copy

Full Screen

1package com.questdb.testcontainers;2import org.junit.jupiter.api.Test;3import org.testcontainers.containers.QuestDBContainer;4import java.sql.Connection;5import java.sql.Date;6import java.sql.DriverManager;7import java.sql.PreparedStatement;8import java.sql.ResultSet;9import java.sql.SQLException;10import java.sql.Statement;11import java.time.Instant;12import static org.junit.jupiter.api.Assertions.assertEquals;13import static org.junit.jupiter.api.Assertions.assertTrue;14public class QuestDBContainerTest {15 public void test() throws SQLException {16 try (QuestDBContainer questDB = new QuestDBContainer()) {17 questDB.start();18 Connection connection = DriverManager.getConnection(questDB.getJdbcUrl());19 Statement statement = connection.createStatement();20 statement.execute("create table test as (select * from (values('a', 1), ('b', 2)) as x(name, value))");21 statement.close();22 PreparedStatement preparedStatement = connection.prepareStatement("select * from test");23 ResultSet resultSet = preparedStatement.executeQuery();24 assertTrue(resultSet.next());25 assertEquals("a", resultSet.getString("name"));26 assertEquals(1, resultSet.getInt("value"));27 assertTrue(resultSet.next());28 assertEquals("b", resultSet.getString("name"));29 assertEquals(2, resultSet.getInt("value"));30 assertTrue(!resultSet.next());31 resultSet.close();32 preparedStatement.close();33 connection.close();34 }35 }36 public void testWithCustomConfiguration() throws SQLException {37 try (QuestDBContainer questDB = new QuestDBContainer()38 .withConfiguration("http.port", "9001")39 .withConfiguration("cairo.sql.default-bind-variable-page-size", "4096")) {40 questDB.start();41 assertEquals("9001", questDB.getConfiguration("http.port"));42 assertEquals("4096", questDB.getConfiguration("cairo.sql.default-bind-variable-page-size"));43 Connection connection = DriverManager.getConnection(questDB.getJdbcUrl());44 Statement statement = connection.createStatement();45 statement.execute("create table test as (select * from (values('a', 1), ('b', 2)) as x(name, value))");46 statement.close();47 PreparedStatement preparedStatement = connection.prepareStatement("select * from test");48 ResultSet resultSet = preparedStatement.executeQuery();49 assertTrue(resultSet.next());50 assertEquals("a", resultSet.getString("name"));51 assertEquals(1, resultSet.getInt("value"));52 assertTrue(resultSet.next());53 assertEquals("b", resultSet.getString("name"));54 assertEquals(2, resultSet.getInt("value"));

Full Screen

Full Screen

QuestDBContainer

Using AI Code Generation

copy

Full Screen

1import org.testcontainers.containers.QuestDBContainer;2public class 1 {3 public static void main(String[] args) {4 QuestDBContainer questDBContainer = new QuestDBContainer()5 .withConfiguration("http.enabled=true")6 .withConfiguration("http.bindAddress=

Full Screen

Full Screen

QuestDBContainer

Using AI Code Generation

copy

Full Screen

1import org.testcontainers.containers.QuestDBContainer;2import java.sql.Connection;3import java.sql.DriverManager;4import java.sql.ResultSet;5import java.sql.Statement;6import java.util.Arrays;7public class 1 {8 public static void main(String[] args) throws Exception {9 QuestDBContainer questDBContainer = new QuestDBContainer();10 questDBContainer.start();11 Connection connection = DriverManager.getConnection(questDBContainer.getJdbcUrl());12 Statement statement = connection.createStatement();13 statement.execute("create table test as (select rnd_symbol('A', 'B', 'C', 'D', 'E') as sym, rnd_int(100, 200, 300) as value from long_sequence(100000))");14 ResultSet resultSet = statement.executeQuery("select sym, avg(value) from test group by sym");15 while (resultSet.next()) {16 System.out.println("sym=" + resultSet.getString("sym") + ", avg(value)=" + resultSet.getDouble("avg"));17 }18 connection.close();19 questDBContainer.stop();20 }21}22sym=A, avg(value)=200.00323sym=B, avg(value)=200.00724sym=C, avg(value)=199.99725sym=D, avg(value)=200.00326sym=E, avg(value)=199.996

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Testcontainers-java automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Test Your Web Or Mobile Apps On 3000+ Browsers

Signup for free

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful