How to use list_messages method in tempest

Best Python code snippet using tempest_python

application.py

Source:application.py Github

copy

Full Screen

"""In-memory chat server: Flask HTTP routes plus Socket.IO events.

Channels and their messages live only in the module-level
``stored_channels`` list, so all state is lost when the process exits.
"""
import datetime
import os

from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit, join_room, leave_room

app = Flask(__name__)

# Check for environment variable
if not os.getenv("SECRET_KEY"):
    raise RuntimeError("SECRET_KEY is not set")
app.config["SECRET_KEY"] = os.getenv("SECRET_KEY")
socketio = SocketIO(app)

# Maximum number of messages kept per channel; the oldest one is dropped first.
MAX_MESSAGES = 100

# stored_channels = []
stored_channels = [{'room':'Welcome!', 'messages': [{'name': 'user1', 'text': 'Hello world!', 'time': '06-07-20 12:12:12'}]}]


def _channel_names():
    """Return the names of all stored channels, in creation order."""
    return [c["room"] for c in stored_channels]


def _find_channel(room):
    """Return the first stored channel dict named *room*, or None if absent."""
    for c in stored_channels:
        if c["room"] == room:
            return c
    return None


def _alert_unknown_channel():
    """Tell only the requesting client that the channel does not exist."""
    emit("alert", {"message": "This channel does not exist."}, broadcast=False)


@app.route("/")
def index():
    """Serve the single-page client."""
    return render_template("index.html")


@app.route("/channels", methods=["GET"])
def channels():
    """Return the list of channel names as JSON."""
    return jsonify(_channel_names())


# NOTE: this handler reuses the name of the HTTP view above (as the original
# did). The view was already registered by its decorator, so rebinding the
# module-level name here does not unregister it.
@socketio.on("create channel")
def channels(data):  # noqa: F811
    """Create a channel named ``data["channel"]`` unless it already exists.

    Broadcasts "announce channel" on success; otherwise sends an "alert"
    to the requesting client only.
    """
    channel = data["channel"].strip()  # a new channel sent by a user
    if _find_channel(channel) is None:
        stored_channels.append({'room': channel, 'messages': []})
        emit("announce channel", {"name_new_channel": channel}, broadcast=True)
    else:
        emit("alert", {"message": "This channel already exists. Please choose different name."}, broadcast=False)


@socketio.on("add message")
def messages(data):
    """Store a message in ``data['channel']`` and announce it to that room.

    Reads ``data['channel']``, ``data['name']`` and ``data['message']``.
    An unknown channel produces an "alert" to the sender instead of an
    unhandled ValueError.
    """
    channel = data['channel']
    name = data['name']
    text = data['message']

    stored = _find_channel(channel)
    if stored is None:
        _alert_unknown_channel()
        return

    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    list_messages = stored['messages']
    # If the channel is full, remove the oldest message before adding the new one.
    if len(list_messages) >= MAX_MESSAGES:
        list_messages.pop(0)
    list_messages.append({'name': name, 'text': text, 'time': timestamp})
    emit('announce message', {'text': text, 'name': name, 'time': timestamp}, room=channel)


@socketio.on('remove message')
def remove(data):
    """Redact the message(s) matching ``data['name']`` and ``data['timestamp']``.

    The message stays in the list; only its text is replaced with a
    placeholder. The room is notified once if at least one message was
    redacted. An unknown channel produces an "alert" to the sender.
    """
    channel = data['channel']
    name = data['name']
    time = data['timestamp']

    stored = _find_channel(channel)
    if stored is None:
        _alert_unknown_channel()
        return

    redacted = False
    for message in stored['messages']:
        if message['name'] == name and message['time'] == time:
            # Replace the text rather than popping the entry from the list.
            message['text'] = "**This message has been removed**"
            redacted = True
    if redacted:
        emit('announce removed message', {'text': 'a message is removed'}, room=channel)


@socketio.on('join channel')
def on_join(data):
    """Add the client to the room ``data['channel']`` and announce it there."""
    username = data['username']
    room = data['channel']
    join_room(room)
    emit('announce join', {'text': username + ' has joined the room.'}, room=room)


@socketio.on('leave channel')
def on_leave(data):
    """Remove the client from the room ``data['channel']`` and announce it there."""
    username = data['username']
    room = data['channel']
    leave_room(room)
    emit('announce leave', {'text': username + ' has left the room.'}, room=room)


@app.route("/messages", methods=["POST"])
def messages():  # noqa: F811
    """Return the stored messages of the posted ``channel`` form field as JSON.

    Responds with HTTP 404 and a JSON error body when the channel is
    missing or unknown, instead of failing with an unhandled ValueError.
    """
    channel = request.form.get('channel')
    stored = _find_channel(channel)
    if stored is None:
        return jsonify({"error": "Channel not found."}), 404
    return jsonify(stored['messages'])


if __name__ == '__main__':
    ...  # entry-point body is elided ("...") in this listing

Full Screen

Full Screen

quickstart.py

Source:quickstart.py Github

copy

Full Screen

1from __future__ import print_function2import pickle3import os.path4from googleapiclient.discovery import build5from google_auth_oauthlib.flow import InstalledAppFlow6from google.auth.transport.requests import Request7# If modifying these scopes, delete the file token.pickle.8SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']9def main():10 """Shows basic usage of the Gmail API.11 Lists the user's Gmail labels.12 """13 creds = None14 # The file token.pickle stores the user's access and refresh tokens, and is15 # created automatically when the authorization flow completes for the first16 # time.17 if os.path.exists('token.pickle'):18 with open('token.pickle', 'rb') as token:19 creds = pickle.load(token)20 # If there are no (valid) credentials available, let the user log in.21 if not creds or not creds.valid:22 if creds and creds.expired and creds.refresh_token:23 creds.refresh(Request())24 else:25 flow = InstalledAppFlow.from_client_secrets_file(26 'credentials.json', SCOPES)27 creds = flow.run_local_server(port=0)28 # Save the credentials for the next run29 with open('token.pickle', 'wb') as token:30 pickle.dump(creds, token)31 service = build('gmail', 'v1', credentials=creds)32 # Call the Gmail API33 results = service.users().messages().list(userId="me").execute()34 list_ids = results['messages']35 # print(list_ids[1]['id'])36 37 list_messages=[]38 for i in range(len(list_ids)):39 temp = service.users().messages().get(userId="me", id=list_ids[i]['id']).execute()40 list_messages.append(temp['payload']['headers'])41 print(list_messages[0][0]['name'])42 list_from=[]43 for j in range(len(list_messages)):44 for k in range(len(list_messages[j])):45 if(list_messages[j][k]['name']=="From"):46 list_from.append(list_messages[j][k]['value'])47 48 print(list_from)49 50 51if __name__ == '__main__':...

Full Screen

Full Screen

create_graph.py

Source:create_graph.py Github

copy

Full Screen

1from django.core.management.base import BaseCommand2import sys3from datetime import *4# sys.path.append("../../grafo")5from chat.grafo import Grafo6from chat.models import *7import pickle8class Command(BaseCommand):9 help = 'Create graph'10 def handle(self, *args, **kwargs):11 grafo = Grafo()12 list_messages = list(Message.objects.all().order_by("timestamp"))13 for i, message in enumerate(list_messages):14 if i != len(list_messages) - 1:15 print(datetime.strptime(list_messages[i].timestamp.isoformat(), '%H:%M:%S.%f')- datetime.strptime(list_messages[i+1].timestamp.isoformat(), '%H:%M:%S.%f'))16 if datetime.strptime(list_messages[i].timestamp.isoformat(), '%H:%M:%S.%f')- datetime.strptime(list_messages[i+1].timestamp.isoformat(), '%H:%M:%S.%f') < timedelta(seconds=30):17 grafo.update_edge(list_messages[i].top_scoring_intent, list_messages[i+1].top_scoring_intent)18 else:19 print("nisba")20 for el in grafo.adj:21 print(el)22 print(grafo.adj[el])...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful