How to use epoch_timestamp method in localstack

Best Python code snippet using localstack_python

client.py

Source:client.py Github

copy

Full Screen

1import contextlib2from typing import Any, Dict3import paho.mqtt.client as mqtt4from influx_line_protocol import Metric5import datetime6class Client:7 def __init__(self, broker: str, port: int, client_id: str = "Random"):8 """Create an instance of Client9 Args:10 broker (str):Address of the broker you are using11 port (int): port number you want to connect on12 client_id (str): Client id 13 """14 self.client = mqtt.Client(client_id)15 self._connect(broker=broker, port=port)16 def _connect(self, broker="localhost", port=1883):17 """Private method to connect to the broker and start loop for sending data18 Args:19 broker (str, optional): the broker id. Defaults to "localhost".20 port (int, optional): port number. Defaults to 1883.21 """22 self.client.connect(host=broker, port=port)23 self.client.loop_start()24 def _fix_dest_table(self, dest_table: str, tags) -> str:25 """Private method to extract the dest_table from the tags if it is not defined26 Args:27 dest_table (str): original dest_table sent by user in make_send or _make_data.28 tags (_type_): tags sent by user in make_send or _make_data.29 Returns:30 (str): _description_31 """32 if dest_table is None:33 dest_table = f"{list(tags.values())[0]}"34 return dest_table35 def _fix_topic(self, topic: str, tags: Dict[str, Any]) -> str:36 """Private method to extract the topic from the tags and append it to original topic37 Args:38 topic (str): topic sent by user in make_send or _make_data.39 tags (Dict[str, Any]): tags sent by user in make_send or _make_data.40 Returns:41 (str): Fixed topic42 """43 for value in tags.values():44 topic = f"{topic}/{value}"45 return f"{topic}/"46 def _fix_timestamp(self, epoch_timestamp) -> float:47 """48 Private method to fix the timestamp49 and standardize it to the format of influx_line_protocol.50 Args:51 epoch_timestamp (float|str|Datetime): Timestamp you would like to define defaults to now.52 Returns:53 (float): Timestamp in the format of influx_line_protocol.54 """55 if isinstance(epoch_timestamp, datetime.datetime):56 epoch_timestamp = epoch_timestamp.timestamp()57 if type(epoch_timestamp) is float:58 epoch_timestamp = epoch_timestamp * 10**959 else:60 with contextlib.suppress(Exception):61 epoch_timestamp = (62 datetime.datetime.strptime(63 epoch_timestamp, "%Y-%m-%d %H:%M:%S.%f"64 ).timestamp()65 * 10**966 )67 return epoch_timestamp68 try:69 epoch_timestamp = (70 datetime.datetime.strptime(71 epoch_timestamp, "%Y-%m-%d %H:%M:%S"72 ).timestamp()73 * 10**974 )75 return epoch_timestamp76 except Exception:77 print(78 f"{epoch_timestamp} is not a valid timestamp \nThe correct format is YYYY-MM-DD HH:MM:SS.SSS\nPlease try this or use a datetime object"79 )80 exit()81 return epoch_timestamp82 def make_send(self,83 topic: str,84 tags: Dict[str, Any],85 values: Dict[str, Any],86 epoch_timestamp: float,87 dest_table: str,88 ):89 """90 Use this method to make data and encode it in influx_line_protocol91 data is stored in metric and send it to the broker.92 Args:93 topic (str): Add the topic of the message94 tags (Dict[str, Any]): Tags you would like to send. 95 values (Dict[str, Any]): values you would like to add96 epoch_timestamp (float): Timestamp you would like to define defaults to now.97 dest_table (str | None, optional): Dest_table you would like to add. Defaults to None.98 """99 self._make_data(100 topic=topic,101 tags=tags,102 values=values,103 epoch_timestamp=epoch_timestamp,104 dest_table=dest_table,105 )106 self._send_data()107 def _make_data(108 self,109 topic: str,110 tags: Dict[str, Any],111 values: Dict[str, Any],112 epoch_timestamp: float,113 dest_table: str,114 ):115 """116 Private method to make data and encode it in influx_line_protocol117 data is stored in metric.118 Args:119 topic (str): Add the topic of the message120 tags (Dict[str, Any]): Tags you would like to send. 121 values (Dict[str, Any]): values you would like to add122 epoch_timestamp (float): Timestamp you would like to define defaults to now.123 dest_table (str | None, optional): Dest_table you would like to add. Defaults to None.124 """125 dest_table = self._fix_dest_table(dest_table, tags)126 self.topic = self._fix_topic(topic, tags)127 self.metric = Metric(dest_table)128 self.metric.with_timestamp(self._fix_timestamp(epoch_timestamp))129 for (k, v) in values.items():130 self.metric.add_value(k, v)131 for (k, v) in tags.items():132 self.metric.add_tag(k, v)133 def _send_data(self):134 """Method to sent the made data made after using_make_data 135 """136 self.client.publish(topic=self.topic, payload=f"{self.metric}")137 def close(self):138 """139 Method to close the connection to the broker140 and stop the loop141 """142 self.client.loop_stop()...

Full Screen

Full Screen

timeconv.py

Source:timeconv.py Github

copy

Full Screen

1#!/usr/bin/python2# _*_ coding: utf-8 _*_3from datetime import datetime4import time5import pytz6class TimeConv:7 def __init__(self):8 pass9 def epoch2jst(self, epoch_timestamp):10 """11 Convert epoch timestamp to JST timestamp string12 """13 _dt = datetime.fromtimestamp(float(epoch_timestamp))14 _jst = pytz.timezone('Asia/Tokyo').localize(_dt)15 return "{0:%Y-%m-%d %H:%M:%S}".format(_jst)16 def epoch2utc(self, epoch_timestamp):17 """18 Convert epoch timestamp to UTC timestamp string19 """20 _dt = datetime.utcfromtimestamp(float(epoch_timestamp))21 return "{0:%Y-%m-%d %H:%M:%S}".format(_dt)22 def epoch2jstwithmilisec(self, epoch_timestamp):23 """24 Convert epoch timestamp to JST timestamp with miliseconds25 """26 _dt = datetime.fromtimestamp(float(epoch_timestamp))27 _jst = pytz.timezone('Asia/Tokyo').localize(_dt)28 return "{0:%Y-%m-%d %H:%M:%S.}".format(_jst) + "%06d" % (_jst.microsecond // 1000)29 def epoch2utcwithmilisec(self, epoch_timestamp):30 """31 Convert epoch timestamp to UTC timestamp with miliseconds32 """33 _dt = datetime.utcfromtimestamp(float(epoch_timestamp))34 return "{0:%Y-%m-%d %H:%M:%S.}".format(_dt) + "%06d" % (_dt.microsecond // 1000)35 def str2epoch(self, time_str):36 """37 Convert JST string (YYYY-MM-DD HH:mm:ss) to epoch timestamp (int)38 """39 return int(time.mktime(time.strptime(time_str, "%Y-%m-%d %H:%M:%S")))40if __name__=='__main__':41 epoch_timestamp = "1398416400"42 print "timestamp:\t%s" % epoch_timestamp43 tc = TimeConv()44 print "JST:\t%s" % tc.epoch2jst(epoch_timestamp)45 print "UTC:\t%s" % tc.epoch2utc(epoch_timestamp)46 print "JSTwithMili\t:%s" % tc.epoch2jstwithmilisec(epoch_timestamp)47 print "UTCwithMili\t:%s" % tc.epoch2utcwithmilisec(epoch_timestamp)48 print "*" * 6049 time_str = "2014-04-26 22:00:00"...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful