Best Python code snippet using pyatom_python
MonoLinkClient.py
Source:MonoLinkClient.py  
1# by amounra 0513 : http://www.aumhaa.com23from __future__ import with_statement4import Live5import sys6import contextlib78from _Framework.SubjectSlot import SubjectEvent9from _Framework.Signal import Signal10from _Framework.NotifyingControlElement import NotifyingControlElement11from _Framework.Util import in_range12from _Framework.Debug import debug_print13from _Framework.Disconnectable import Disconnectable1415import modRemixNet as RemixNet16import modOSC17from modZeroconf import *18import modsocket as socket1920from _Mono_Framework.MonoClient import MonoClient21from _Mono_Framework.LiveUtils import *2223from MonoLink_Map import *242526MODES = ['SerialOSC', 'MonomeSerial']27MSG = ['/grid/key', '/press']28QUADS = [[0, 0], [8, 0], [0, 8], [8, 8]]29303132class MonoLinkClient(MonoClient):333435	def __init__(self, *a, **k):36		super(MonoLinkClient, self).__init__(*a, **k)3738		#monolink specific 39		self._prefix = '/MonoLink'40		self.basicAPI = False41		self.conf = None42		self._inPrt = int(PRESETS[0][2][0])43		self._outPrt = int(PRESETS[0][2][1])44		self._inst = 045		46		try:	47			self.conf = Zeroconf(self)48		except:49			self._host.log_message('Could not create Bonjour Registerer...something must be blocking this port or address') 50		51		self._is_monolink = True52		self._format = 053		self.oscServer = None54		self._setup_oscServer()5556	5758	def _setup_oscServer(self):59		if not self.oscServer is None:60			self.oscServer.shutdown()	61		self.basicAPI = False62		self.oscServer = RemixNet.OSCServer('localhost', self._outPrt, 'localhost', self._inPrt)63		self._inPrt = int(self.oscServer.srcPort)	##doing this in case the port wasn't available and had to be advanced by the oscServer64		self.oscServer.sendOSC('/MonoLink', 'startup')65	6667	def _banner(self):68		pass69	7071	def disconnect(self):72		super(MonoLinkClient, self).disconnect()73		if not self.oscServer is None:74			self.oscServer.shutdown()75		self.oscServer = None76		self.unregister()77	7879	def reset(self):80		pass81	8283	def _connect_to(self, device=None):84		self._connected = True85		self._host._refresh_stored_data()86	8788	def _disconnect_client(self):89		#self._host.log_message('disconnect' + str(self._number))90		self._create_grid()91		self._create_keys()92		self._swing = 093		self._report_offset = False94		self._connected = False95		self._device = None96		for host in self._active_host:97			host.update()98	99100	def _device_listener(self):101		pass102	103104	def _send_offset(self, x, y):105		pass106	107108	def _send_key(self, index, value):109		pass110	111112	def _send_grid(self, column, row, value):113		self._send(MSG[self._format], [column, row, value])114	115116	def _send(self, addr = None, val = None, args3 = None, args4 = None):117		#self._host.log_message('_send' + str(addr) + str(val))118		if (self._enabled is True) and (self.oscServer != None):119			#self._host.log_message('really _send' + str(self._prefix) + str(addr) + ' ' + str(val))120			self.oscServer.sendOSC(str(self._prefix)+addr, val)121	122123	def receive_autoselect_enabled(self, val):124		pass125	126127	#MonoLinkComponent specific calls 128129	def call_network_functions(self):130		#self._host.log_message('call_net_func')		  131		if self.basicAPI is False:132			#self._host.log_message('basicAPI is False')		133			try:134				doc = self._host.song()135			except:136				return137			try:138				#self._host.log_message('trying to assign callbacks')139				self.basicAPI = self._assign_sys_callbacks()140				# Commented for stability141				#doc.add_current_song_time_listener(self.oscServer.processIncomingUDP)142				self.oscServer.sendOSC('/MonoLink', ['basicAPI:', int(self.basicAPI)])143				self.register()144				self._assign_callbacks(self._prefix)145				self.set_enabled(1)146			except:147				return148		if self.is_active() and self.oscServer:149			try:150				self.oscServer.processIncomingUDP()151			except:152				pass153	154155	def _conf_info(self):156		desc = {'type':'_monome-osc._udp','domain':'local'}157		return ServiceInfo("_monome-osc._udp.local.", "monolink "+str(self._inst)+"._monome-osc._udp.local.", socket.inet_aton("127.0.0.1"), self._inPrt, 0, 0, desc, "localhost")158	159	160161	def register(self):162		if self.conf:163			#self._host.log_message('registering service info for port ' + str(self._inPrt))164			try:165				self.conf.registerService([self._conf_info(), 0], 20)166			except:167				self._host.log_message('cannot register current service info')168	169170	def unregister(self):171		if self.conf:172			#self._host.log_message('unregistering service for port ' + str(self._inPrt))173			try:174				self.conf.unregisterService([self._conf_info(), 0])175			except:176				self._host.log_message('cannot unregister current service info')177			self._inst += 1178	179180	def _assign_sys_callbacks(self):181		#self._host.log_message('assign_sys_callbacks')182		if self.oscServer:183			#self._host.log_message('deleting sys callbacks')184			self.oscServer.resetCallbacks()185		else:186			return False187		if self._format is 0:188			self.oscServer.addCallback(self.sysEchoCB, "/echo")189			self.oscServer.addCallback(self.sysInfoCB, "/sys/info")190			self.oscServer.addCallback(self.sysPortCB, "/sys/port")191			self.oscServer.addCallback(self.sysHostCB, "/sys/host")192			self.oscServer.addCallback(self.sysIdCB, "/sys/id")193			self.oscServer.addCallback(self.sysPrefixCB, "/sys/prefix")194			self._assign_callbacks(self._prefix)195			#self._host.log_message('serialosc sys callbacks assigned, returning true')196			return True197		elif self._format is 1:198			self.oscServer.addCallback(self.sysEchoCB, "/echo")199			self.oscServer.addCallback(self.sysOffsetCB, "/sys/offset")200			self.oscServer.addCallback(self.sysPrefixCB, "/sys/prefix")201			self.oscServer.addCallback(self.sysReportCB, "/sys/report")202			#self._host.log_message('monomeserial sys callbacks assigned, returning true')203			self._assign_callbacks(self._prefix)204			return True205		else:206			return False207	208209	def sysEchoCB(self, msg=None):210		self.oscServer.sendOSC("/MonoLink/echo", msg)211	212213	def sysInfoCB(self, msg=None):214		self._host.log_message('infoCB' + str(self._outPrt) + str(self._prefix))215		self._host.schedule_message(30, self._info_return)216	217218	def _info_return(self):219		if(self.oscServer):220			if self._format is 0:221				self._host.log_message('info return serialosc')222				self.oscServer.sendOSC("/sys/port", self._outPrt)223				self.oscServer.sendOSC("/sys/prefix", self._prefix)224				self.oscServer.sendOSC("/sys/id", self._number)225			elif self._format is 1:226				self._host.log_message('info return monomeserial')227				self.oscServer.sendOSC("/sys/devices", 1)228				self.oscServer.sendOSC("/sys/prefix", [0, self._prefix])229				self.oscServer.sendOSC("/sys/type", [0, 256])230				self.oscServer.sendOSC("/sys/cable", [0, 'up'])231				self.oscServer.sendOSC("/sys/offset", [0, 0, 0])232		self._host.log_message('info return completed')233	234235	def sysPortCB(self, msg=None):236		self._host.log_message('sysPortCB' + str(msg))237		if msg[2] != None:238			self._host.log_message('old outport ' + str(self._outPrt))239			if	self._outPrt != int(msg[2]):240				self._outPrt = int(msg[2])241				self.oscServer.set_outPort(self._outPrt)242				self._host.log_message('changed')243	244245	def sysHostCB(self, msg=None):246		self._host.log_message('sysHostCB' + str(msg))247	248249	def sysIdCB(self, msg=None):250		self._host.log_message('sysIdCB' + str(msg))251	252253	def sysPrefixCB(self, msg=None):254		self._host.log_message('sysPrefix' + str(msg))255		self._prefix = str(msg[2])256		self._assign_callbacks(msg[2])257		self._host.schedule_message(30, self._info_return)258	259260	def sysOffsetCB(self, msg=None):261		pass262	263264	def sysReportCB(self, msg=None):265		pass266	267268	def _assign_callbacks(self, prefix):269		#self._host.log_message('assigning callbacks for prefix')270		if self._prefix != prefix:271			self.oscServer.removeCallback(str(self._prefix+'/grid/led/set'))272			self.oscServer.removeCallback(str(self._prefix+'/grid/led/all'))273			self.oscServer.removeCallback(str(self._prefix+'/grid/led/map'))274			self.oscServer.removeCallback(str(self._prefix+'/grid/led/row'))275			self.oscServer.removeCallback(str(self._prefix+'/grid/led/col'))276			self.oscServer.removeCallback(str(self._prefix+'/led'))277			self.oscServer.removeCallback(str(self._prefix+'/clear'))278			self.oscServer.removeCallback(str(self._prefix+'/frame'))279			self.oscServer.removeCallback(str(self._prefix+'/led_row'))280			self.oscServer.removeCallback(str(self._prefix+'/led_col'))281			self.oscServer.removeCallback(str(self._prefix+'/grid/led/intensity'))282			self.oscServer.removeCallback(str(self._prefix+'/grid/led/level/set'))283			self.oscServer.removeCallback(str(self._prefix+'/grid/led/level/all'))284			self.oscServer.removeCallback(str(self._prefix+'/grid/led/level/map'))285			self.oscServer.removeCallback(str(self._prefix+'/grid/led/level/row'))286			self.oscServer.removeCallback(str(self._prefix+'/grid/led/level/col'))287			#self._host.log_message('prefixed callbacks deleted')288		if prefix != None:289			self._prefix = prefix290			if self._format is 0:291				self.oscServer.addCallback(self.ledSet, str(self._prefix+'/grid/led/set'))292				self.oscServer.addCallback(self.ledAll, str(self._prefix+'/grid/led/all'))293				self.oscServer.addCallback(self.ledMap, str(self._prefix+'/grid/led/map'))294				self.oscServer.addCallback(self.ledRow, str(self._prefix+'/grid/led/row'))295				self.oscServer.addCallback(self.ledCol, str(self._prefix+'/grid/led/col'))296				self.oscServer.addCallback(self.ledIntensity, str(self._prefix+'/grid/led/intensity'))297				self.oscServer.addCallback(self.ledLvlSet, str(self._prefix+'/grid/led/level/set'))298				self.oscServer.addCallback(self.ledLvlAll, str(self._prefix+'/grid/led/level/all'))299				self.oscServer.addCallback(self.ledLvlMap, str(self._prefix+'/grid/led/level/map'))300				self.oscServer.addCallback(self.ledLvlRow, str(self._prefix+'/grid/led/level/row'))301				self.oscServer.addCallback(self.ledLvlCol, str(self._prefix+'/grid/led/level/col'))302			elif self._format is 1:303				self.oscServer.addCallback(self.ledSet, str(self._prefix+'/led'))304				self.oscServer.addCallback(self.ledClear, str(self._prefix+'/clear'))305				self.oscServer.addCallback(self.ledMap, str(self._prefix+'/frame'))306				self.oscServer.addCallback(self.led_row, str(self._prefix+'/led_row'))307				self.oscServer.addCallback(self.led_col, str(self._prefix+'/led_col'))308		#self._host.log_message('prefixed callbacks assigned')309	310311	def ledSet(self, msg=None):312		#self._script.log_message('_ledSet' + str(msg[2]) + str(msg[3]) + str(msg[4]))313		self.receive_grid(msg[2], msg[3], msg[4])314	315316	def ledAll(self, msg=None):317		self.receive_grid_all(msg[2])318	319320	def ledRow(self, msg=None):321		if len(msg) > 3:322			xOff = int(msg[2])323			if xOff%8 is 0:324				yOff = int(msg[3])325				rows = msg[4:]326				for quad in range(len(rows)):327					rowOut = self.int2bin(rows[quad])328					for cell in range(8):329						#self._host.log_message(str(xOff+index+QUADS[quad][0]) + str(yOff+cell+QUADS[quad][1]) + str(rowOut[index]))330						self.receive_grid(xOff+QUADS[quad][0], yOff+cell+QUADS[quad][1], rowOut[cell])331	332333	def ledCol(self, msg=None):334		if len(msg) > 3:335			yOff = int(msg[3])336			if yOff%8 is 0:337				xOff = int(msg[2])338				cols = msg[4:]339				for quad in range(len(cols)):340					colOut = self.int2bin(cols[quad])341					for cell in range(8):342						#self._host.log_message(str(xOff+index+QUADS[quad][0]) + str(yOff+cell+QUADS[quad][1]) + str(rowOut[index]))343						self.receive_grid(xOff+cell+QUADS[quad][0], yOff+QUADS[quad][1], colOut[cell])344	345346	def ledMap(self, msg=None):347		xOff = int(msg[2])348		yOff = int(msg[3])349		map = msg[4:]350		if len(map) is 8:351			for row in range(8):352				rowOut = self.int2bin(row)353				for cell in range(8):354					self.receive_grid(x_Off+cell, yOff+row, rowOut[cell])355	356357	def ledIntensity(self, msg=None):358		pass359	360361	def ledLvlSet(self, msg=None):362		pass363	364365	def ledLvlAll(self, msg=None):366		pass367	368369	def ledLvlMap(self, msg=None):370		pass371	372373	def ledLvlRow(self, msg=None):374		pass375	376377	def ledLvlCol(self, msg=None):378		pass379	380381	def ledClear(self, msg=None):382		self.receive_grid_all(0)383	384385	def led_col(self, msg=None):386		if len(msg) > 2:387			xOff = int(msg[2])388			cols = msg[3:]389			for quad in range(len(cols)):390				colOut = self.int2bin(cols[quad])391				for cell in range(8):392					self.receive_grid(xOff+QUADS[quad][0], cell+QUADS[quad][1], colOut[cell])393394	395396	def led_row(self, msg=None):397		if len(msg) > 2:398			yOff = int(msg[2])399			rows = msg[3:]400			for quad in range(len(rows)):401				rowOut = self.int2bin(rows[quad])402				for cell in range(8):403					self.receive_grid(cell+QUADS[quad][0], yOff+QUADS[quad][1], rowOut[cell])404405	406407	def _display_info(self, inPrt, outPrt):408		self._host.show_message('Preset: ' + str(PRESETS[self._channel][3]) + ' In: ' + str(inPrt) + ' Out: ' + str(outPrt) + ' Format: ' + str(MODES[self._format]) + ' Pre: ' + str(self._prefix))409	410411	def _change_ports(self, ports):412		self._host.log_message('ports ' + str(ports))413		inPrt = self._inPrt414		outPrt = self._outPrt415		if len(ports[0]) > 0:416			str_inPrt = ''.join(ports[0]).strip('\'')417			inPrt = str_inPrt418		inPrt = int(inPrt)419		if len(ports[1]) > 0:420			str_outPrt = ''.join(ports[1]).strip('\'')421			outPrt = str_outPrt422		outPrt = int(outPrt)423		if inPrt in range(0,65535) and outPrt in range(0,65535):424			if not (self._outPrt == outPrt):425				self._outPrt = outPrt426				PRESETS[self._channel][2][1] = str(self._outPrt)427			if not (self._inPrt == inPrt):428				self._inPrt = inPrt429				self.unregister()430				PRESETS[self._channel][2][0] = str(self._inPrt)431				self._host.schedule_message(15, self._setup_oscServer)432			else:433				self.oscServer.set_outPort(self._outPrt)434	435436	def _change_modes(self, value):437		self._format = value438		self._assign_sys_callbacks()439	440441	def int2bin(self, n):442		return [((n >> y) & 1) for y in range(8)]443	444445446#self.device = self._host.song().view.selected_track.view.selected_device
...__init__.py
Source:__init__.py  
...9        - sync_mode:  fixed master-slave, dynamic master-slave,10            queued master-master11        - get, set(value)12        - apply(operator, value) -- sum 3, mult 1.5, set 6 ...13        - addcallback(fn), removecallback(fn)14    * TupleSync: Tuple synced over network15        - sync_mode:  fixed master-slave, dynamic master-slave,16            queued master-master17        - get, set(value)18        - addcallback(fn), removecallback(fn)19    * StringSync: (small) String synced over network20        - sync_mode:  fixed master-slave, dynamic master-slave,21            queued master-master22        - get, set(value)23        - addcallback(fn), removecallback(fn)24    * DictSync: dict of elements synced over network25        - sync_mode:  fixed master-slave, dynamic master-slave,26            queued master-master27        - get(x), set(x,value)28        - apply(x, operator, value)29        - addcallback(fn), removecallback(fn)30    * ListSync: list of elements synced over network.31        - sync_mode:  fixed master-slave, dynamic master-slave,32            queued master-master33        - primary_key: index, full-item, dict-column, list-index34        - item_style: single-object, list, dict35        - clear36        - append(x),push(x)37        - remove(x),pop(x)38        - update(x,value)39        - addcallback(fn), removecallback(fn)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
