How to use the get_raw_metrics method in localstack

Best Python code snippet using localstack_python

test_db_accessor.py

Source:test_db_accessor.py Github

copy

Full Screen

...60 session.commit()61@pytest.mark.usefixtures("_cleanup")62def test_add_query_metrics(session: Session):63 # Given64 (_query_id, _raw_metrics) = get_raw_metrics()65 # When66 _add_query_metrics(_raw_metrics)67 # Then68 result = session.query(QueryMetrics).filter_by(queryId=_query_id).first()69 assert result is not None70 assert result.queryId == _query_id71 assert result.transactionId == "3e7820f0-4a9e-498f-88d8-c206ac85bd3e"72 assert result.query == "SELECT * FROM table;"73 assert result.uri == "http://localhost:8080/v1/query/20210720_093342_00012_pzafy"74 assert "Fragment 0" in result.plan75 assert "└─" in result.plan # Ensure unicode characters are persisted correctly76 assert result.payload["stageId"] == "20220505_100804_00003_7dpwd.0"77 assert result.queryType == "SELECT"78 assert result.remoteClientAddress == "127.0.0.1"79 assert result.user == "test-user"80 assert result.userAgent == "StatementClientV1/8012395-dirty"81 assert result.source == "datalake-cli"82 assert result.serverAddress == "localhost"83 assert result.serverVersion == "379"84 assert result.environment == "test"85 assert result.sessionProperties == {"catalog": "postgresql"}86 assert result.cpuTime == 25.97587 assert result.failedCpuTime == 088 assert result.wallTime == 112.44489 assert result.queuedTime == 0.00190 assert result.scheduledTime == 156.69691 assert result.failedScheduledTime == 192 assert result.analysisTime == 12.17893 assert result.planningTime == 1.73394 assert result.executionTime == 100.26695 assert result.inputBlockedTime == 7.7296 assert result.failedInputBlockedTime == 0.197 assert result.outputBlockedTime == 0.298 assert result.failedOutputBlockedTime == 0.399 assert result.peakUserMemoryBytes == 24610191100 assert result.peakTaskUserMemory == 19472880101 assert result.peakTaskTotalMemory == 43828971102 assert result.physicalInputBytes == 0103 assert result.physicalInputRows == 1293616104 assert result.internalNetworkBytes == 49808906105 assert 
result.internalNetworkRows == 1123864106 assert result.totalBytes == 0107 assert result.totalRows == 1293616108 assert result.outputBytes == 9019074109 assert result.outputRows == 160908110 assert result.writtenBytes == 0111 assert result.writtenRows == 0112 assert result.processedInputRows == 1113 assert result.processedInputBytes == 571114 assert result.cumulativeMemory == 2143774060571.0115 assert result.completedSplits == 507116 assert result.resourceWaitingTime == 12.178117 assert result.planNodeStatsAndCosts == {"stats": {}, "costs": {}}118 assert result.createTime == datetime.fromtimestamp(1626773622)119 assert result.executionStartTime == datetime.fromtimestamp(1626773634)120 assert result.endTime == datetime.fromtimestamp(1626773734)121@pytest.mark.usefixtures("_cleanup")122def test_add_older_version_query_metrics(session: Session):123 # Given124 (_query_id, _raw_metrics) = get_raw_metrics()125 del _raw_metrics["statistics"]["processedInputBytes"]126 del _raw_metrics["statistics"]["processedInputRows"]127 _raw_metrics["statistics"]["someOldNotUsedValue"] = 1337128 # When129 _add_query_metrics(_raw_metrics)130 # Then131 result = session.query(QueryMetrics).filter_by(queryId=_query_id).first()132 assert result is not None133 assert result.queryId == _query_id134@pytest.mark.usefixtures("_cleanup")135def test_add_query_metrics_missing_json_stringify_fields(session: Session):136 # Given137 (_query_id, _raw_metrics) = get_raw_metrics()138 del _raw_metrics["metadata"]["payload"]139 del _raw_metrics["statistics"]["planNodeStatsAndCosts"]140 # When141 _add_query_metrics(_raw_metrics)142 # Then143 result = session.query(QueryMetrics).filter_by(queryId=_query_id).first()144 assert result is not None145 assert result.queryId == _query_id146@pytest.mark.usefixtures("_cleanup")147def test_add_column_metrics(session: Session):148 # Given149 (_query_id, _raw_metrics) = get_raw_metrics()150 # When151 _add_query_metrics(_raw_metrics) # Needed for FK152 
_add_column_metrics(_raw_metrics)153 # Then154 result = (155 session.query(ColumnMetrics)156 .order_by(ColumnMetrics.catalogName.asc())157 .order_by(ColumnMetrics.schemaName.asc())158 .order_by(ColumnMetrics.tableName.asc())159 .order_by(ColumnMetrics.columnName.asc())160 .filter_by(queryId=_query_id)161 .all()162 )163 assert len(result) == 3164 assert result[0].queryId == _query_id165 assert result[0].catalogName == "postgresql"166 assert result[0].schemaName == "test-schema"167 assert result[0].tableName == "test-table-1"168 assert result[0].columnName == "test-column-1_1"169 assert result[0].physicalInputBytes == 0170 assert result[0].physicalInputRows == 1135200171 assert result[1].queryId == _query_id172 assert result[1].catalogName == "postgresql"173 assert result[1].schemaName == "test-schema"174 assert result[1].tableName == "test-table-1"175 assert result[1].columnName == "test-column-1_2"176 assert result[1].physicalInputBytes == 0177 assert result[1].physicalInputRows == 1135200178 assert result[2].queryId == _query_id179 assert result[2].catalogName == "postgresql"180 assert result[2].schemaName == "test-schema"181 assert result[2].tableName == "test-table-2"182 assert result[2].columnName == "test-column-2_1"183 assert result[2].physicalInputBytes == 0184 assert result[2].physicalInputRows == 2185@pytest.mark.usefixtures("_cleanup")186def test_str_or_float_timestamps(session: Session):187 # Given188 (_query_id, _raw_metrics) = get_raw_metrics()189 _raw_metrics["createTime"] = "2021-07-20T9:33:42.000Z"190 _raw_metrics["endTime"] = 1626773734.0191 # When192 _add_query_metrics(_raw_metrics)193 # Then194 result = session.query(QueryMetrics).filter_by(queryId=_query_id).first()195 assert result is not None196 assert result.queryId == _query_id197 assert result.transactionId == "3e7820f0-4a9e-498f-88d8-c206ac85bd3e"198 assert result.query == "SELECT * FROM table;"199 assert result.queryType == "SELECT"200 assert result.remoteClientAddress == "127.0.0.1"201 
assert result.user == "test-user"202 assert result.userAgent == "StatementClientV1/8012395-dirty"203 assert result.source == "datalake-cli"204 assert result.serverAddress == "localhost"205 assert result.serverVersion == "379"206 assert result.environment == "test"207 assert result.cpuTime == 25.975208 assert result.wallTime == 112.444209 assert result.queuedTime == 0.001210 assert result.scheduledTime == 156.696211 assert result.analysisTime == 12.178212 assert result.planningTime == 1.733213 assert result.executionTime == 100.266214 assert result.peakUserMemoryBytes == 24610191215 assert result.peakTaskUserMemory == 19472880216 assert result.peakTaskTotalMemory == 43828971217 assert result.physicalInputBytes == 0218 assert result.physicalInputRows == 1293616219 assert result.internalNetworkBytes == 49808906220 assert result.internalNetworkRows == 1123864221 assert result.totalBytes == 0222 assert result.totalRows == 1293616223 assert result.outputBytes == 9019074224 assert result.outputRows == 160908225 assert result.writtenBytes == 0226 assert result.writtenRows == 0227 assert result.cumulativeMemory == 2143774060571.0228 assert result.completedSplits == 507229 assert result.resourceWaitingTime == 12.178230 assert result.createTime == datetime.fromtimestamp(1626773622)231 assert result.executionStartTime == datetime.fromtimestamp(1626773634)232 assert result.endTime == datetime.fromtimestamp(1626773734)233@pytest.mark.usefixtures("_cleanup")234def test_duplicate_columns(session: Session):235 # Given236 (_query_id, _raw_metrics) = get_raw_metrics()237 # Duplicate first input table238 _raw_metrics["ioMetadata"]["inputs"].append(_raw_metrics["ioMetadata"]["inputs"][0])239 # When240 _add_query_metrics(_raw_metrics)241 _add_column_metrics(_raw_metrics)242 # Then243 result = (244 session.query(ColumnMetrics)245 .order_by(ColumnMetrics.catalogName.asc())246 .order_by(ColumnMetrics.schemaName.asc())247 .order_by(ColumnMetrics.tableName.asc())248 
.order_by(ColumnMetrics.columnName.asc())249 .filter_by(queryId=_query_id)250 .all()251 )252 assert len(result) == 3253 assert result[0].queryId == _query_id254 assert result[0].catalogName == "postgresql"255 assert result[0].schemaName == "test-schema"256 assert result[0].tableName == "test-table-1"257 assert result[0].columnName == "test-column-1_1"258 assert result[0].physicalInputBytes == 0259 assert result[0].physicalInputRows == 1135200260 assert result[1].queryId == _query_id261 assert result[1].catalogName == "postgresql"262 assert result[1].schemaName == "test-schema"263 assert result[1].tableName == "test-table-1"264 assert result[1].columnName == "test-column-1_2"265 assert result[1].physicalInputBytes == 0266 assert result[1].physicalInputRows == 1135200267 assert result[2].queryId == _query_id268 assert result[2].catalogName == "postgresql"269 assert result[2].schemaName == "test-schema"270 assert result[2].tableName == "test-table-2"271 assert result[2].columnName == "test-column-2_1"272 assert result[2].physicalInputBytes == 0273 assert result[2].physicalInputRows == 2274@pytest.mark.usefixtures("_cleanup")275def test_add_client_tags(session: Session):276 # Given277 (_query_id, _raw_metrics) = get_raw_metrics()278 # When279 _add_query_metrics(_raw_metrics) # Needed for FK280 _add_client_tags(_raw_metrics)281 # Then282 result = session.query(ClientTags).order_by(ClientTags.clientTag.asc()).filter_by(queryId=_query_id).all()283 assert len(result) == 2284 client_tag_1, client_tag_2 = result285 assert client_tag_1.queryId == _query_id286 assert client_tag_1.clientTag == "load_balancer"287 assert client_tag_2.queryId == _query_id288 assert client_tag_2.clientTag == "superset"289@pytest.mark.usefixtures("_cleanup")290def test_add_resource_groups(session: Session):291 # Given292 (_query_id, _raw_metrics) = get_raw_metrics()293 # When294 _add_query_metrics(_raw_metrics) # Needed for FK295 _add_resource_groups(_raw_metrics)296 # Then297 result = (298 
session.query(ResourceGroups).order_by(ResourceGroups.resourceGroup.asc()).filter_by(queryId=_query_id).all()299 )300 assert len(result) == 2301 resource_group_1, resource_group_2 = result302 assert resource_group_1.queryId == _query_id303 assert resource_group_1.resourceGroup == "admin"304 assert resource_group_2.queryId == _query_id305 assert resource_group_2.resourceGroup == "global"306@pytest.mark.usefixtures("_cleanup")307def test_add_operator_summaries(session: Session):308 # Given309 (_query_id, _raw_metrics) = get_raw_metrics()310 # When311 _add_query_metrics(_raw_metrics) # Needed for FK312 _add_operator_summaries(_raw_metrics)313 # Then314 result = (315 session.query(OperatorSummaries)316 .order_by(OperatorSummaries.operatorSummary["stageId"].as_integer().asc())317 .order_by(OperatorSummaries.operatorSummary["pipelineId"].as_integer().asc())318 .order_by(OperatorSummaries.operatorSummary["operatorId"].as_integer().asc())319 .order_by(OperatorSummaries.operatorSummary["planNodeId"].as_integer().asc())320 .filter_by(queryId=_query_id)321 .all()322 )323 assert len(result) == 4...

Full Screen

Full Screen

info_handler.py

Source:info_handler.py Github

copy

Full Screen

...110 """ Return a list of metrics objects """111 metrics = metrics_calculator.MetricsCalculator(self.processor)112 metric_list = []113 # Populate the list114 for key in metrics.get_raw_metrics().keys():115 name = metrics.get_raw_metrics()[key]["NAME"]116 formula = metrics.get_raw_metrics()[key]["FORMULA"]117 description = metrics.get_raw_metrics()[key]["DESCRIPTION"]118 metric = Metric(name, formula, description)119 metric_list.append(metric)120 return metric_list121 def __get_object_from_metric_list(self, occurrence_metric):122 """ Load the metric object variable values in obj """123 obj = [x for x in self.metrics_list if x.name == occurrence_metric]124 return obj125 def get_metric_name(self, occurrence_metric):126 """Return metric name"""127 try:128 metric = self.__get_object_from_metric_list(occurrence_metric)[0]129 return metric.get_name()130 except IndexError:131 return None...

Full Screen

Full Screen

test.py

Source:test.py Github

copy

Full Screen

...49 assert re.search('Cannot connect to host', results[2]['error_message'])50def test_process_metrics():51 """Test for process_metrics function"""52 websites_list = get_websites_list(os.getenv('WEBSITES_LIST_FILE'))53 raw_results = get_raw_metrics(websites_list)54 results = process_metrics(raw_results, websites_list)55 assert isinstance(results, list)56 assert isinstance(results[0], dict)57 for index in [0, 1]:58 assert 'pattern_match' in results[index]['metrics']59 assert results[index]['metrics']['pattern_match'] == (index == 0)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful