How to use send_command method in localstack

Best Python code snippet using localstack_python

Main_ZJ.py

Source:Main_ZJ.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2from PyQt4 import QtCore3import os,time,datetime,codecs4import telnetlib #telnet连接模块5import paramiko #ssh连接模块6import ConfigParser #配置文件模块7import sys,socket8import thread #处理和控制线程9import gc #垃圾回收10reload(sys) #重新加载sys模块11sys.setdefaultencoding('utf8') #设置utf8为默认编码12#Telnet登录13class Telnet():14 def __init__(self,host):15 self.telnet = telnetlib.Telnet(host, port = 10020, timeout=10) #连接telnet服务器16 self.telnet.set_debuglevel(2)17 #读取用户名及密码18 def Read(self,Prompt,Timeout):19 buff = ""20 try:21 buff += self.telnet.read_until(Prompt,Timeout) #读取指定的用户名或密码,Timeout超时22 except:23 self.Send("\n")24 buff += self.telnet.read_until(Prompt,Timeout)25 return buff26 #发送命令27 def Send(self,Command):28 self.telnet.write(str(Command)+'\n') #向远端发送命令29 #关闭连接30 def Close(self):31 self.telnet.close() #终止telnet连接32#ssh登录33class ssh():34 def __init__(self,host,username,passwd):35 self.s = paramiko.SSHClient() #建立一个连接对象36 self.s.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #允许将信任的主机自动加入到host_allow 列表,此方法必须放在connect方法的前面37 self.s.connect(hostname=host,port=22, username=username, password=passwd,timeout = 30) #连接服务器38 self.ssh = self.s.invoke_shell() #建立交互式shell连接39 time.sleep(2)40 #发送数据41 def Send(self,Command):42 self.ssh.send(str(Command) + '\r')43 #接收数据44 def Recv(self,Buff_Size,Time):45 buff = ""46 try:47 buff += self.ssh.recv(Buff_Size,Time) #获取回显48 except:49 self.Send("\n")50 buff += self.ssh.recv(Buff_Size,Time)51 return buff52 #关闭连接53 def Close(self):54 self.s.close()55#获取配置文件信息56class Config_ini():57 def __init__(self,filename):58 self.config = ConfigParser.ConfigParser() #创建配置文件对象59 self.config.readfp(open(filename)) #打开并读取配置文件60 def get_info(self,session,key):61 return self.config.get(session,key) #读取配置文件中指定段的键值62 def session(self):63 return self.config.sections() #获取所有的段64 def option(self,session):65 return self.config.options(session) #得到指定段的所有信息66 def set(self,session,option,value):67 return self.config.set(session,option,value) #修改配置文件的值68#log信息保存69class Consumer(QtCore.QThread):70 def __init__(self,queue,parent = None):71 super(Consumer,self).__init__(parent)72 self.data = queue73 self.working = True74 def write_log(self,PD,SN,Time):75 if not os.path.exists(os.getcwd() + "\\log"): #exists()函数判断路径,getcwd()函数返回当前路径76 os.makedirs(os.getcwd() + "\\log") #递归创建目录77 Path = os.getcwd() + "\\log\\" #文件路径78 self.data_file = Path + PD + "_whole_" + SN + "_" + Time + ".log" #在Path路径下创建log文件79 while self.working: #循环80 s = self.data.get() #获取测试数据81 F = codecs.open(self.data_file, "a+", encoding='gb18030') #以指定的编码读取模式打开文件82 F.write(s + "\r\n") #写入文件83 F.close() #关闭文件84#主测试程序85class Main_Test(QtCore.QThread):86 def __init__(self,queue,parent = None):87 super(Main_Test,self).__init__(parent)88 self.data = queue #数据89 self.isWait = True #等待90 self.working = True #工作91 self.Input_IP_address=None92 self.error_count=0 #错误次数为093 self.Ship_Out_Address=None94 self.Red = "QPushButton{background-color:RED}" #红色95 self.Yellow = "QPushButton{background-color:YELLOW}" #黄色96 self.Green = "QPushButton{background-color:GREEN}" #绿色97 self.Config_ini = Config_ini("ini/Paramiters.ini") #获取配置文件信息98 self.GNS_SWV = self.Config_ini.get_info("GNS", "swv") #获取GNS段swv信息99 self.GNS_SPN = self.Config_ini.get_info("GNS", "spn")100 self.GNS_DPN = self.Config_ini.get_info("GNS", "dpn")101 self.GNS_BPN = self.Config_ini.get_info("GNS", "bpn")102 self.GNS_FPGAPN = self.Config_ini.get_info("GNS", "fpgapn")103 self.GNS_ISOPN = self.Config_ini.get_info("GNS", "isopn")104 self.GNS_MOD=self.Config_ini.get_info("GNS", "mod")105 self.GNS_FPGA=self.Config_ini.get_info("GNS","fpga_version")106 self.GNS_Docker = self.Config_ini.get_info("GNS", "docker")107 self.GNS_Nginx = self.Config_ini.get_info("GNS", "nginx")108 self.GNS_Nodejs = self.Config_ini.get_info("GNS", "nodejs")109 self.GNS_Mongodb = self.Config_ini.get_info("GNS", "mongodb")110 self.GNS_Underlying = self.Config_ini.get_info("GNS", "underlying")111 self.MAC = ""112 self.Boot = ""113 self.Kernel = ""114 self.APP = ""115 self.Config = ""116 #获取IP地址117 def Local_IP(self):118 self.Local_IP = socket.gethostbyname(socket.gethostname()) #获取本地主机名的IP地址119 return str(self.Local_IP)120 #连接设备121 def Connection(self, host):122 username = "gns"123 passwd = "feitian"124 try:125 self.Connect = ssh(str(host), username, passwd) #ssh连接服务器126 self.Send_Command("\n")127 except Exception, e:128 self.Test_Fail(str(e))129 #发送接收数据130 def Send_Command(self, Command, Prompt='#', Timeout=10,wait_time=1):131 try:132 buff = ""133 log = ""134 self.Connect.Send(Command) #发送命令135 starttime = datetime.datetime.now() #获取当前时间136 # while not buff.endswith(Prompt):137 while Prompt not in buff:138 buff = ""139 time.sleep(1)140 buff += self.Connect.Recv(99999,wait_time)141 log += buff142 self.data.put(buff)143 self.emit(QtCore.SIGNAL('output(QString)'), buff)144 endtime = datetime.datetime.now()145 if (endtime - starttime).seconds > Timeout:146 self.Test_Fail(u"超时, %s 不能找到" % Prompt)147 break148 return log149 except Exception, E:150 self.Test_Fail(u"命令错误,%s 不能找到" % Prompt)151 #通过开头和结尾字符串获取中心字符串152 def GetMiddleStr(self, content, startStr, endStr):153 try:154 startIndex = content.index(startStr) #检测content字符串中是否包含startstr字符串,返回开始的索引155 if startIndex >= 0:156 startIndex += len(startStr)157 endIndex = content.index(endStr) #检测content字符串中是否包含endstr字符串,返回开始的索引158 return content[startIndex:endIndex].strip() #移除字符串收尾的空格,并返回字符串指定的字符159 except Exception, e:160 self.Test_Fail(u"内容返回错误")161 #设置颜色162 def Set_Color(self, message):163 self.emit(QtCore.SIGNAL('color'), message)164 #设置地址165 def Set_Status(self, message):166 self.emit(QtCore.SIGNAL('status'), message)167 #错误消息168 def error(self,message):169 if message !="":170 self.emit(QtCore.SIGNAL('error'),message)171 self.data.put(message)172 #测试通过提示173 def Test_Pass(self,message):174 if message !="":175 self.emit(QtCore.SIGNAL('pass'),message) #发送信号176 self.data.put(message)177 #测试开始178 def Test_Running(self,message):179 l = "########################################" + message + "########################################"180 self.emit(QtCore.SIGNAL('dis_message'), l) #发送信号181 self.data.put(l) #入队182 #测试失败183 def Test_Fail(self,message):184 self.working = False185 self.error_count = 1 #错误计数186 self.Set_Color(self.Red)187 self.data.put(message) #测试失败信息入队188 self.error(message) #发送错误信息189 self.emit(QtCore.SIGNAL('error'), "<font color=red><font size = 10>%s</font>" % u"Test FAIL\n")190 self.emit(QtCore.SIGNAL('stop'), self.working)191 thread.exit_thread() # 终止线程192 #输入IP地址193 def Input_IP(self,message):194 self.emit(QtCore.SIGNAL('input'),message)195 while self.isWait:196 time.sleep(1)197 self.isWait = True198 return self.Input_IP_address199 #输入地址200 def Ship_Out_Address_setting(self,message):201 self.emit(QtCore.SIGNAL('ship'),message)202 while self.isWait:203 time.sleep(1)204 self.isWait = True205 return self.Ship_Out_Address206 #提示消息207 def Prompt(self,message):208 self.emit(QtCore.SIGNAL('Prompt'),message)209 while self.isWait:210 time.sleep(1)211 self.isWait=True212 #测试完成213 def Test_Finished(self,message):214 self.working = False215 if self.error_count == 0:216 self.Test_Pass(message)217 self.emit(QtCore.SIGNAL('stop'),self.working)218 thread.exit_thread() # 终止线程219 #程序运行220 def Script_Start(self,PD,SN,Time,Host,Server_IP = None):221 self.working = True222 self.PD = PD223 self.SN = SN224 self.Host = Host225 self.Time = Time226 self.error_count = 0 #错误计数227 Ping=os.system("ping -n 5 %s"%Host)228 if Ping == 0:229 self.Test_Running(u"##### Login #####") #测试开始信号230 self.Connection(Host) #连接测试设备231 os.system('netsh firewall set opmode disable') #后台关闭windows系统防火墙232 os.system("start /b iperf.exe -s -w1m&") #windows系统后台运行iperf,并指定选项233 time.sleep(5)234 self.Install_Tools() #GNS工具安装235 while self.working == True:236 self.VersionCheck() #版本检测237 self.MAC_check() #MAC地址检测238 self.ShowTemperature() #温度检测239 self.MemeryCheck() #内存检测240 if Server_IP != "":241 self.clock_test(Server_IP) #时钟检测242 self.Discrete() #离散量检测243 if Server_IP != "":244 self.EthSpeedCheck(Server_IP) #网口速率检测245 self.USBCheck() #USB检测246 self.SSDCheck() #硬盘检测247 self.Clean_Caches() #清理缓存248 os.system('netsh firewall set opmode mode=enable') #开启Windows防火墙249 os.system("taskkill /f /t /im iperf.exe") #关闭iperf进程250 os.system("taskkill /f /t /im cmd.exe") #关闭dos窗口251 self.Test_Finished(u"<font size = 10>LRU Test Completed PASS</font>")252 gc.collect() #垃圾回收253 else:254 self.Test_Fail(u"IP ping failed")255 self.Prompt(u"IP ping failed")256 #GNS工具安装257 def Install_Tools(self):258 self.Send_Command("cd /root/software/gns_test/")259 self.Send_Command("chmod +x *")260 self.Send_Command("./install_sysstat.sh","#",60)261 time.sleep(3)262 self.Send_Command("\cp -f kcpu /sbin")263 self.Send_Command("\cp -f iozone /sbin")264 self.Send_Command("\cp -f nmon /sbin")265 self.Send_Command("\cp -f iperf /sbin")266 self.Send_Command("\cp -f cpulimit /sbin")267 self.Send_Command("\cp -f run_kcpu.sh /sbin")268 self.Send_Command("cd 429")269 self.Send_Command("\cp -f * /sbin")270 self.Send_Command('cd /')271 #清理缓存272 def Clean_Caches(self):273 self.Send_Command("echo 3 >> /proc/sys/vm/drop_caches&")274 time.sleep(3)275 self.Send_Command("free -m")276 self.Send_Command('cd /')277 #MAC地址检测278 def MAC_check(self):279 self.Test_Running(u"### MAC Check ###")280 MAC = self.Send_Command("ifconfig | grep eth0")281 if "5C:E0:CA" in MAC:282 self.Test_Pass(u"MAC PASS")283 else:284 self.Test_Fail(u"MAC FAIL")285 #时钟检测286 def clock_test(self, Server_IP):287 self.Test_Running(U"##### NTP Check #####")288 os.system(289 'reg add HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\TimeProviders\NtpServer /v Enabled /t REG_DWORD /d 1 /f') # 修改注册表290 os.system(291 'reg add HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\Config /v AnnounceFlags /t REG_DWORD /d 5 /f') # 修改注册表292 os.system("net stop w32time & net start w32time") # 后台停止和启动windows系统NTP服务293 self.Send_Command("service ntpd stop")294 self.Send_Command("ntpdate %s &" % Server_IP, "Done", 15)295 time.sleep(5)296 date = self.Send_Command("date") #获取设备时间297 clock = str(datetime.datetime.now().year) #获取本地时间298 if clock in date:299 self.Test_Pass(u'NTP PASS')300 else:301 self.Test_Fail(u"NTP FAIL")302 #网口速率303 def EthSpeedCheck(self, Server_IP):304 self.Test_Running(u" Ethernet Front-end Ports Rate Check ")305 Ping_message = self.Send_Command("ping %s -c5" % Server_IP)306 if "Host Unreachable" in Ping_message:307 self.Test_Fail(u"Ping Server Fail")308 else:309 time.sleep(5)310 for count in range (1,6):311 time.sleep(3)312 Eth_Speed = self.Send_Command("iperf -c "+Server_IP+" -w1m -i1 -t30 | grep '0.0-3'&","Done",40)313 if "Broken pipe" not in Eth_Speed:314 result= self.GetMiddleStr(str(Eth_Speed),'Bytes',"Mbits")315 if float(result) >500: #以太网接口速率大于500兆316 if count < 5 :317 info = u'Ethernet port '+ str(count) + u' rate:' + str(result) + u'Mbits/sec PASS,please connect to ETH' + str(count+1)318 else :319 info = u'Ethernet port 5 rate:%sMbits/sec PASS'% str(result)320 self.Test_Pass(info)321 self.Prompt(info)322 else:323 info = u'Ethernet port rate ' + str(result) +u'Mbits/sec,FAIL.'324 self.Test_Fail(info)325 else:326 self.Test_Fail(u'Iperf address access failed')327 #温度检测328 def ShowTemperature(self):329 self.Test_Running(u"##### Temperature Check #####")330 temp = self.Send_Command("sensors -u&", "Done", 10)331 Core0 = self.GetMiddleStr(str(temp), 'temp2_input:', "temp2_max:")332 if float(Core0) > 96:333 self.Test_Fail(u"CPU Core 0 Temperature:%s℃,More than 96℃,FAIL" % Core0)334 else:335 self.Test_Pass(u"CPU Core 0 Temperature:%s℃,PASS" % Core0)336 Core1 = self.GetMiddleStr(str(temp), 'temp3_input:', "temp3_max:")337 if float(Core1) > 96:338 self.Test_Fail(u"CPU Core 1 Temperature:%s℃,More than 96℃,FAIL" % Core1)339 else:340 self.Test_Pass(u"CPU Core 1 Temperature:%s℃,PASS" % Core1)341 #内存检测342 def MemeryCheck(self):343 self.Test_Running(u"##### Memory Check #####")344 self.Send_Command("iostat -m")345 mem = self.Send_Command("free | grep Mem | awk '{print $2}'", "#", 30)346 mem = self.GetMiddleStr(str(mem), "}'", "[").strip()347 if float(mem) < 8126000:348 self.Test_Fail(u"Memory < 8G,FAIL")349 else:350 self.Test_Pass(u"Memory:%s,PASS" % mem)351 #版本检测352 def VersionCheck(self):353 self.Test_Running(u"##### Version Check #####")354 vershow = self.Send_Command("vershow&", "Done", 60,5)355 underlying_version=self.GetMiddleStr(vershow,'underlying version:',"Config").strip()356 if underlying_version == self.GNS_Underlying:357 self.Test_Pass(u"Software Version:%s,PASS" % underlying_version)358 else:359 self.Test_Fail(u"Software Version:%s,FAIL" % underlying_version)360 fpga = self.Send_Command('fpga_version')361 FPGA=self.GetMiddleStr(fpga,'version:',"[").strip()362 if FPGA == self.GNS_FPGA:363 self.Test_Pass(u"FPGA:%s,PASS" % FPGA)364 else:365 self.Test_Fail(u"FPGA:%s,FAIL" % FPGA)366 swv = self.Send_Command('swv_read')367 swv = self.GetMiddleStr(swv, "version :", "[").strip()368 if swv == self.GNS_SWV:369 self.Test_Pass(u"SWV:%s,PASS" % swv)370 else:371 self.Test_Fail(u"SWV:%s,FAIL" % swv)372 mod = self.Send_Command('mod_read')373 mod = self.GetMiddleStr(mod, "Mod number:", "[").strip()374 if mod == self.GNS_MOD:375 self.Test_Pass(u"MOD:%s,PASS" % mod)376 else:377 self.Test_Fail(u"MOD:%s,FAIL" % mod)378 spn = self.Send_Command('spn_read')379 spn = self.GetMiddleStr(spn, "part number:", "[").strip()380 if spn == self.GNS_SPN:381 self.Test_Pass(u"SPN:%s,PASS" % spn)382 else:383 self.Test_Fail(u"SPN:%s,FAIL" % spn)384 dpn = self.Send_Command('dpn_read')385 dpn = self.GetMiddleStr(dpn, "PN_VERSION:", "[").strip()386 if dpn == self.GNS_DPN:387 self.Test_Pass(u"DPN:%s,PASS" % dpn)388 else:389 self.Test_Fail(u"DPN:%s,FAIL" % dpn)390 bpn = self.Send_Command('board_pn_read')391 bpn = self.GetMiddleStr(bpn, "Board_Product_Number:", "[").strip()392 if bpn == self.GNS_BPN:393 self.Test_Pass(u"BPN:%s,PASS" % bpn)394 else:395 self.Test_Fail(u"BPN:%s,FAIL" % bpn)396 fpgapn = self.Send_Command('fpgapn_read')397 fpgapn = self.GetMiddleStr(fpgapn, "fpga part number:", "[").strip()398 if fpgapn == self.GNS_FPGAPN:399 self.Test_Pass(u"FPGA PN:%s,PASS" % fpgapn)400 else:401 self.Test_Fail(u"FPGA PN:%s,FAIL" % fpgapn)402 isopn = self.Send_Command('isopn_read')403 isopn = self.GetMiddleStr(isopn, "iso part number:", "[").strip()404 if isopn == self.GNS_ISOPN:405 self.Test_Pass(u"ISO PN:%s,PASS" % isopn)406 else:407 self.Test_Fail(u"ISO PN:%s,FAIL" % isopn)408 if "3.10.5-3.el6.x86_64" in self.Send_Command('uname -a'):409 self.Test_Pass(u"System Version:3.10.5-3.el6.x86_64,PASS")410 else:411 self.Test_Fail(u"System Version:3.10.5-3.el6.x86_64,FAIL")412 dsn = self.Send_Command('dsn_read')413 bsn = self.Send_Command('board_sn_read')414 if "failed" in vershow:415 self.Test_Fail(u"Version Upgrade FAIL")416 #USB模块检测417 def USBCheck(self):418 self.Test_Running(u"##### USB Check #####")419 self.Prompt(u"Please insert a USB flash drive in FAT32 format") #消息提示420 self.Send_Command('mkdir usb','#',10)421 usb_drive=self.Send_Command("fdisk -l | grep FAT32 | awk '{print $1}'")422 drive= self.GetMiddleStr(str(usb_drive),"}'","[").strip()423 if '/dev/sd' in drive:424 self.Test_Pass('U Disk PASS')425 self.Send_Command('mount %s usb' % drive)426 self.Send_Command('cd usb' )427 usb_wr = self.Send_Command('dd bs=1M count=100 if=%s of=test conv=fsync'%drive,'#',30)428 usb_wr_A= self.GetMiddleStr(str(usb_wr),'s,',"[")429 self.Send_Command('rm -rf test')430 self.Send_Command('cd ..' )431 self.Send_Command('umount usb')432 self.Send_Command('rm -rf usb')433 self.Test_Pass(u"USB W+R Rate: %s"%str(usb_wr_A) )434 else:435 self.Test_Fail(u'U Disk FAIL' )436 #硬盘检测437 def SSDCheck(self):438 self.Test_Running(u"### SSD Check ###")439 if 'SATA' in self.Send_Command('lspci'):440 self.Test_Pass(u'SSD Module Recognition PASS')441 else:442 self.Test_Fail(u"SSD Module Recognition FAIL")443 num1 = self.Send_Command("fdisk -l | grep -c sda")444 num2 = self.Send_Command("df | grep -c sda")445 if "7" in num1 and "4" in num2:446 GNS_DISK = self.Send_Command('fdisk -l | grep sda')447 sda_size = self.GetMiddleStr(GNS_DISK, '/dev/sda:', "GB")448 if float(sda_size) > 950:449 self.Test_Pass(u"SSD-1 size=%sGB PASS" % sda_size)450 else:451 self.Test_Fail(u"SSD-1 size<950GB FAIL")452 GNS_DISK = self.Send_Command('fdisk -l | grep sdb')453 if '/dev/sdb' in GNS_DISK:454 sdb_size = self.GetMiddleStr(str(GNS_DISK), 'Disk /dev/sdb: ', "GB,")455 if float(sdb_size) > 950:456 self.Test_Pass(u"SSD-2 size=%sGB PASS" % sdb_size)457 else:458 self.Test_Fail(u"SSD-2 size<950GB FAIL")459 else:460 self.Test_Fail(u"SSD-2 Recognition FAIL ")461 HD_write = self.Send_Command('dd bs=16M count=1024 if=/dev/zero of=test conv=fdatasync', '#', 300,10)462 HD_write_A = self.GetMiddleStr(str(HD_write), 's,', "MB/s")463 HD_read = self.Send_Command('dd bs=16M if=test of=/dev/null', '#', 300,10)464 HD_read_A = self.GetMiddleStr(str(HD_read), 's,', "MB/s")465 self.Send_Command("rm -rf test", "#", 30)466 if float(HD_write_A) < 150:467 self.Test_Fail(u"SSD Write Rate < 150MB/s FAIL")468 elif float(HD_read_A) < 170:469 self.Test_Fail(u"SSD Read Rate < 170MB/s FAIL")470 else:471 self.Test_Pass(u"SSD Read Rate:%sMB/s PASS" % str(HD_read_A))472 self.Test_Pass(u"SSD Write Rate:%sMB/s PASS" % str(HD_write_A))473 else:474 self.Test_Fail(u"SSD Partition FAIL")475 #离散量检测476 def Discrete(self):477 self.Test_Running(u"#### Discrete Check ####")478 self.Send_Command("arinc set_control_off")479 for i in range(1,17):480 if i<15:481 self.Send_Command("hi8435_cfg wrDiscOut %s low" % str(i))482 else:483 self.Send_Command("hi8435_cfg wrDiscOut %s low" % str(i))484 low = self.Send_Command("arinc get_signalstatusmatrix", '#', 5)485 low=self.GetMiddleStr(low,'get_signalstatusmatrix','[')486 low = int(low[3] + low[7] + low[11] + low[15] + low[19] + low[23] + low[27] + low[31] + low[35] + low[39] + low[44] +low[49] + low[54])487 for i in range(1, 17):488 if i < 15:489 self.Send_Command("hi8435_cfg wrDiscOut %s high" % str(i))490 else:491 self.Send_Command("hi8435_cfg wrDiscOut %s high" % str(i))492 high = self.Send_Command("arinc get_signalstatusmatrix", '#', 5)493 high = self.GetMiddleStr(high,'get_signalstatusmatrix','[')494 high = int(high[3] + high[7] + high[11] + high[15] + high[19] + high[23] + high[27] + high[31] + high[35] + high[39] +high[44] + high[49] + high[54])495 log1=low+high496 for i in range(1, 17):497 if i < 15:498 self.Send_Command("hi8435_cfg wrDiscOut %s low" % str(i))499 else:500 self.Send_Command("hi8435_cfg wrDiscOut %s low" % str(i))501 low = self.Send_Command("arinc get_signalstatusmatrix", '#', 5)502 low = self.GetMiddleStr(low, 'get_signalstatusmatrix', '[')503 low = int(low[3] + low[7] + low[11] + low[15] + low[19] + low[23] + low[27] + low[31] + low[35] + low[39] + low[44] +low[49] + low[54])504 log2=high+low505 if log1==1111111111111 and log2==1111111111111:506 self.Test_Pass('Discrete Check PASS')507 else:508 self.Test_Fail('Discrete Check FAIL')509 #429检测510 self.Test_Running(u"##### ARINC429 Check #####")511 self.Send_Command("hi3593_c0_cfg setRx1BitRate high\r")512 self.Send_Command("hi3593_c0_cfg setRx2BitRate high\r")513 self.Send_Command("hi3593_c0_cfg setTxBitRate high\r")514 self.Send_Command("hi3593_c1_cfg setRx1BitRate high\r")515 self.Send_Command("hi3593_c1_cfg setRx2BitRate high\r")516 self.Send_Command("hi3593_c1_cfg setTxBitRate high\r")517 if int(self.GetMiddleStr(self.Send_Command("ps | grep -c netlink_u_self.APP"), "self.APP", "[")) < 1:518 self.Send_Command("netlink_u_app &\r", '#', 3)519 c = self.Send_Command("hi429_sendmsg_user_chip0 123 3\r")520 if "0x42910001" in c:521 self.Test_Pass(u"ARINC429 CHIP0 PASS")522 #if "0x42910002" not in c:523 #self.Test_Fail(u"429 CHIP0 RX2 测试失败")524 else:525 self.Test_Fail(u"ARINC429 CHIP0 FAIL")526 d = self.Send_Command("hi429_sendmsg_user_chip1 456 3\r")527 if "0x42920001" in d:528 self.Test_Pass(u"ARINC429 CHIP1 PASS")529 #if "0x42920002" not in d:530 #self.Test_Fail(u"429 CHIP1 RX4 测试失败")531 else:532 self.Test_Fail(u"ARINC429 CHIP1 FAIL")533 #升级534 def upgrade(self,PD,Host,Boot,Kernel,APP,Config,MAC,Server_IP):535 self.working = True536 self.starttime = datetime.datetime.now()537 self.PD = PD538 self.Host = Host539 self.Boot = Boot540 self.Kernel = Kernel541 self.APP = APP542 self.Config = Config543 self.MAC = MAC544 self.error_count = 0545 self.Connection(Host)546 time.sleep(5)547 PD_message = self.Send_Command("cat /sbin/PRODUCT_MESSAGE ","#",5 )548 SN = self.GetMiddleStr(str(PD_message),'SN_VERSION:',"PN_VERSION:").strip()549 PN = self.GetMiddleStr(str(PD_message),"PN_VERSION:",'CPLD').strip()550 if self.Boot == '' and self.Kernel == "" and self.APP == "" and self.Config == "":551 if "failed" not in self.Send_Command("vershow",'#',15):552 if len(self.MAC) == 17:553 self.Test_Running(u"program PD information")554 self.Send_Command("pm_flash " + str(self.PD).lower() +" "+ PN + " "+ self.MAC + " "+ SN,"complete",20 )555 self.Send_Command("configsave","Config save success",10)556 self.Send_Command("\r")557 else:558 self.Test_Fail(u'请先升级')559 else:560 while self.working :561 if self.Boot != '':562 self.Test_Running(u"upgrade Boot")563 if "timeout" in self.Send_Command("tftp -gr "+ self.Boot + " "+ Server_IP,"#",10):564 self.Test_Fail(u"文件上传失败")565 if 'failed ' in self.Send_Command("bootupdate "+ self.Boot,"#",30):566 self.Test_Fail(u'Boot文件不匹配')567 self.Send_Command("\r")568 if self.Kernel != "":569 self.Test_Running(u"upgrade Kernel")570 self.Send_Command("tftp -gr " + self.Kernel + " "+ Server_IP,"#",30 )571 self.Send_Command("kernelupdate " + self.Kernel,"system for testing",150)572 self.Send_Command("\r")573 if self.APP != "":574 self.Test_Running(u"upgrade APP")575 self.Send_Command("rm /mnt/mmcfs/fsself.APP*")576 self.Send_Command("ls -l /mnt/mmcfs","#",5)577 self.Send_Command("tftp -gr " + self.APP + " "+ Server_IP,"#",30)578 self.Send_Command("fsappsave %s &"%self.APP,"Done",600)579 self.Send_Command("\r")580 if self.Config != "":581 self.Test_Running(u"upgrade Configuration")582 self.Send_Command("rm /mnt/mmcfs/fscon*")583 self.Send_Command("ls -l /mnt/mmcfs","#",5)584 self.Send_Command("tftp -gr " + self.Config + " " +Server_IP,"#",10 )585 if self.PD == 'CWAP' or self.PD == 'TWCU':586 self.Send_Command("fsconsave 0 " + self.Config,"save succes",30)587 else:588 self.Send_Command("fsconsave 10 " + self.Config,"save succes",30)589 self.Send_Command("\r")590 self.Send_Command("reboot\r","reboot")591 self.Connect.Close()592 self.Test_Running(u"rebooting")593 time.sleep(100)594 if self.PD == 'RWS22':595 Host_IP = "10.66.10.1"596 self.Connection(str(Host_IP))597 if len(self.MAC) == 17:598 self.Test_Running(u"program PD information")599 self.Send_Command("pm_flash " + str(self.PD).lower() +" "+ PN + " "+ self.MAC + " "+ SN,"complete",20 )600 self.Send_Command("configsave","save success",10)601 self.Send_Command("\r")602 if self.Boot != '':603 self.Test_Running(u"upgrade sBoot2")604 self.Send_Command("tftp -gr "+ self.Boot + " "+ Server_IP,"#",10)605 self.Send_Command("bootupdate "+ self.Boot,"#",30)606 self.Send_Command("\r")607 if self.Kernel != "":608 self.Test_Running(u"upgrade Kernel 2")609 self.Send_Command("kernelconfirm")610 self.Send_Command("tftp -gr " + self.Kernel + " "+ Server_IP ,"#",30)611 self.Send_Command("\r")612 self.Send_Command("kernelupdate " + self.Kernel,"system for testing",150)613 self.Send_Command("\r")614 if self.APP != "":615 self.Test_Running(u"upgrade APP 2")616 self.Send_Command("tftp -gr " + self.APP + " "+ Server_IP,"#",30 )617 self.Send_Command("fsappsave %s &"%self.APP,"Done",600)618 self.Send_Command("\r")619 if self.Config != "":620 self.Test_Running(u"upgrade Configuration 2")621 self.Send_Command("tftp -gr " + self.Config + " " +Server_IP,"#",10)622 if self.PD == 'CWAP' or self.PD == 'TWCU':623 self.Send_Command("fsconsave 0 " + self.Config,"save succes",30)624 else:625 self.Send_Command("fsconsave 10 " + self.Config,"save succes",30)626 self.Send_Command("\r")627 self.Send_Command("reboot\r","reboot")628 self.Connect.Close()629 self.Test_Running(u"rebooting")630 time.sleep(100)631 self.Connection(str(Host_IP))632 if self.Kernel != "":633 self.Send_Command("kernelconfirm")634 self.Test_Running(u"checking version")635 if "failed" in self.Send_Command("vershow",'#',15):636 self.Test_Fail(u'版本错误,升级失败')637 else:638 self.Test_Finished(u"升级成功")639 #升级config640 def Ship_Out(self,PD,Host,Config,Server_IP):641 self.PD = PD642 self.Connection(Host) #连接主机643 time.sleep(5)644 if Config != "":645 self.Test_Running(u"upgrade configuration")646 self.Send_Command("rm /mnt/mmcfs/fscon*")647 self.Send_Command("ls -l /mnt/mmcfs","#",5)648 self.Send_Command("tftp -gr " + Config + " " +Server_IP,"#",5 )649 self.Ship_Out_Address_setting(U"请输入需要设置的二、三段地址")650 self.Send_Command("fsconsave " + self.Ship_Out_Address + " " + Config,"save succes",30)651 self.Send_Command("\r")652 self.Send_Command("reboot\r","reboot")653 self.Connect.Close()654 self.Test_Running(u"rebooting")655 time.sleep(120)656 Ship_IP = "10."+self.Ship_Out_Address+".1"657 self.Connection(str(Ship_IP))658 self.Test_Running(u"upgrade configuration 2")659 self.Send_Command("tftp -gr " + Config + " " +Server_IP,"#",5)660 self.Send_Command("fsconsave " + self.Ship_Out_Address + " " + Config,"save succes",30)661 self.Send_Command("\r")662 self.Send_Command("reboot\r","reboot")663 self.Connect.Close()664 self.Test_Running(u"rebooting")665 time.sleep(100)666 self.Connection(str(Ship_IP))667 self.Test_Running(u"checking version")668 if "failed" in self.Send_Command("vershow",'#',15):669 self.Test_Fail(u'版本错误,升级失败')670 else:671 self.Test_Finished(u"出厂配置完成")672 #修改IP地址673 def Modify_IP(self,PD,Host):674 self.PD = PD675 self.Connection(Host)676 time.sleep(5)677 self.Test_Running(u"Modify IP address:")678 self.Send_Command("\r")679 IP_Modify = self.Input_IP(U"请输入你想改成的设备IP地址:")680 PCB_MAC = self.Send_Command("cat sbin/PRODUCT_MESSAGE | grep MacAddr | awk '{print $1}'")681 FT600_MAC = str(PCB_MAC[-13:-8])682 self.Send_Command("sed -i 's/" + Host + "/" + IP_Modify + "/g' /etc/config/Ethernet")683 self.Send_Command("sed -i 's/255.255.255.0/255.0.0.0/g' /etc/config/Ethernet")684 self.Send_Command("cat /etc/config/Ethernet")685 self.Send_Command("configsave", "config save success", 30)686 self.Send_Command("reboot","reboot")687 if PD == 'RWLU-1U':688 self.Connect.Close()689 FT600_IP = str(Host[:-3] + str(int(Host[-3:]) + 100))690 FT600_Modify_IP = IP_Modify[:-3] + str(int(IP_Modify[-3:]) + 100)691 try:692 self.Connection(str(FT600_IP))693 except Exception, e:694 self.Test_Fail(e)695 self.FT600_Login()696 self.Send_Command("sed -i 's/11:12/" + str(FT600_MAC).upper() + "/g' /sbin/PRODUCT_MESSAGE ")697 self.Send_Command("rm /etc/config/wireless")698 self.Send_Command("sed -i '/br-lan/'d /etc/rc.d/rcS")699 self.Send_Command("sed -i '/net.sh start/a\ifconfig br-lan " + FT600_Modify_IP + "' /etc/rc.d/rcS ")700 self.Send_Command("sed -n '/br-lan/p' /etc/rc.d/rcS")701 self.Send_Command("reboot","reboot")702 time.sleep(2)703 self.Test_Finished(u"IP修改成功")704 #磁盘格式化705 def Format_Disk(self,PD, Host):706 self.PD = PD707 self.Connection(Host)708 time.sleep(5)709 self.Test_Running(u"Format DISK")710 if '5' not in self.Send_Command('fdisk -l | grep -c "/dev/sda"'):711 self.Send_Command("fdisk /dev/sda","Command (m for help):")712 self.Send_Command("n",":")713 self.Send_Command("p","Partition number (1-4):")714 self.Send_Command("1","):")715 self.Send_Command("","):")716 self.Send_Command("+900M","Command (m for help):")717 self.Send_Command("n",":")718 self.Send_Command("p","Partition number (1-4):")719 self.Send_Command("2","):")720 self.Send_Command("","):")721 self.Send_Command("+100M","Command (m for help):")722 self.Send_Command("n",":")723 self.Send_Command("p","Partition number (1-4):")724 self.Send_Command("3","):")725 self.Send_Command("","):")726 self.Send_Command("+2048M","Command (m for help):")727 self.Send_Command("n",":")728 self.Send_Command("p","Partition number (1-4):")729 self.Send_Command("","):")730 self.Send_Command("","Command (m for help):")731 self.Send_Command("t","Partition number (1-4):")732 self.Send_Command("3","Hex code (type L to list codes):")733 self.Send_Command("82","Command (m for help):")734 self.Send_Command("p","Command (m for help):")735 self.Send_Command("w"," #")736 self.Send_Command("fdisk -l"," #")737 self.Send_Command("mkfs.ext3 /dev/sda1"," #")738 self.Send_Command("mkfs.ext3 /dev/sda2"," #")739 self.Send_Command("mkfs.ext3 /dev/sda3"," #")740 self.Send_Command("mkfs.ext3 /dev/sda4"," #")741 self.Send_Command("mount /dev/sda4 /mnt/mmc0")742 self.Send_Command("mount /dev/sda1 /mnt/mmc1")743 self.Send_Command("df -h")...

Full Screen

Full Screen

igusd1_labview_edition.py

Source:igusd1_labview_edition.py Github

copy

Full Screen

...128 try:129 SOCK.connect((ip_address, port))130 except:131 pass132def send_command(data):133 """134 Send a command to the device.135 Parameters136 ----------137 data : bytearray138 Bytearray to send to the device.139 Returns140 -------141 bytes142 Returned data as a bytes object.143 """144 SOCK.send(data)145 res = SOCK.recv(24)146 return res147def set_shutdown():148 """149 Tell the device to shutdown (set status to 00000000 00000110).150 Returns151 -------152 None.153 """154 send_command(get_array("shutdown"))155 ba_1 = make_bytearray(0, [96, 65], 0, 2, [33, 6])156 ba_2 = make_bytearray(0, [96, 65], 0, 2, [33, 22])157 ba_3 = make_bytearray(0, [96, 65], 0, 2, [33, 2])158 while (send_command(get_array("status")) !=159 ba_1160 and161 send_command(get_array("status")) !=162 ba_2163 and164 send_command(get_array("status")) !=165 ba_3):166 print('Waiting for shutdown...')167 time.sleep(1)168def set_switch_on():169 """170 Tell the device to switch on (set status to 00000000 00000111).171 Returns172 -------173 None.174 """175 send_command(get_array("switch_on"))176 while (send_command(get_array("status")) !=177 make_bytearray(0, [96, 65], 0, 2, [35, 6])178 and179 send_command(get_array("status")) !=180 make_bytearray(0, [96, 65], 0, 2, [35, 22])181 and182 send_command(get_array("status")) !=183 make_bytearray(0, [96, 65], 0, 2, [35, 2])):184 print('Waiting for switch-on...')185 time.sleep(1)186def set_enable_operation():187 """188 Tell the device to enable operation (set status to 00000000 00001111).189 Returns190 -------191 None.192 """193 send_command(get_array("enable_operation"))194 while (send_command(get_array("status")) !=195 make_bytearray(0, [96, 65], 0, 2, [39, 6])196 and197 send_command(get_array("status")) !=198 make_bytearray(0, [96, 65], 0, 2, [39, 22])199 and200 send_command(get_array("status")) !=201 make_bytearray(0, [96, 65], 0, 2, [39, 2])):202 print('Waiting for enabling operation...')203 time.sleep(1)204def init():205 """206 Attempt to enable the device (only works if ???)207 Returns208 -------209 None.210 """211 set_shutdown()212 set_switch_on()213 set_enable_operation()214def set_feedrate(feedrate):215 """216 Set the feedrate to specified value.217 Parameters218 ----------219 feedrate : int220 Feedrate in steps per revolution.221 Returns222 -------223 None.224 """225 feedrate_bytes = feedrate.to_bytes(4, "little")226 send_command(make_bytearray(1, [96, 146], 1, 2, [feedrate_bytes[0],227 feedrate_bytes[1]]))228 send_command(make_bytearray(1, [96, 146], 2, 1, [1]))229def set_mode(mode):230 """231 Set the movement mode of the device.232 Parameters233 ----------234 mode : int235 Selected mode (1 [move] / 6 [home]).236 Returns237 -------238 None.239 """240 send_command(make_bytearray(1, [96, 96], 0, 1, [mode]))241 while (send_command(make_bytearray(0, [96, 97], 0, 1))242 !=243 make_bytearray(0, [96, 97], 0, 1, [mode])):244 time.sleep(1)245def set_homing(method, find_velocity, zero_velocity, acceleration):246 """247 Find reference point (home) [method doesn't work yet I guess?].248 Parameters249 ----------250 method : str251 Homing method ("LSN" [limit switch negative] / ???).252 find_velocity : int253 Velocity when searching for switch (upper limit: 50000).254 zero_velocity : int255 Velocity when zeroing away from switch after pressing it.256 acceleration : int257 Acceleration/deceleration when starting/stopping move.258 Returns259 -------260 None.261 """262 set_mode(6)263 methods = {264 "LSN": 17,265 "LSP": 18,266 "IEN": 33,267 "IEP": 34,268 "SCP": 37,269 "AAF": 255270 }271 selected_method = methods[method]272 # homing method273 send_command(make_bytearray(1, [96, 152], 1, 1, [selected_method]))274 set_feedrate(6000)275 # homing velocity – max search velocity276 find_velocity_bytes = find_velocity.to_bytes(4, "little")277 send_command(make_bytearray(1, [96, 153], 1, 2, [find_velocity_bytes[0],278 find_velocity_bytes[1]]))279 # zeroing velocity – velocity after contact280 zero_velocity_bytes = zero_velocity.to_bytes(4, "little")281 send_command(make_bytearray(1, [96, 153], 2, 2, [zero_velocity_bytes[0],282 zero_velocity_bytes[1]]))283 # homing acceleration284 acceleration_bytes = acceleration.to_bytes(4, "little")285 send_command(make_bytearray(1, [96, 154], 0, 2, [acceleration_bytes[0],286 acceleration_bytes[1]]))287 # start movement288 print(list(send_command(make_bytearray(1, [96, 64], 0, 2, [31, 0]))))289 # reset start bit290 send_command(make_bytearray(1, [96, 64], 0, 2, [15, 0]))291 while (send_command(get_array("status"))292 !=293 make_bytearray(0, [96, 65], 0, 2, [39, 22])):294 print("wait for Homing to end")295 print(list(send_command(get_array("status"))))296 time.sleep(1)297 send_command(get_array("enable_operation"))298def move(velocity,299 acceleration,300 target_position):301 """302 Move the sled to an arbitrary position within the limits of the rod.303 Parameters304 ----------305 velocity : int306 Velocity when moving.307 acceleration : int308 Acceleration/deceleration when starting/stopping move.309 target_position : int310 Target position in steps.311 Returns312 -------313 None.314 """315 set_mode(1)316 velocity_bytes = velocity.to_bytes(4, "little")317 send_command(make_bytearray(1, [96, 129], 0, 4, [velocity_bytes[0],318 velocity_bytes[1],319 velocity_bytes[2],320 velocity_bytes[3]]))321 acceleration_bytes = acceleration.to_bytes(4, "little")322 send_command(make_bytearray(1, [96, 131], 0, 4, [acceleration_bytes[0],323 acceleration_bytes[1],324 acceleration_bytes[2],325 acceleration_bytes[3]]))326 target_position_bytes = target_position.to_bytes(4, "little")327 send_command(make_bytearray(1, [96, 122], 0, 4, [target_position_bytes[0],328 target_position_bytes[1],329 target_position_bytes[2],330 target_position_bytes[3]]))331 print(list(send_command(make_bytearray(1, [96, 64], 0, 2, [31, 0]))))332 send_command(make_bytearray(1, [96, 64], 0, 2, [15, 0]))333 while (send_command(get_array("status"))334 !=335 make_bytearray(0, [96, 65], 0, 2, [39, 6])):336 time.sleep(0.1)337 status = []338 actual_position_bytes = send_command(make_bytearray(0, [96, 100], 0, 4))339 status.append(struct.unpack("<xxxxxxxxxxxxxxxxxxxi", actual_position_bytes)[0])340 actual_velocity_bytes = send_command(make_bytearray(0, [96, 108], 0, 4))341 status.append(struct.unpack("<xxxxxxxxxxxxxxxxxxxi", actual_velocity_bytes)[0])342 print(status)343 send_command(get_array("enable_operation"))344def staggered_move(velocity,345 acceleration,346 start_position,347 iterations,348 step_width,349 wait_time,350 go_back351 ):352 """353 Move the sled to a start position, then move a specified amount of times354 by a specified distance and wait for a specified interval.355 Parameters356 ----------357 velocity : int358 Velocity when moving.359 acceleration : int360 Acceleration/deceleration when starting/stopping move.361 start_position : int362 Initial position to move to.363 iterations : int364 Amount of times to increment the position by step_width.365 step_width : int366 Distance between iterations.367 wait_time : float368 Wait time between moves.369 go_back : bool370 Go back to start_position at the end? (True [yes] / False [no]).371 Returns372 -------373 None.374 """375 set_mode(1)376 velocity_bytes = velocity.to_bytes(4, "little")377 send_command(make_bytearray(1, [96, 129], 0, 4, [velocity_bytes[0],378 velocity_bytes[1],379 velocity_bytes[2],380 velocity_bytes[3]]))381 acceleration_bytes = acceleration.to_bytes(4, "little")382 send_command(make_bytearray(1, [96, 131], 0, 4, [acceleration_bytes[0],383 acceleration_bytes[1],384 acceleration_bytes[2],385 acceleration_bytes[3]]))386 move(velocity, acceleration, start_position)387 for i in range(iterations):388 move(velocity, acceleration, start_position + step_width * (i + 1))389 time.sleep(wait_time)390 if go_back is True:391 move(velocity, acceleration, start_position)392def get_status():393 """394 Receive current position and velocity.395 Returns396 -------397 status : list[2]398 Receive status list [current position, current velocity].399 """400 status = []401 actual_position_bytes = send_command(make_bytearray(0, [96, 100], 0, 4))402 status.append(struct.unpack("<xxxxxxxxxxxxxxxxxxxi", actual_position_bytes)[0])403 actual_velocity_bytes = send_command(make_bytearray(0, [96, 108], 0, 4))404 status.append(struct.unpack("<xxxxxxxxxxxxxxxxxxxi", actual_velocity_bytes)[0])405 return status406def close():407 """408 Closes the socket.409 Returns410 -------411 None.412 """...

Full Screen

Full Screen

epd2in13_V2.py

Source:epd2in13_V2.py Github

copy

Full Screen

...76 epdconfig.digital_write(self.reset_pin, 0)77 epdconfig.delay_ms(100)78 epdconfig.digital_write(self.reset_pin, 1)79 epdconfig.delay_ms(100) 80 def send_command(self, command):81 epdconfig.digital_write(self.cs_pin, 1)82 epdconfig.digital_write(self.cs_pin, 0)83 epdconfig.digital_write(self.dc_pin, 0)84 epdconfig.spi_writebyte([command])85 epdconfig.digital_write(self.cs_pin, 1)86 def send_data(self, data):87 88 epdconfig.digital_write(self.cs_pin, 1)89 epdconfig.digital_write(self.cs_pin, 0)90 epdconfig.digital_write(self.dc_pin, 1)91 epdconfig.spi_writebyte([data])92 epdconfig.digital_write(self.cs_pin, 1)93 94 def ReadBusy(self):95 while(epdconfig.digital_read(self.busy_pin) == 1): # 0: idle, 1: busy96 pass97 # epdconfig.delay_ms(100) 98 def TurnOnDisplay(self):99 self.send_command(0x22)100 self.send_data(0xC7)101 self.send_command(0x20) 102 self.ReadBusy()103 104 def TurnOnDisplayPart(self):105 self.send_command(0x22)106 self.send_data(0x0c)107 self.send_command(0x20) 108 self.ReadBusy()109 110 def init(self, update):111 if (epdconfig.module_init() != 0):112 return -1113 # EPD hardware init start114 self.reset()115 if(update == self.FULL_UPDATE):116 self.ReadBusy()117 self.send_command(0x12) # soft reset118 self.ReadBusy()119 self.send_command(0x74) #set analog block control120 self.send_data(0x54)121 self.send_command(0x7E) #set digital block control122 self.send_data(0x3B)123 self.send_command(0x01) #Driver output control124 self.send_data(0xF9)125 self.send_data(0x00)126 self.send_data(0x00)127 self.send_command(0x11) #data entry mode128 self.send_data(0x01)129 self.send_command(0x44) #set Ram-X address start/end position130 self.send_data(0x00)131 self.send_data(0x0F) #0x0C-->(15+1)*8=128132 self.send_command(0x45) #set Ram-Y address start/end position133 self.send_data(0xF9) #0xF9-->(249+1)=250134 self.send_data(0x00)135 self.send_data(0x00)136 self.send_data(0x00)137 138 self.send_command(0x3C) #BorderWavefrom139 self.send_data(0x03)140 self.send_command(0x2C) #VCOM Voltage141 self.send_data(0x50) #142 self.send_command(0x03)143 self.send_data(self.lut_full_update[100])144 self.send_command(0x04) #145 self.send_data(self.lut_full_update[101])146 self.send_data(self.lut_full_update[102])147 self.send_data(self.lut_full_update[103])148 self.send_command(0x3A) #Dummy Line149 self.send_data(self.lut_full_update[105])150 self.send_command(0x3B) #Gate time151 self.send_data(self.lut_full_update[106])152 self.send_command(0x32)153 for count in range(100):154 self.send_data(self.lut_full_update[count])155 self.send_command(0x4E) # set RAM x address count to 0156 self.send_data(0x00)157 self.send_command(0x4F) # set RAM y address count to 0X127158 self.send_data(0xF9)159 self.send_data(0x00)160 self.ReadBusy()161 else:162 # self.send_command(0x2C) #VCOM Voltage163 # self.send_data(0x26)164 self.ReadBusy()165 self.send_command(0x32)166 for count in range(100):167 self.send_data(self.lut_partial_update[count])168 self.send_command(0x37)169 self.send_data(0x00)170 self.send_data(0x00)171 self.send_data(0x00)172 self.send_data(0x00)173 self.send_data(0x00)174 self.send_data(0x40)175 self.send_data(0x00)176 177 self.send_command(0x22)178 self.send_data(0xC0)179 self.send_command(0x20)180 self.ReadBusy()181 # self.send_command(0x3C) #BorderWavefrom182 # self.send_data(0x01)183 return 0184 def getbuffer(self, image):185 if self.width%8 == 0:186 linewidth = int(self.width/8)187 else:188 linewidth = int(self.width/8) + 1189 190 buf = [0xFF] * (linewidth * self.height)191 image_monocolor = image.convert('1')192 imwidth, imheight = image_monocolor.size193 pixels = image_monocolor.load()194 195 if(imwidth == self.width and imheight == self.height):196 logging.debug("Vertical")197 for y in range(imheight):198 for x in range(imwidth): 199 if pixels[x, y] == 0:200 x = imwidth - x201 buf[int(x / 8) + y * linewidth] &= ~(0x80 >> (x % 8))202 elif(imwidth == self.height and imheight == self.width):203 logging.debug("Horizontal")204 for y in range(imheight):205 for x in range(imwidth):206 newx = y207 newy = self.height - x - 1208 if pixels[x, y] == 0:209 newy = imwidth - newy - 1210 buf[int(newx / 8) + newy*linewidth] &= ~(0x80 >> (y % 8))211 return buf 212 213 214 def display(self, image):215 if self.width%8 == 0:216 linewidth = int(self.width/8)217 else:218 linewidth = int(self.width/8) + 1219 self.send_command(0x24)220 for j in range(0, self.height):221 for i in range(0, linewidth):222 self.send_data(image[i + j * linewidth]) 223 self.TurnOnDisplay()224 225 def displayPartial(self, image):226 if self.width%8 == 0:227 linewidth = int(self.width/8)228 else:229 linewidth = int(self.width/8) + 1230 self.send_command(0x24)231 for j in range(0, self.height):232 for i in range(0, linewidth):233 self.send_data(image[i + j * linewidth]) 234 235 236 # self.send_command(0x26)237 # for j in range(0, self.height):238 # for i in range(0, linewidth):239 # self.send_data(~image[i + j * linewidth]) 240 self.TurnOnDisplayPart()241 def displayPartBaseImage(self, image):242 if self.width%8 == 0:243 linewidth = int(self.width/8)244 else:245 linewidth = int(self.width/8) + 1246 self.send_command(0x24)247 for j in range(0, self.height):248 for i in range(0, linewidth):249 self.send_data(image[i + j * linewidth]) 250 251 252 self.send_command(0x26)253 for j in range(0, self.height):254 for i in range(0, linewidth):255 self.send_data(image[i + j * linewidth]) 256 self.TurnOnDisplay()257 258 def Clear(self, color):259 if self.width%8 == 0:260 linewidth = int(self.width/8)261 else:262 linewidth = int(self.width/8) + 1263 # logging.debug(linewidth)264 265 self.send_command(0x24)266 for j in range(0, self.height):267 for i in range(0, linewidth):268 self.send_data(color) 269 self.TurnOnDisplay()270 def sleep(self):271 # self.send_command(0x22) #POWER OFF272 # self.send_data(0xC3)273 # self.send_command(0x20)274 self.send_command(0x10) #enter deep sleep275 self.send_data(0x01)276 epdconfig.delay_ms(100)277 epdconfig.module_exit()278### END OF FILE ###279# if __name__ == '__main__':280# epd_t = EPD()281# epd_t.init(epd_t.PART_UPDATE)282# a = 0xff283# for i in range(0,9):284# print(i)285# if i % 2 == 0:286# a = 0x0287# else:288# a = 0xff...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful