How to use stop_services method in localstack

Best Python code snippet using localstack_python

CombineData.py

Source:CombineData.py Github

copy

Full Screen

# This script is the core of the data extraction and structuring pipeline.
from AccessData import (
    read_json_to_ram,
    write_list_to_csv,
    write_ram_to_json,
    read_transport_node_bus_csv,
)
import time


# Maps the day-type label found in the passenger-volume table to the name of
# the raw JSON file the structured table is written to.
_HUMAN_VOLUME_FILENAMES = {
    'WEEKDAY': 'human_volume_weekday',
    'WEEKENDS/HOLIDAY': 'human_volume_holiday',
}


def calculate_routes_per_stop():
    """Group service numbers by bus stop.

    Reads the 'routes' records, writes the grouping to the raw
    'stops_ServicesNo' JSON file and returns it.

    Returns:
        dict mapping stop code (as a string) to the list of service numbers
        that call at that stop, in the order they appear in 'routes'.
    """
    routes = read_json_to_ram('routes')
    stop_services = {}
    for route in routes:  # each item is one complete route record (a dict)
        # Keys are always the string form of the stop code so that every
        # lookup in this module can use the same key format.
        stop_services.setdefault(f"{route['BusStopCode']}", []).append(route['ServiceNo'])
    write_ram_to_json(stop_services, 'raw', 'stops_ServicesNo')
    return stop_services


def assign_stop_capacity(stop_services):
    """Attach the number of services per stop to every stop record.

    Args:
        stop_services: dict of stop code -> list of service numbers, as
            returned by calculate_routes_per_stop().

    Writes the augmented stops to the 'stops_capacity' JSON and CSV outputs.
    """
    stops = read_json_to_ram('stops')
    for stop in stops:
        # A stop that no route serves has no entry; its capacity is 0
        # instead of a KeyError.
        stop['Capacity'] = len(stop_services.get(f"{stop['BusStopCode']}", ()))
    write_ram_to_json(stops, 'display_dynamic', 'stops_capacity')
    write_list_to_csv(stops, 'stops_capacity')


def _current_frequency_field(minute_of_day):
    """Return the frequency field name that applies at the given minute of the day."""
    if 390 <= minute_of_day <= 510:  # 06:30 - 08:30
        return 'AM_Peak_Freq'
    if 510 < minute_of_day < 1020:  # 08:31 - 16:59
        return 'AM_Offpeak_Freq'
    if 1020 <= minute_of_day <= 1140:  # 17:00 - 19:00
        return 'PM_Peak_Freq'
    return 'PM_Offpeak_Freq'


def _buses_per_hour(freq_str):
    """Convert a dispatch-interval string in minutes ('10-15' or '12') to buses per hour."""
    if not freq_str:
        return 0  # field missing or empty: no service in this period
    parts = [part.strip() for part in freq_str.split('-') if part.strip()]
    if not parts:
        return 0  # '-' means no service in this period
    # Only the first two numbers are meaningful: lower and upper bound.
    bounds = [float(part) for part in parts[:2]]
    mean_interval = sum(bounds) / len(bounds)
    if mean_interval <= 0:
        return 0  # '0-0' / '00-00' mean no service; also avoids dividing by zero
    # A fixed interval and a range are both intervals in minutes, so both are
    # converted to buses per hour the same way.
    return round(60 / mean_interval, 2)


def get_single_service_volume(service_code, services=None):
    """Return how many buses per hour a service currently dispatches.

    Args:
        service_code: service number to look up.
        services: optional pre-loaded list of service records; when omitted
            the 'services' JSON is read. Passing it lets a caller that
            queries many services avoid re-reading the file on every call.

    Returns:
        Buses per hour for the current time of day, rounded to two decimals,
        or 0 when the service is unknown or does not run in this period.
    """
    if services is None:
        services = read_json_to_ram('services')
    now = time.localtime()
    freq_field = _current_frequency_field(now.tm_hour * 60 + now.tm_min)

    for service in services:
        if service.get('ServiceNo') == service_code:
            # Only the first matching record is used.
            return _buses_per_hour(service.get(freq_field))
    return 0  # no record for this service: it dispatches nothing


def get_service_volume():
    """Compute the current buses-per-hour value for every known service.

    Reads the raw 'stops_ServicesNo' grouping, evaluates each distinct
    service once, writes the result to the raw 'all_service_volume' JSON
    file and returns it.

    Returns:
        dict mapping service code (as a string) to its buses-per-hour value.
    """
    stop_services = read_json_to_ram('stops_ServicesNo')
    services = read_json_to_ram('services')  # loaded once for all lookups

    entire_service_volume = {}
    for service_codes in stop_services.values():
        for service_code in service_codes:
            key = f'{service_code}'
            if key not in entire_service_volume:  # each service is computed once
                entire_service_volume[key] = get_single_service_volume(service_code, services)

    write_ram_to_json(entire_service_volume, 'raw', 'all_service_volume')
    return entire_service_volume


def calculate_bus_volume_per_hour_per_stop():
    """Compute the hourly bus flow at every stop.

    Returns:
        The list of stop records, each with a 'BusVolume' entry holding the
        summed buses-per-hour of all services calling there. The same list
        is written to the 'stops_bus_volume' JSON output.
    """
    stop_services = calculate_routes_per_stop()
    stops = read_json_to_ram('stops')
    service_volume = get_service_volume()
    for stop in stops:
        # Look up with the same string key the grouping was built with; a
        # stop without any route contributes no buses.
        service_list = stop_services.get(f"{stop['BusStopCode']}", [])
        bus_volume = sum(service_volume[f'{service_code}'] for service_code in service_list)
        stop['BusVolume'] = round(bus_volume, 2)

    write_ram_to_json(stops, 'display_dynamic', 'stops_bus_volume')
    return stops


def calculate_human_volume_alltime():
    """Structure the passenger-volume tables into per-stop, per-hour dicts.

    Reads the weekday and holiday tables from 'transport_node_bus_202108' and
    writes one raw JSON file per table, shaped as
    {stop code: {hour: volume}} with the volume stored as a string.

    Raises:
        ValueError: if a table carries an unrecognised day-type label.
    """
    weekday, holiday = read_transport_node_bus_csv('transport_node_bus_202108')
    for table in (weekday, holiday):
        if not table:
            continue  # nothing to structure, and no row to read the day type from
        table_type = table[0][1]
        filename = _HUMAN_VOLUME_FILENAMES.get(table_type)
        if filename is None:
            # Previously this left `filename` unbound, or reused the name from
            # the previous table and silently overwrote that file.
            raise ValueError(f'unknown day type in passenger volume table: {table_type!r}')

        human_volume_by_stop = {}
        for row in table:
            hour = row[2]
            stop_code = row[4]
            volume = int(row[5]) + int(row[6])  # sum of the two volume columns
            human_volume_by_stop.setdefault(f'{stop_code}', {})[f'{hour}'] = f'{volume}'

        write_ram_to_json(human_volume_by_stop, 'raw', filename)


def get_current_human_volume():
    """Assign the passenger volume for the current hour to every stop.

    Regenerates the all-day volume files, picks the weekday or holiday table
    according to the current local day, and writes the stops (with a
    'HumanVolume' entry) to the 'stops_human_volume' JSON output. Stops with
    no data for the current hour get 0.
    """
    calculate_human_volume_alltime()
    assign_coords_to_human_volume_alltime()

    now = time.localtime()
    # tm_wday is locale-independent (Monday == 0), unlike the '%a' name.
    if now.tm_wday >= 5:
        volume_alltime = read_json_to_ram('human_volume_holiday', 'raw')
    else:
        volume_alltime = read_json_to_ram('human_volume_weekday', 'raw')

    # NOTE(review): the hour keys come straight from the CSV; elsewhere in
    # this module hours are written unpadded ('7'), so both the zero-padded
    # and the unpadded forms are tried - confirm the source format.
    hour_keys = (f'{now.tm_hour:02d}', f'{now.tm_hour}')

    stops = read_json_to_ram('stops')
    for stop in stops:
        hourly = volume_alltime.get(f"{stop['BusStopCode']}", {})
        stop['HumanVolume'] = 0
        for hour_key in hour_keys:
            if hour_key in hourly:
                try:
                    stop['HumanVolume'] = int(hourly[hour_key])
                except (TypeError, ValueError):
                    stop['HumanVolume'] = 0  # unreadable value counts as no passengers
                break

    write_ram_to_json(stops, 'display_dynamic', 'stops_human_volume')


def calculate_bus_human_balance():
    """Compute the passenger-to-bus pressure for every stop.

    The balance is HumanVolume / BusVolume; a higher value means more
    passenger pressure per bus. Stops whose value cannot be computed
    (missing data or no buses) get 0. The result is written to the
    'stops_balance' JSON output.
    """
    bus_volume = read_json_to_ram('stops_bus_volume', 'display_dynamic')
    human_volume = read_json_to_ram('stops_human_volume', 'display_dynamic')
    stops = read_json_to_ram('stops')
    # The three lists are paired by position, not by stop code.
    for index, stop in enumerate(stops):
        try:
            balance = human_volume[index]['HumanVolume'] / bus_volume[index]['BusVolume']
        except (ZeroDivisionError, IndexError, KeyError, TypeError):
            balance = 0
        stop['Balance'] = round(balance, 2)
    write_ram_to_json(stops, 'display_dynamic', 'stops_balance')


def assign_coords_to_human_volume_alltime():
    """Attach the all-day passenger volume table to every stop record.

    For both the holiday and the weekday table, each stop gets a
    'VolumeAlltime' entry and the stops are written to the
    'human_volume_<type>_coords' static JSON output.
    """
    # Placeholder for stops without data, so that every stop in the output
    # has the same dict layout and the consumer never sees a missing value.
    empty_stop = {f'{hour}': 0 for hour in range(24)}

    stops = read_json_to_ram('stops')
    for day_type in ['holiday', 'weekday']:
        volume = read_json_to_ram(f'human_volume_{day_type}')
        for stop in stops:
            stop['VolumeAlltime'] = volume.get(f"{stop['BusStopCode']}", empty_stop)
        write_ram_to_json(stops, 'display_static', f'human_volume_{day_type}_coords')


if __name__ == "__main__":
    assign_coords_to_human_volume_alltime()

Full Screen

Full Screen

run_test_service_helper.py

Source:run_test_service_helper.py Github

copy

Full Screen

...9 if monkeypatch:10 monkeypatch.setattr(logging.root, "handlers", [])11 loop = asyncio.get_event_loop()12 service: Optional[ServiceContainer] = None13 def stop_services(loop: Any = None) -> None:14 if not loop:15 loop = asyncio.get_event_loop()16 asyncio.ensure_future(_stop_services())17 def force_stop_services(loop: Any = None) -> None:18 if not loop:19 loop = asyncio.get_event_loop()20 asyncio.ensure_future(_force_stop_services())21 async def _stop_services() -> None:22 if service:23 service.stop_service()24 async def _force_stop_services() -> None:25 if service and not service.started_waiter.done():26 service.started_waiter.set_result([])27 for signame in ("SIGINT", "SIGTERM"):28 loop.add_signal_handler(getattr(signal, signame), functools.partial(stop_services, loop))29 try:30 service = ServiceContainer(ServiceImporter.import_service_file(filename))31 assert service is not None32 async def _async() -> None:33 loop = asyncio.get_event_loop()34 try:35 await service.run_until_complete()36 except Exception:37 loop = asyncio.get_event_loop()38 stop_services(loop)39 force_stop_services(loop)40 raise41 future = asyncio.ensure_future(_async())42 if wait:43 loop.run_until_complete(asyncio.wait([service.started_waiter]))44 except Exception:45 for signame in ("SIGINT", "SIGTERM"):46 loop.remove_signal_handler(getattr(signal, signame))47 loop = asyncio.get_event_loop()48 for signame in ("SIGINT", "SIGTERM"):49 loop.remove_signal_handler(getattr(signal, signame))50 if service:51 stop_services(loop)52 raise53 services = {}54 if wait:55 for service_name, instance, log_level in service.started_waiter.result():56 services[service_name] = instance57 else:58 def get_services() -> Dict:59 loop.run_until_complete(asyncio.wait([service.started_waiter]))60 return {service_name: instance for service_name, instance, log_level in service.started_waiter.result()}61 return get_services, future...

Full Screen

Full Screen

csk-stop-services

Source:csk-stop-services Github

copy

Full Screen

#!/usr/bin/env python
#########################################################################
# Copyright 2011 Cloud Sidekick
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#########################################################################
# This command does not have an API counterpart.
# It calls the scripts to stop services.
# It's done in python so it can handle --dumpdoc for the
# automatic documentation generator.
import os
import glob
import sys


def main(argv=None):
    """Stop all Cloud Sidekick services by running each component's stop script.

    Args:
        argv: command-line argument list including the program name;
            defaults to sys.argv.

    Returns:
        Process exit status: 0 on the normal paths, 1 when CSK_HOME is not
        set and the scripts therefore cannot be located.
    """
    if argv is None:
        argv = sys.argv

    if len(argv) > 1:
        # Any argument short-circuits the command: it is only ever called
        # with an argument to ask for the documentation string, and nothing
        # is stopped in that case.
        if argv[1] == "--dumpdoc" or argv[1] == "-H":
            # Parenthesised single-argument print works on Python 2 and 3.
            print("""Stop ALL Cloud Sidekick services. Accepts no arguments.""")
        return 0

    if not os.environ.get("CSK_HOME"):
        # Without CSK_HOME the shell would expand the script paths to
        # "/velocity/..." etc.; report the problem instead of running those.
        sys.stderr.write("CSK_HOME is not set; cannot locate the service scripts.\n")
        return 1

    os.system("$CSK_HOME/velocity/services/stop_services.sh")
    os.system("$CSK_HOME/automate/services/stop_services.sh")
    # The deploy component is only stopped when its directory is present.
    if os.path.exists(os.path.join(os.environ["CSK_HOME"], "deploy")):
        os.system("$CSK_HOME/deploy/services/stop_services.sh")
    os.system("$CSK_HOME/canvas/services/stop_services.sh")
    return 0


if __name__ == '__main__':
    sys.exit(main())

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful