Best Python code snippet using fMBT_python
DRYP_groundwater_EFD.py
Source:DRYP_groundwater_EFD.py  
1import os2import numpy as np3from landlab import RasterModelGrid4from landlab.grid.mappers import (5	map_mean_of_link_nodes_to_link,6	map_max_of_node_links_to_node,7	map_max_of_link_nodes_to_link,8	map_min_of_link_nodes_to_link)9from landlab.io import read_esri_ascii10#Global variables11REG_FACTOR = 0.001 #Regularisation factor12COURANT_2D = 0.25 # Courant Number 2D flow13COURANT_1D = 0.50 # Courant number 1D flow14STR_RIVER = 0.001 # Riverbed storage factor15# provisional16a_faq = 15017b_faq = 13118class gwflow_EFD(object):19		20	def __init__(self, env_state, data_in):21	22		env_state.SZgrid.add_zeros('node', 'recharge', dtype=float)		23		env_state.SZgrid.add_zeros('node', 'discharge', dtype=float)		24		env_state.SZgrid.add_zeros('node', 'river_stage__elevation', dtype=float)		25		env_state.SZgrid.add_zeros('node', 'water_storage_anomaly', dtype=float)26		27		env_state.SZgrid.at_node['topographic__elevation'] = np.array(env_state.grid.at_node['topographic__elevation'])		28					29		act_links = env_state.SZgrid.active_links30				31		Kmax = map_max_of_link_nodes_to_link(env_state.SZgrid, 'Hydraulic_Conductivity')		32		Kmin = map_min_of_link_nodes_to_link(env_state.SZgrid, 'Hydraulic_Conductivity')		33		Ksl = map_mean_of_link_nodes_to_link(env_state.SZgrid, 'Hydraulic_Conductivity')34		35		self.Ksat = np.zeros(len(Ksl))		36		self.Ksat[act_links] = Kmax[act_links]*Kmin[act_links]/Ksl[act_links]37		38		self.hriv = np.array(env_state.SZgrid.at_node['water_table__elevation'])		39		self.wte_dt = np.array(env_state.SZgrid.at_node['water_table__elevation'])40		self.faq = np.ones_like(Ksl)41		self.faq_node = np.ones_like(self.hriv)42				43		if env_state.func == 1:44			print('Transmissivity function for variable WT depth')45		elif env_state.func == 2:46			print('Constant transmissivity')47		else:48			print('Unconfined conditions: variable thickness')49		print('Change approach in setting_file: line 26')50		51		52		if env_state.func == 1 or env_state.func == 2:53			dzdl = env_state.SZgrid.calc_grad_at_link(env_state.SZgrid.at_node['topographic__elevation'])54			a_faq = map_mean_of_link_nodes_to_link(env_state.SZgrid, 'SZ_a_aq')55			b_faq = map_mean_of_link_nodes_to_link(env_state.SZgrid, 'SZ_b_aq')56			self.faq = a_faq/(1+b_faq*np.abs(dzdl))57			self.faq_node = map_max_of_node_links_to_node(env_state.SZgrid, self.faq)58		self.zm = map_mean_of_link_nodes_to_link(env_state.SZgrid,'topographic__elevation')59		self.flux_out = 060		self.act_fix_link = 061		62		A = np.power(env_state.SZgrid.dx,2)		63		Ariv = (env_state.grid.at_node['river_width']64				* env_state.grid.at_node['river_length'])65		self.W =  Ariv / env_state.SZgrid.dx	66		kriv = Ariv67		kriv[Ariv != 0] = 1/Ariv[Ariv != 0]68		self.kriv = kriv69		self.kaq = 1/A70		self.kAriv = Ariv*self.kaq71		self.f = 3072		self.dh = np.zeros_like(Ariv)73		74		if len(env_state.SZgrid.open_boundary_nodes) > 0:			75			self.fixed_links = env_state.SZgrid.links_at_node[76				env_state.SZgrid.open_boundary_nodes]77			self.act_fix_link = 178		79		80	def add_second_layer_gw(self, env_state, thickness, Ksat, Sy, Ss):	81		# thickness:	Thickness of the deep aquifer82		# Ksat:			Saturated hydraulic conductivity of the deep aquifer83		# Sy:			Specific yield of the second layer84		# Ss:			Specific storage of the second layer	85		env_state.SZgrid.add_zeros('node', 'Ksat_2', dtype=float)		86		env_state.SZgrid.add_zeros('node', 'BOT_2', dtype=float)		87		env_state.SZgrid.add_zeros('node', 'Sy_2', dtype=float)		88		env_state.SZgrid.add_zeros('node', 'Ss_2', dtype=float)89		env_state.SZgrid.add_zeros('node', 'HEAD_2', dtype=float)90	91		env_state.SZgrid.at_node['BOT_2'] = np.array(92			env_state.SZgrid.at_node['BOT']93			+ thickness94			)95		96		self.thickness = thickness97		98		env_state.SZgrid.at_node['Sy_2'][:] = Sy		99		env_state.SZgrid.at_node['Ss_2'][:] = Ss		100		env_state.SZgrid.at_node['Ksat_2'][:] = Ksat101		102		act_links = env_state.SZgrid.active_links		103		Kmax = map_max_of_link_nodes_to_link(env_state.SZgrid, 'Ksat_2')		104		Kmin = map_min_of_link_nodes_to_link(env_state.SZgrid, 'Ksat_2')		105		Ksl = map_mean_of_link_nodes_to_link(env_state.SZgrid, 'Ksat_2')106		107		self.Ksat_2 = np.zeros(len(Ksl))		108		self.Ksat_2[act_links] = Kmax[act_links]*Kmin[act_links]/Ksl[act_links]		109		env_state.SZgrid.at_node['HEAD_2'][:] = np.array(env_state.SZgrid.at_node['water_table__elevation'])		110	def run_one_step_gw_2Layer(self, env_state, dt, tht_dt, rtht_dt, Droot):	111				112		"""113		Function to update water table depending on the unsaturated zone.114		Parameters:115		Droot:		Rooting depth [mm]116		tht_dt:		Water content at time t [-]117		Duz:		Unsaturated zone depth118		env_state:	grid:	z:	Topograhic elevation119							h:	water table120							fs:	Saturated water content121							fc:	Field capacity122							Sy:	Specific yield123							dq:	water storage anomaly124					125		Groundwater storage variation126		"""127		dts = time_step(COURANT_2D, env_state.SZgrid.at_node['SZ_Sy'],128						env_state.SZgrid.at_node['Hydraulic_Conductivity'],129						env_state.SZgrid.at_node['water_table__elevation'],130						env_state.SZgrid.at_node['BOT'],131						env_state.SZgrid.dx,132						env_state.SZgrid.core_nodes)133		134		act_links = env_state.SZgrid.active_links135		A = np.power(env_state.SZgrid.dx,2)136		dtp = np.nanmin([dt, dts])137		dtsp = dtp138		env_state.SZgrid.at_node['discharge'][:] = 0.0139		140		while dtp <= dt:141			142			## specifiying river flow boundary conditions for groundwater flow143			env_state.SZgrid.at_node['water_table__elevation'][:] = np.maximum(144				env_state.SZgrid.at_node['water_table__elevation'],145				env_state.SZgrid.at_node['BOT']146				)147			## specifiying river flow boundary conditions for groundwater flow148			env_state.SZgrid.at_node['water_table__elevation'][:] = np.minimum(149				env_state.SZgrid.at_node['topographic__elevation'],150				env_state.SZgrid.at_node['water_table__elevation']151				)				152			# Calculate vertical conductivity for river cells153			p = np.where((env_state.SZgrid.at_node['HEAD_2']154				-env_state.SZgrid.at_node['BOT_2']) > 0,1,0)155			156			dh_1 = (env_state.SZgrid.at_node['water_table__elevation']157				- env_state.SZgrid.at_node['BOT_2']158				)*p159						160			dh_2 = (env_state.SZgrid.at_node['HEAD_2']161				- env_state.SZgrid.at_node['BOT']162				)*(1-p)163						164			Cz = (np.where(dh_1 > 0,1/(env_state.SZgrid.at_node['Hydraulic_Conductivity']165				/(0.5*dh_1)),0) + 1/(env_state.SZgrid.at_node['Ksat_2']166				/(0.5*self.thickness)))167		168			aux_h = np.where((env_state.SZgrid.at_node['HEAD_2']169				-env_state.SZgrid.at_node['BOT_2']) > 0, env_state.SZgrid.at_node['water_table__elevation'],170				env_state.SZgrid.at_node['HEAD_2'])171			172			dhdz = np.where((dh_2+dh_1) > 0,173				(aux_h - env_state.SZgrid.at_node['HEAD_2'])/np.power(self.thickness+dh_1,2),0)174			175			dqz = - (1/Cz)*dhdz176			177			# Calculate the hydraulic gradients178			dhdl_1 = env_state.SZgrid.calc_grad_at_link(env_state.SZgrid.at_node['water_table__elevation'])179			dhdl_2 = env_state.SZgrid.calc_grad_at_link(env_state.SZgrid.at_node['HEAD_2'])180			181			# mean hydraulic head at the face of the of the patch182			qxy_1 = np.zeros(len(self.Ksat))183			qxy_2 = np.zeros(len(self.Ksat_2))184			185			hm = map_mean_of_link_nodes_to_link(env_state.SZgrid,'water_table__elevation')186			bm = map_mean_of_link_nodes_to_link(env_state.SZgrid,'BOT_2')187						188			# Calculate flux per unit length at each face layer 1189			qxy_1[act_links] = -self.Ksat[act_links]*(hm[act_links]-bm[act_links])*dhdl_1[act_links]190			191			# Calculate flux per unit length at each face layer 2192			hm = map_mean_of_link_nodes_to_link(env_state.SZgrid,'HEAD_2')193			bm = map_mean_of_link_nodes_to_link(env_state.SZgrid,'BOT')194			195			b2 = np.minimum(hm-bm, Self.thickness)			196			qxy_2[act_links] = -self.Ksat_2[act_links]*b2[act_links]*dhdl_2[act_links]197					198			# Calculate the flux gradient199			dqxy_1 = (-env_state.SZgrid.calc_flux_div_at_node(qxy_1) + dqz + p*env_state.SZgrid.at_node['recharge']/dt)#*dtsp200			dqxy_2 = (-env_state.SZgrid.calc_flux_div_at_node(qxy_2) - dqz + (1-p)*env_state.SZgrid.at_node['recharge']/dt)#*dtsp201						202			CRIV =  smoth_func(env_state.SZgrid.at_node['water_table__elevation'],203					env_state.grid.at_node['river_topo_elevation'],0.00001,204					dqxy_1, env_state.SZgrid.core_nodes)*(1/A)*env_state.grid.at_node['SS_loss']205			206			FSy, FSs = smoth_func_L2(env_state.SZgrid.at_node['HEAD_2'][:],207				env_state.SZgrid.at_node['BOT'][:],208				self.thickness, REG_FACTOR, env_state.SZgrid.core_nodes)209			210			alpha = 1/(env_state.SZgrid.at_node['SZ_Sy'] + dtsp*CRIV)211			212			# Update water table upper layer213			env_state.SZgrid.at_node['water_table__elevation'][env_state.SZgrid.core_nodes] = (dtsp*alpha[env_state.SZgrid.core_nodes]214				*((p*dqxy_1)[env_state.SZgrid.core_nodes] + CRIV[env_state.SZgrid.core_nodes]*env_state.grid.at_node['river_topo_elevation'][env_state.SZgrid.core_nodes])215				+ env_state.SZgrid.at_node['SZ_Sy'][env_state.SZgrid.core_nodes]*alpha[env_state.SZgrid.core_nodes]216				*env_state.SZgrid.at_node['water_table__elevation'][env_state.SZgrid.core_nodes])217			218			# Update water table lower layer219			env_state.SZgrid.at_node['HEAD_2'][env_state.SZgrid.core_nodes] += (dqxy_2[env_state.SZgrid.core_nodes]*dtsp*220				(FSy[env_state.SZgrid.core_nodes]/env_state.SZgrid.at_node['Sy_2'][env_state.SZgrid.core_nodes]+221				FSs[env_state.SZgrid.core_nodes]/env_state.SZgrid.at_node['Ss_2'][env_state.SZgrid.core_nodes])222				)223			224			env_state.SZgrid.at_node['water_table__elevation'][env_state.SZgrid.core_nodes] = np.maximum(225				env_state.SZgrid.at_node['water_table__elevation'][env_state.SZgrid.core_nodes],226				env_state.SZgrid.at_node['BOT_2'][env_state.SZgrid.core_nodes])227			228			# Update water storage anomalies229			env_state.grid.at_node['Base_flow'][env_state.SZgrid.core_nodes] += (230				CRIV[env_state.SZgrid.core_nodes]231				*np.abs(env_state.SZgrid.at_node['water_table__elevation'][env_state.SZgrid.core_nodes]232				-env_state.grid.at_node['river_topo_elevation'][env_state.SZgrid.core_nodes])233				)234						235			dtsp = time_step(COURANT_2D, env_state.SZgrid.at_node['SZ_Sy'],236						env_state.SZgrid.at_node['Hydraulic_Conductivity'],237						env_state.SZgrid.at_node['water_table__elevation'],238						env_state.SZgrid.at_node['BOT'],239						env_state.SZgrid.dx,240						env_state.SZgrid.core_nodes241						)242			243			if dtsp <= 0:244				raise Exception("invalid time step", dtsp)245			if dtp == dt:246				dtp += dtsp247			elif (dtp + dtsp) > dt:248				dtsp = dt - dtp249				dtp += dtsp250			else:251				dtp += dtsp252			253		self.wte_dt = np.array(env_state.SZgrid.at_node['water_table__elevation'])254		255		env_state.SZgrid.at_node['discharge'][:] *= (1/dt)256		257		env_state.grid.at_node['riv_sat_deficit'][:] = np.power(env_state.grid.dx,2)\258				*np.array(env_state.grid.at_node['topographic__elevation']-\259				env_state.SZgrid.at_node['water_table__elevation'][:])260		261		pass262	263	def run_one_step_gw(self, env_state, dt, tht_dt, Droot):264		"""Function to update water table depending on the unsaturated zone.265		Parameters:266		Droot:		Rooting depth [mm]267		tht_dt:		Water content at time t [-]268		Duz:		Unsaturated zone depth269		env_state:	grid:	z:	Topograhic elevation270							h:	water table271							fs:	Saturated water content272							fc:	Field capacity273							Sy:	Specific yield274							dq:	water storage anomaly275							f:	effective aquifer depth276					277		Groundwater storage variation278		"""279		act_links = env_state.SZgrid.active_links280		281		T = transmissivity(env_state, self.Ksat, act_links, self.faq, self.zm)282		283		dts = time_step_confined(COURANT_2D, env_state.SZgrid.at_node['SZ_Sy'],284			map_max_of_node_links_to_node(env_state.SZgrid, T),285			env_state.SZgrid.dx, env_state.SZgrid.core_nodes286			)287	288		stage = env_state.grid.at_node['Q_ini'] * self.kriv289		290		aux_riv = np.ones(len(stage))291		292		aux_riv[stage > 0] = 0293		294		Tch = exponential_T(env_state.grid.at_node['SS_loss'], STR_RIVER,295				env_state.grid.at_node['river_topo_elevation'], self.hriv)296		297		dts_riv = time_step_confined(COURANT_1D, env_state.SZgrid.at_node['SZ_Sy'],298						Tch/self.W, 0.25*(env_state.SZgrid.dx - self.W),299						env_state.riv_nodes)300		301		#dtp = np.nanmin([dt, dts, dts_riv])302		dtp = np.nanmin([dt, dts])303		304		dtsp = dtp305		306		env_state.SZgrid.at_node['discharge'][:] = 0.0307		self.flux_out = 0308		self.dh[:] = 0309		310		while dtp <= dt:311			312			# Make water table always greater or equal to bottom elevation313			env_state.SZgrid.at_node['water_table__elevation'][:] = np.minimum(314				env_state.SZgrid.at_node['topographic__elevation'],315				env_state.SZgrid.at_node['water_table__elevation']316				)317						318			# Make water table always above the bottom elevation319			if env_state.func == 2:320				env_state.SZgrid.at_node['water_table__elevation'][:] = np.maximum(321					env_state.SZgrid.at_node['water_table__elevation'],322					env_state.SZgrid.at_node['BOT']323					)324			325			# Make river water table always below or equal surface elevation326			self.hriv = np.minimum(self.hriv,327				env_state.grid.at_node['river_topo_elevation']328				)329			330			# Calculate transmissivity331			T = transmissivity(env_state, self.Ksat, act_links, self.faq, self.zm)332						333			# Calculate the hydraulic gradients			334			dhdl = env_state.SZgrid.calc_grad_at_link(env_state.SZgrid.at_node['water_table__elevation'])335			336			# Calculate flux per unit length at each face337			qs = np.zeros(len(self.Ksat))338			qs[act_links] = -T[act_links]*dhdl[act_links]339			340			if self.act_fix_link == 1:341				self.flux_out += (np.sum(qs[self.fixed_links])/env_state.SZgrid.dx)*dtsp342				343			# Calculate flux head boundary conditions344			dfhbc = exponential_T(env_state.SZgrid.at_node['SZ_FHB'], 60,345				env_state.SZgrid.at_node['topographic__elevation'],346				env_state.SZgrid.at_node['water_table__elevation']347				)348			self.flux_out += np.sum(dfhbc)349			350			# Calculate flux gradient351			dqsdxy = (-env_state.SZgrid.calc_flux_div_at_node(qs)352					- dfhbc + env_state.SZgrid.at_node['recharge']/dt)353			354			# Calculate river cell flux355			Tch = exponential_T(env_state.grid.at_node['SS_loss'], STR_RIVER,356				env_state.grid.at_node['river_topo_elevation'], self.hriv)357			diff_stage = (env_state.SZgrid.at_node['water_table__elevation']358				- self.hriv)359			360			stage_aux = np.array(stage)361			stage_aux[diff_stage < 0] = 0362			363			qs_riv = -(Tch*(diff_stage-stage_aux)*50 / (env_state.SZgrid.dx - self.W))364			qs_riv[qs_riv < 0] = qs_riv[qs_riv < 0]*aux_riv[qs_riv < 0]365			366			dqsdxy += self.kaq*qs_riv367			368			# Regularization approach for aquifer cells369			if env_state.func == 1 or  env_state.func == 2:370				dqs = regularization_T(371					env_state.SZgrid.at_node['topographic__elevation'],372					env_state.SZgrid.at_node['water_table__elevation'],373					self.faq_node, dqsdxy, REG_FACTOR374					)375			376			else:377				dqs = regularization(378					env_state.SZgrid.at_node['topographic__elevation'],379					env_state.SZgrid.at_node['water_table__elevation'],380					env_state.SZgrid.at_node['BOT'],381					dqsdxy, REG_FACTOR)382					383			# Regularization approach for river cells384			dqs_riv = regularization_T(385				env_state.grid.at_node['river_topo_elevation'],386				self.hriv, 	self.f, -self.kriv*qs_riv, REG_FACTOR387				)388			389			# Update the head elevations390			env_state.SZgrid.at_node['water_table__elevation'] += ((dqsdxy-dqs)391				* dtsp / env_state.SZgrid.at_node['SZ_Sy'])392						393			self.hriv += (-self.kriv*qs_riv - dqs_riv)*dtsp/env_state.SZgrid.at_node['SZ_Sy']394			395			# Update storage change for soil-gw interactions396			env_state.SZgrid.at_node['water_storage_anomaly'][:] = (dqsdxy-dqs) *dtsp			397			fun_update_UZ_SZ_depth(env_state, tht_dt, Droot)398			399			# Calculate total discharge400			env_state.SZgrid.at_node['discharge'][:] += (dqs + dqs_riv*self.kAriv)*dtsp401			402			# Calculate maximum time step403			dtsp = time_step_confined(COURANT_2D, env_state.SZgrid.at_node['SZ_Sy'],404				map_max_of_node_links_to_node(env_state.SZgrid, T),405				env_state.SZgrid.dx, env_state.SZgrid.core_nodes406				)407			408			dtsp_riv = time_step_confined(COURANT_1D, env_state.SZgrid.at_node['SZ_Sy'],409						Tch/self.W, 0.02*(env_state.SZgrid.dx - self.W), env_state.riv_nodes)410			411			#dtsp = np.min([dtsp, dtsp_riv])			412			413			# Update time step414			if dtsp <= 0:415				raise Exception("invalid time step", dtsp)			416			if dtp == dt:			417				dtp += dtsp			418			elif (dtp + dtsp) > dt:			419				dtsp = dt - dtp				420				dtp += dtsp				421			else:			422				dtp += dtsp423		424		# Update state variables425		self.dh = np.array(env_state.SZgrid.at_node['water_table__elevation'])-self.wte_dt426		self.wte_dt = np.array(env_state.SZgrid.at_node['water_table__elevation'])427		428		env_state.SZgrid.at_node['discharge'][:] *= (1/dt)429		430		env_state.grid.at_node['riv_sat_deficit'][:] = (np.power(env_state.grid.dx,2)431				* np.array(env_state.grid.at_node['river_topo_elevation'][:]432				- env_state.SZgrid.at_node['water_table__elevation'][:])433				)434		435		env_state.grid.at_node['riv_sat_deficit'][env_state.grid.at_node['riv_sat_deficit'][:] < 0] = 0.0436		437		if self.act_fix_link == 1:438				self.flux_out *= 1/dt439		440		pass441	442	def SZ_potential_ET(self, env_state, pet_sz):443		SZ_aet = (env_state.SZgrid.at_node['water_table__elevation']444			- env_state.grid.at_node['topographic__elevation']445			+ env_state.Droot*0.001)*1000		446		SZ_aet[SZ_aet < 0] = 0		447		f = SZ_aet/env_state.Droot448		return f*pet_sz449def transmissivity(env_state, Ksat, act_links, f, zm):450	T = np.zeros(len(Ksat))451	452	if env_state.func != 2:453		hm = map_mean_of_link_nodes_to_link(env_state.SZgrid,'water_table__elevation')454	455	if env_state.func == 1: # Variable transmissivity			456		T = exponential_T(Ksat, f, zm, hm)457	elif env_state.func == 2: # Constant transmissivity				458		T[act_links] = Ksat[act_links]459	else: # Unconfined aquifer460		bm = map_mean_of_link_nodes_to_link(env_state.SZgrid,'BOT')461		T[act_links] = Ksat[act_links]*(hm[act_links]-bm[act_links])462	463	return T464def fun_update_UZ_SZ_depth(env_state, tht_dt, Droot):465	"""Function to update water table depending on both water content of466	the unsaturated zone.467	Parameters:468	Droot:		Rooting depth [mm]469	tht_dt:		Water content at time t [-]470	Duz:		Unsaturated zone depth471	env_state:	grid:	z:	Topograhic elevation472						h:	water table473						fs:	Saturated water content474						fc:	Field capacity475						Sy:	Specific yield476						dq:	water storage anomaly477	Groundwater storage variation478	"""	479	h0 = env_state.SZgrid.at_node['water_table__elevation'] - \480		env_state.SZgrid.at_node['water_storage_anomaly']/ \481		env_state.SZgrid.at_node['SZ_Sy']482	483	tht_dt = np.where(env_state.SZgrid.at_node['water_storage_anomaly'][:] < 0.0,484			env_state.fc, tht_dt)485	486	dtht = env_state.grid.at_node['saturated_water_content'] - tht_dt487		488	ruz = (env_state.grid.at_node['topographic__elevation']-Droot) - h0489				490	dh = np.where((h0-(env_state.grid.at_node['topographic__elevation']-Droot)) > 0.0,491		env_state.SZgrid.at_node['water_storage_anomaly']/dtht,492		env_state.SZgrid.at_node['water_storage_anomaly']/493		env_state.SZgrid.at_node['SZ_Sy'])				494			495	h_aux = h0+dh-(env_state.grid.at_node['topographic__elevation']-Droot)496	497	dhr_aux = np.abs(ruz)-np.abs(dh)498	499	dh_aux = np.where(dh >= 0,1,-1)500	501	alpha = -1.*np.less_equal(dhr_aux, 0)*dh_aux*np.less_equal(ruz, 0)502			503	beta = np.less_equal(dhr_aux, 0)*dh_aux*np.less_equal(ruz, 0)504	505	gama = np.where(dhr_aux > 0, 1, 0)506	507	gama = np.where(dh > 0, gama, 1-gama)*np.less(h_aux, 0)508	509	gama[h_aux < 0] = 1510			511	dht = ((env_state.SZgrid.at_node['water_storage_anomaly']512		+ ruz*(alpha*env_state.SZgrid.at_node['SZ_Sy'] + beta*dtht))513		/ ((gama*env_state.SZgrid.at_node['SZ_Sy'] + (1-gama)*dtht)))514	515	env_state.SZgrid.at_node['water_table__elevation'] = h0 + dht516	517	env_state.SZgrid.at_node['discharge'][:] += np.where((518		env_state.SZgrid.at_node['topographic__elevation']519		- env_state.SZgrid.at_node['water_table__elevation']) <= 0.0,520		- (env_state.SZgrid.at_node['topographic__elevation']521		- env_state.SZgrid.at_node['water_table__elevation'])*dtht, 0.0)522			523	env_state.SZgrid.at_node['water_table__elevation'][:] = np.minimum(524		env_state.SZgrid.at_node['water_table__elevation'],525		env_state.SZgrid.at_node['topographic__elevation'])526	527	pass528def storage(env_state):529	storage = np.sum((env_state.SZgrid.at_node['water_table__elevation'][env_state.SZgrid.core_nodes] -\530		env_state.SZgrid.at_node['BOT'][env_state.SZgrid.core_nodes])*\531		env_state.SZgrid.at_node['SZ_Sy'][env_state.SZgrid.core_nodes])532	return storage533	534def storage_2layer(env_state):535	head_Sy = (np.maximum(env_state.SZgrid.at_node['water_table__elevation'],536		env_state.SZgrid.at_node['BOT_2'])537		- env_state.SZgrid.at_node['BOT_2'])538	storage_1 = np.sum(head_Ss[env_state.SZgrid.core_nodes]539		*env_state.SZgrid.at_node['SZ_Sy'][env_state.SZgrid.core_nodes])540	541	head_Sy = (np.minimun(env_state.SZgrid.at_node['HEAD_2'],542		env_state.SZgrid.at_node['BOT_2'])543		- env_state.SZgrid.at_node['BOT'])544	545	head_Ss = (np.maximum(env_state.SZgrid.at_node['HEAD_2'],546		env_state.SZgrid.at_node['BOT_2'])547		- env_state.SZgrid.at_node['BOT_2'])548	549	storage_2 = np.sum((head_Sy*env_state.SZgrid.at_node['Sy_2']550		+head_Ss*env_state.SZgrid.at_node['Ss_2'])[env_state.SZgrid.core_nodes]551		)552	return storage_1+storage_2553def storage_uz_sz(env_state, tht, dh):554	""" Total storage in the saturated zone555	Parameters:556		env_state:	state variables and model parameters			557	Output:558		total:		Volume of water stored in the saturated zone [m]559	"""560	str_uz1 = (env_state.SZgrid.at_node['water_table__elevation']561		- (env_state.grid.at_node['topographic__elevation']562		- env_state.Droot*0.001)563		)564	565	str_uz1[str_uz1 < 0] = 0.0566		567	str_uz0 = (env_state.SZgrid.at_node['water_table__elevation'] - dh568		- (env_state.grid.at_node['topographic__elevation']569		- env_state.Droot*0.001)570		)571	572	str_uz0[str_uz0 < 0] = 0.0573	574	tht[dh < 0] = env_state.fc[dh < 0]575	576	dtht = env_state.grid.at_node['saturated_water_content'] - tht577	578	str_uz = dtht*(str_uz1-str_uz0)579	580	str_sz = (dh-str_uz1+str_uz0)*env_state.SZgrid.at_node['SZ_Sy']581	582	total = np.sum(str_sz[env_state.SZgrid.core_nodes]583		+ str_uz[env_state.SZgrid.core_nodes]584		)585	return total586def smoth_func_L1(h, hr, r, dq, *nodes):587	aux = np.power(h-hr, 3)/r	588	aux = np.where(aux > 0, aux, 0)589	aux = np.where(aux > 1, 1, aux)590	aux = np.where(dq > 0, 1, aux)591	592	if nodes:593		p = np.zeros(len(aux))594		p[nodes] = 1595		aux *= p596	597	return  aux598	599def smoth_func_L2(h, hr, d, r,*nodes):600	u = (h-hr)/D601	aux = (1-u)/r602	FSy = 1 - np.exp(-aux)603	FSs = 1 - np.exp(aux)604	FSy = np.where(u >= 1, 0, fSy)605	FSs = np.where(u >= 1, fSs, 0)606		607	if nodes:608		aux = np.zeros(len(aux))609		aux[nodes] = 1610		FSs *= aux611		FSy *= aux612	613	return  FSy, FSs614def river_flux(h, hriv, C, A, *nodes):615	q_riv = (h-hriv)*C/A616	if nodes:617		p = np.zeros(len(aux))618		p[nodes] = 1619		aux *= p620	return  q_riv621def smoth_func_T(h, hriv, r, f, dq, *nodes):622	SF = (h-hriv+f)/f623	SF = np.where(SF > 1, SF, 0)624	SF = np.where(SF > 0, 1 - np.exp(SF/r), 0)625	if nodes:626		p = np.zeros(len(SF))627		p[nodes] = 1628		SF *= p629	return SF630def smoth_func(h, hriv, r, dq, *nodes):631	aux = np.power(h-hriv, 3)/r	632	aux = np.where(aux > 0, aux, 0)633	aux = np.where(aux > 1, 1, aux)634	if nodes:635		p = np.zeros(len(aux))636		p[nodes] = 1637		aux *= p638	return  np.where(aux <= 0, 0, aux)639	640# regularization function for unconfined641def regularization(zm, hm, bm, dq, r):642	# zm:	surface elevation643	# hm:	hydraulic head644	# bm:	bottom elevation aquifer645	# dq:	flux per unit area646	# r:	regularization factor647	aux = (hm-bm)/(zm-bm)648	aux = np.where((aux-1) > 0, 1, aux)649	return np.exp((aux-1)/r)*dq*np.where(dq > 0, 1, 0)650# regularization function for confined aquifers651def regularization_T(zm, hm, f, dq, r):652	# zm:	surface elevation653	# hm:	hydraulic head654	# f:	e-folding depth655	# dq:	flux per unit area656	# r:	regularization factor657	aux = (hm-zm)/f+1	658	aux = np.where(aux > 0,aux,0)659	return np.exp((aux-1)/r)*dq*np.where(dq > 0,1,0)660	661def exponential_T(Ksat, f, z, h):662	return Ksat*f*np.exp(-np.maximum(z-h,0)/f)663# Maximum time step for unconfined aquifers664def time_step(D, Sy, Ksat, h, zb, dx, *nodes):665	# D:	Courant number666	T = (h - zb)*Ksat667	dt = D*Sy*np.power(dx,2)/(4*T)668	if nodes:		669		dt = np.nanmin((dt[nodes])[dt[nodes] > 0])670	else:671		dt = np.nanmin(dt[dt > 0])672	return dt673# Maximum time step for confined aquifers674def time_step_confined(D, Sy, T, dx, *nodes):675	# D:	Courant number676	dt = D*Sy*np.power(dx, 2)/(4*T)	677	if nodes:		678		dt = np.nanmin((dt[nodes])[dt[nodes] > 0])679	else:680		dt = np.nanmin(dt[dt > 0])...test_lexnparse.py
Source:test_lexnparse.py  
1# -*- coding: utf-8 -*-2"""3    jinja2.testsuite.lexnparse4    ~~~~~~~~~~~~~~~~~~~~~~~~~~5    All the unittests regarding lexing, parsing and syntax.6    :copyright: (c) 2017 by the Jinja Team.7    :license: BSD, see LICENSE for more details.8"""9import pytest10from jinja2 import Environment, Template, TemplateSyntaxError, \11     UndefinedError, nodes12from jinja2._compat import iteritems, text_type, PY213from jinja2.lexer import Token, TokenStream, TOKEN_EOF, \14     TOKEN_BLOCK_BEGIN, TOKEN_BLOCK_END15# how does a string look like in jinja syntax?16if PY2:17    def jinja_string_repr(string):18        return repr(string)[1:]19else:20    jinja_string_repr = repr21@pytest.mark.lexnparse22@pytest.mark.tokenstream23class TestTokenStream(object):24    test_tokens = [Token(1, TOKEN_BLOCK_BEGIN, ''),25                   Token(2, TOKEN_BLOCK_END, ''),26                   ]27    def test_simple(self, env):28        ts = TokenStream(self.test_tokens, "foo", "bar")29        assert ts.current.type is TOKEN_BLOCK_BEGIN30        assert bool(ts)31        assert not bool(ts.eos)32        next(ts)33        assert ts.current.type is TOKEN_BLOCK_END34        assert bool(ts)35        assert not bool(ts.eos)36        next(ts)37        assert ts.current.type is TOKEN_EOF38        assert not bool(ts)39        assert bool(ts.eos)40    def test_iter(self, env):41        token_types = [42            t.type for t in TokenStream(self.test_tokens, "foo", "bar")43        ]44        assert token_types == ['block_begin', 'block_end', ]45@pytest.mark.lexnparse46@pytest.mark.lexer47class TestLexer(object):48    def test_raw1(self, env):49        tmpl = env.from_string(50            '{% raw %}foo{% endraw %}|'51            '{%raw%}{{ bar }}|{% baz %}{%       endraw    %}')52        assert tmpl.render() == 'foo|{{ bar }}|{% baz %}'53    def test_raw2(self, env):54        tmpl = env.from_string('1  {%- raw -%}   2   {%- endraw -%}   3')55        assert tmpl.render() == '123'56    def test_balancing(self, env):57        env = Environment('{%', '%}', '${', '}')58        tmpl = env.from_string('''{% for item in seq59            %}${{'foo': item}|upper}{% endfor %}''')60        assert tmpl.render(seq=list(range(3))) \61            == "{'FOO': 0}{'FOO': 1}{'FOO': 2}"62    def test_comments(self, env):63        env = Environment('<!--', '-->', '{', '}')64        tmpl = env.from_string('''\65<ul>66<!--- for item in seq -->67  <li>{item}</li>68<!--- endfor -->69</ul>''')70        assert tmpl.render(seq=list(range(3))) \71            == ("<ul>\n  <li>0</li>\n  ""<li>1</li>\n  <li>2</li>\n</ul>")72    def test_string_escapes(self, env):73        for char in u'\0', u'\u2668', u'\xe4', u'\t', u'\r', u'\n':74            tmpl = env.from_string('{{ %s }}' % jinja_string_repr(char))75            assert tmpl.render() == char76        assert env.from_string('{{ "\N{HOT SPRINGS}" }}').render() == u'\u2668'77    def test_bytefallback(self, env):78        from pprint import pformat79        tmpl = env.from_string(u'''{{ 'foo'|pprint }}|{{ 'bär'|pprint }}''')80        assert tmpl.render() == pformat('foo') + '|' + pformat(u'bär')81    def test_operators(self, env):82        from jinja2.lexer import operators83        for test, expect in iteritems(operators):84            if test in '([{}])':85                continue86            stream = env.lexer.tokenize('{{ %s }}' % test)87            next(stream)88            assert stream.current.type == expect89    def test_normalizing(self, env):90        for seq in '\r', '\r\n', '\n':91            env = Environment(newline_sequence=seq)92            tmpl = env.from_string('1\n2\r\n3\n4\n')93            result = tmpl.render()94            assert result.replace(seq, 'X') == '1X2X3X4'95    def test_trailing_newline(self, env):96        for keep in [True, False]:97            env = Environment(keep_trailing_newline=keep)98            for template, expected in [99                    ('', {}),100                    ('no\nnewline', {}),101                    ('with\nnewline\n', {False: 'with\nnewline'}),102                    ('with\nseveral\n\n\n', {False: 'with\nseveral\n\n'}),103                    ]:104                tmpl = env.from_string(template)105                expect = expected.get(keep, template)106                result = tmpl.render()107                assert result == expect, (keep, template, result, expect)108@pytest.mark.lexnparse109@pytest.mark.parser110class TestParser(object):111    def test_php_syntax(self, env):112        env = Environment('<?', '?>', '<?=', '?>', '<!--', '-->')113        tmpl = env.from_string('''\114<!-- I'm a comment, I'm not interesting -->\115<? for item in seq -?>116    <?= item ?>117<?- endfor ?>''')118        assert tmpl.render(seq=list(range(5))) == '01234'119    def test_erb_syntax(self, env):120        env = Environment('<%', '%>', '<%=', '%>', '<%#', '%>')121        tmpl = env.from_string('''\122<%# I'm a comment, I'm not interesting %>\123<% for item in seq -%>124    <%= item %>125<%- endfor %>''')126        assert tmpl.render(seq=list(range(5))) == '01234'127    def test_comment_syntax(self, env):128        env = Environment('<!--', '-->', '${', '}', '<!--#', '-->')129        tmpl = env.from_string('''\130<!--# I'm a comment, I'm not interesting -->\131<!-- for item in seq --->132    ${item}133<!--- endfor -->''')134        assert tmpl.render(seq=list(range(5))) == '01234'135    def test_balancing(self, env):136        tmpl = env.from_string('''{{{'foo':'bar'}.foo}}''')137        assert tmpl.render() == 'bar'138    def test_start_comment(self, env):139        tmpl = env.from_string('''{# foo comment140and bar comment #}141{% macro blub() %}foo{% endmacro %}142{{ blub() }}''')143        assert tmpl.render().strip() == 'foo'144    def test_line_syntax(self, env):145        env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%')146        tmpl = env.from_string('''\147<%# regular comment %>148% for item in seq:149    ${item}150% endfor''')151        assert [152            int(x.strip()) for x in tmpl.render(seq=list(range(5))).split()153        ] == list(range(5))154        env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%', '##')155        tmpl = env.from_string('''\156<%# regular comment %>157% for item in seq:158    ${item} ## the rest of the stuff159% endfor''')160        assert [161            int(x.strip()) for x in tmpl.render(seq=list(range(5))).split()162        ] == list(range(5))163    def test_line_syntax_priority(self, env):164        # XXX: why is the whitespace there in front of the newline?165        env = Environment('{%', '%}', '${', '}', '/*', '*/', '##', '#')166        tmpl = env.from_string('''\167/* ignore me.168   I'm a multiline comment */169## for item in seq:170* ${item}          # this is just extra stuff171## endfor''')172        assert tmpl.render(seq=[1, 2]).strip() == '* 1\n* 2'173        env = Environment('{%', '%}', '${', '}', '/*', '*/', '#', '##')174        tmpl = env.from_string('''\175/* ignore me.176   I'm a multiline comment */177# for item in seq:178* ${item}          ## this is just extra stuff179    ## extra stuff i just want to ignore180# endfor''')181        assert tmpl.render(seq=[1, 2]).strip() == '* 1\n\n* 2'182    def test_error_messages(self, env):183        def assert_error(code, expected):184            try:185                Template(code)186            except TemplateSyntaxError as e:187                assert str(e) == expected, 'unexpected error message'188            else:189                assert False, 'that was supposed to be an error'190        assert_error('{% for item in seq %}...{% endif %}',191                     "Encountered unknown tag 'endif'. Jinja was looking "192                     "for the following tags: 'endfor' or 'else'. The "193                     "innermost block that needs to be closed is 'for'.")194        assert_error(195            '{% if foo %}{% for item in seq %}...{% endfor %}{% endfor %}',196            "Encountered unknown tag 'endfor'. Jinja was looking for "197            "the following tags: 'elif' or 'else' or 'endif'. The "198            "innermost block that needs to be closed is 'if'.")199        assert_error('{% if foo %}',200                     "Unexpected end of template. Jinja was looking for the "201                     "following tags: 'elif' or 'else' or 'endif'. The "202                     "innermost block that needs to be closed is 'if'.")203        assert_error('{% for item in seq %}',204                     "Unexpected end of template. Jinja was looking for the "205                     "following tags: 'endfor' or 'else'. The innermost block "206                     "that needs to be closed is 'for'.")207        assert_error(208            '{% block foo-bar-baz %}',209            "Block names in Jinja have to be valid Python identifiers "210            "and may not contain hyphens, use an underscore instead.")211        assert_error('{% unknown_tag %}',212                     "Encountered unknown tag 'unknown_tag'.")213@pytest.mark.lexnparse214@pytest.mark.syntax215class TestSyntax(object):216    def test_call(self, env):217        env = Environment()218        env.globals['foo'] = lambda a, b, c, e, g: a + b + c + e + g219        tmpl = env.from_string(220            "{{ foo('a', c='d', e='f', *['b'], **{'g': 'h'}) }}"221        )222        assert tmpl.render() == 'abdfh'223    def test_slicing(self, env):224        tmpl = env.from_string('{{ [1, 2, 3][:] }}|{{ [1, 2, 3][::-1] }}')225        assert tmpl.render() == '[1, 2, 3]|[3, 2, 1]'226    def test_attr(self, env):227        tmpl = env.from_string("{{ foo.bar }}|{{ foo['bar'] }}")228        assert tmpl.render(foo={'bar': 42}) == '42|42'229    def test_subscript(self, env):230        tmpl = env.from_string("{{ foo[0] }}|{{ foo[-1] }}")231        assert tmpl.render(foo=[0, 1, 2]) == '0|2'232    def test_tuple(self, env):233        tmpl = env.from_string('{{ () }}|{{ (1,) }}|{{ (1, 2) }}')234        assert tmpl.render() == '()|(1,)|(1, 2)'235    def test_math(self, env):236        tmpl = env.from_string('{{ (1 + 1 * 2) - 3 / 2 }}|{{ 2**3 }}')237        assert tmpl.render() == '1.5|8'238    def test_div(self, env):239        tmpl = env.from_string('{{ 3 // 2 }}|{{ 3 / 2 }}|{{ 3 % 2 }}')240        assert tmpl.render() == '1|1.5|1'241    def test_unary(self, env):242        tmpl = env.from_string('{{ +3 }}|{{ -3 }}')243        assert tmpl.render() == '3|-3'244    def test_concat(self, env):245        tmpl = env.from_string("{{ [1, 2] ~ 'foo' }}")246        assert tmpl.render() == '[1, 2]foo'247    def test_compare(self, env):248        tmpl = env.from_string('{{ 1 > 0 }}|{{ 1 >= 1 }}|{{ 2 < 3 }}|'249                               '{{ 2 == 2 }}|{{ 1 <= 1 }}')250        assert tmpl.render() == 'True|True|True|True|True'251    def test_inop(self, env):252        tmpl = env.from_string('{{ 1 in [1, 2, 3] }}|{{ 1 not in [1, 2, 3] }}')253        assert tmpl.render() == 'True|False'254    def test_literals(self, env):255        tmpl = env.from_string('{{ [] }}|{{ {} }}|{{ () }}')256        assert tmpl.render().lower() == '[]|{}|()'257    def test_bool(self, env):258        tmpl = env.from_string('{{ true and false }}|{{ false '259                               'or true }}|{{ not false }}')260        assert tmpl.render() == 'False|True|True'261    def test_grouping(self, env):262        tmpl = env.from_string(263            '{{ (true and false) or (false and true) and not false }}')264        assert tmpl.render() == 'False'265    def test_django_attr(self, env):266        tmpl = env.from_string('{{ [1, 2, 3].0 }}|{{ [[1]].0.0 }}')267        assert tmpl.render() == '1|1'268    def test_conditional_expression(self, env):269        tmpl = env.from_string('''{{ 0 if true else 1 }}''')270        assert tmpl.render() == '0'271    def test_short_conditional_expression(self, env):272        tmpl = env.from_string('<{{ 1 if false }}>')273        assert tmpl.render() == '<>'274        tmpl = env.from_string('<{{ (1 if false).bar }}>')275        pytest.raises(UndefinedError, tmpl.render)276    def test_filter_priority(self, env):277        tmpl = env.from_string('{{ "foo"|upper + "bar"|upper }}')278        assert tmpl.render() == 'FOOBAR'279    def test_function_calls(self, env):280        tests = [281            (True, '*foo, bar'),282            (True, '*foo, *bar'),283            (True, '*foo, bar=42'),284            (True, '**foo, *bar'),285            (True, '**foo, bar'),286            (False, 'foo, bar'),287            (False, 'foo, bar=42'),288            (False, 'foo, bar=23, *args'),289            (False, 'a, b=c, *d, **e'),290            (False, '*foo, **bar')291        ]292        for should_fail, sig in tests:293            if should_fail:294                pytest.raises(TemplateSyntaxError,295                              env.from_string, '{{ foo(%s) }}' % sig)296            else:297                env.from_string('foo(%s)' % sig)298    def test_tuple_expr(self, env):299        for tmpl in [300            '{{ () }}',301            '{{ (1, 2) }}',302            '{{ (1, 2,) }}',303            '{{ 1, }}',304            '{{ 1, 2 }}',305            '{% for foo, bar in seq %}...{% endfor %}',306            '{% for x in foo, bar %}...{% endfor %}',307            '{% for x in foo, %}...{% endfor %}'308        ]:309            assert env.from_string(tmpl)310    def test_trailing_comma(self, env):311        tmpl = env.from_string('{{ (1, 2,) }}|{{ [1, 2,] }}|{{ {1: 2,} }}')312        assert tmpl.render().lower() == '(1, 2)|[1, 2]|{1: 2}'313    def test_block_end_name(self, env):314        env.from_string('{% block foo %}...{% endblock foo %}')315        pytest.raises(TemplateSyntaxError, env.from_string,316                      '{% block x %}{% endblock y %}')317    def test_constant_casing(self, env):318        for const in True, False, None:319            tmpl = env.from_string('{{ %s }}|{{ %s }}|{{ %s }}' % (320                str(const), str(const).lower(), str(const).upper()321            ))322            assert tmpl.render() == '%s|%s|' % (const, const)323    def test_test_chaining(self, env):324        pytest.raises(TemplateSyntaxError, env.from_string,325                      '{{ foo is string is sequence }}')326        assert env.from_string(327            '{{ 42 is string or 42 is number }}'328        ).render() == 'True'329    def test_string_concatenation(self, env):330        tmpl = env.from_string('{{ "foo" "bar" "baz" }}')331        assert tmpl.render() == 'foobarbaz'332    def test_notin(self, env):333        bar = range(100)334        tmpl = env.from_string('''{{ not 42 in bar }}''')335        assert tmpl.render(bar=bar) == text_type(not 42 in bar)336    def test_operator_precedence(self, env):337        tmpl = env.from_string('''{{ 2 * 3 + 4 % 2 + 1 - 2 }}''')338        assert tmpl.render() == text_type(2 * 3 + 4 % 2 + 1 - 2)339    def test_implicit_subscribed_tuple(self, env):340        class Foo(object):341            def __getitem__(self, x):342                return x343        t = env.from_string('{{ foo[1, 2] }}')344        assert t.render(foo=Foo()) == u'(1, 2)'345    def test_raw2(self, env):346        tmpl = env.from_string('{% raw %}{{ FOO }} and {% BAR %}{% endraw %}')347        assert tmpl.render() == '{{ FOO }} and {% BAR %}'348    def test_const(self, env):349        tmpl = env.from_string(350            '{{ true }}|{{ false }}|{{ none }}|'351            '{{ none is defined }}|{{ missing is defined }}')352        assert tmpl.render() == 'True|False|None|True|False'353    def test_neg_filter_priority(self, env):354        node = env.parse('{{ -1|foo }}')355        assert isinstance(node.body[0].nodes[0], nodes.Filter)356        assert isinstance(node.body[0].nodes[0].node, nodes.Neg)357    def test_const_assign(self, env):358        constass1 = '''{% set true = 42 %}'''359        constass2 = '''{% for none in seq %}{% endfor %}'''360        for tmpl in constass1, constass2:361            pytest.raises(TemplateSyntaxError, env.from_string, tmpl)362    def test_localset(self, env):363        tmpl = env.from_string('''{% set foo = 0 %}\364{% for item in [1, 2] %}{% set foo = 1 %}{% endfor %}\365{{ foo }}''')366        assert tmpl.render() == '0'367    def test_parse_unary(self, env):368        tmpl = env.from_string('{{ -foo["bar"] }}')369        assert tmpl.render(foo={'bar': 42}) == '-42'370        tmpl = env.from_string('{{ -foo["bar"]|abs }}')371        assert tmpl.render(foo={'bar': 42}) == '42'372@pytest.mark.lexnparse373@pytest.mark.lstripblocks374class TestLstripBlocks(object):375    def test_lstrip(self, env):376        env = Environment(lstrip_blocks=True, trim_blocks=False)377        tmpl = env.from_string('''    {% if True %}\n    {% endif %}''')378        assert tmpl.render() == "\n"379    def test_lstrip_trim(self, env):380        env = Environment(lstrip_blocks=True, trim_blocks=True)381        tmpl = env.from_string('''    {% if True %}\n    {% endif %}''')382        assert tmpl.render() == ""383    def test_no_lstrip(self, env):384        env = Environment(lstrip_blocks=True, trim_blocks=False)385        tmpl = env.from_string('''    {%+ if True %}\n    {%+ endif %}''')386        assert tmpl.render() == "    \n    "387    def test_lstrip_endline(self, env):388        env = Environment(lstrip_blocks=True, trim_blocks=False)389        tmpl = env.from_string(390            '''    hello{% if True %}\n    goodbye{% endif %}''')391        assert tmpl.render() == "    hello\n    goodbye"392    def test_lstrip_inline(self, env):393        env = Environment(lstrip_blocks=True, trim_blocks=False)394        tmpl = env.from_string('''    {% if True %}hello    {% endif %}''')395        assert tmpl.render() == 'hello    '396    def test_lstrip_nested(self, env):397        env = Environment(lstrip_blocks=True, trim_blocks=False)398        tmpl = env.from_string(399            '''    {% if True %}a {% if True %}b {% endif %}c {% endif %}''')400        assert tmpl.render() == 'a b c '401    def test_lstrip_left_chars(self, env):402        env = Environment(lstrip_blocks=True, trim_blocks=False)403        tmpl = env.from_string('''    abc {% if True %}404        hello{% endif %}''')405        assert tmpl.render() == '    abc \n        hello'406    def test_lstrip_embeded_strings(self, env):407        env = Environment(lstrip_blocks=True, trim_blocks=False)408        tmpl = env.from_string('''    {% set x = " {% str %} " %}{{ x }}''')409        assert tmpl.render() == ' {% str %} '410    def test_lstrip_preserve_leading_newlines(self, env):411        env = Environment(lstrip_blocks=True, trim_blocks=False)412        tmpl = env.from_string('''\n\n\n{% set hello = 1 %}''')413        assert tmpl.render() == '\n\n\n'414    def test_lstrip_comment(self, env):415        env = Environment(lstrip_blocks=True, trim_blocks=False)416        tmpl = env.from_string('''    {# if True #}417hello418    {#endif#}''')419        assert tmpl.render() == '\nhello\n'420    def test_lstrip_angle_bracket_simple(self, env):421        env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%', '##',422                          lstrip_blocks=True, trim_blocks=True)423        tmpl = env.from_string('''    <% if True %>hello    <% endif %>''')424        assert tmpl.render() == 'hello    '425    def test_lstrip_angle_bracket_comment(self, env):426        env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%', '##',427                          lstrip_blocks=True, trim_blocks=True)428        tmpl = env.from_string('''    <%# if True %>hello    <%# endif %>''')429        assert tmpl.render() == 'hello    '430    def test_lstrip_angle_bracket(self, env):431        env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%', '##',432                          lstrip_blocks=True, trim_blocks=True)433        tmpl = env.from_string('''\434    <%# regular comment %>435    <% for item in seq %>436${item} ## the rest of the stuff437   <% endfor %>''')438        assert tmpl.render(seq=range(5)) == \439            ''.join('%s\n' % x for x in range(5))440    def test_lstrip_angle_bracket_compact(self, env):441        env = Environment('<%', '%>', '${', '}', '<%#', '%>', '%', '##',442                          lstrip_blocks=True, trim_blocks=True)443        tmpl = env.from_string('''\444    <%#regular comment%>445    <%for item in seq%>446${item} ## the rest of the stuff447   <%endfor%>''')448        assert tmpl.render(seq=range(5)) == \449            ''.join('%s\n' % x for x in range(5))450    def test_php_syntax_with_manual(self, env):451        env = Environment('<?', '?>', '<?=', '?>', '<!--', '-->',452                          lstrip_blocks=True, trim_blocks=True)453        tmpl = env.from_string('''\454    <!-- I'm a comment, I'm not interesting -->455    <? for item in seq -?>456        <?= item ?>457    <?- endfor ?>''')458        assert tmpl.render(seq=range(5)) == '01234'459    def test_php_syntax(self, env):460        env = Environment('<?', '?>', '<?=', '?>', '<!--', '-->',461                          lstrip_blocks=True, trim_blocks=True)462        tmpl = env.from_string('''\463    <!-- I'm a comment, I'm not interesting -->464    <? for item in seq ?>465        <?= item ?>466    <? endfor ?>''')467        assert tmpl.render(seq=range(5)) \468            == ''.join('        %s\n' % x for x in range(5))469    def test_php_syntax_compact(self, env):470        env = Environment('<?', '?>', '<?=', '?>', '<!--', '-->',471                          lstrip_blocks=True, trim_blocks=True)472        tmpl = env.from_string('''\473    <!-- I'm a comment, I'm not interesting -->474    <?for item in seq?>475        <?=item?>476    <?endfor?>''')477        assert tmpl.render(seq=range(5)) \478            == ''.join('        %s\n' % x for x in range(5))479    def test_erb_syntax(self, env):480        env = Environment('<%', '%>', '<%=', '%>', '<%#', '%>',481                          lstrip_blocks=True, trim_blocks=True)482        # env.from_string('')483        # for n,r in env.lexer.rules.iteritems():484        #    print n485        # print env.lexer.rules['root'][0][0].pattern486        # print "'%s'" % tmpl.render(seq=range(5))487        tmpl = env.from_string('''\488<%# I'm a comment, I'm not interesting %>489    <% for item in seq %>490    <%= item %>491    <% endfor %>492''')493        assert tmpl.render(seq=range(5)) \494            == ''.join('    %s\n' % x for x in range(5))495    def test_erb_syntax_with_manual(self, env):496        env = Environment('<%', '%>', '<%=', '%>', '<%#', '%>',497                          lstrip_blocks=True, trim_blocks=True)498        tmpl = env.from_string('''\499<%# I'm a comment, I'm not interesting %>500    <% for item in seq -%>501        <%= item %>502    <%- endfor %>''')503        assert tmpl.render(seq=range(5)) == '01234'504    def test_erb_syntax_no_lstrip(self, env):505        env = Environment('<%', '%>', '<%=', '%>', '<%#', '%>',506                          lstrip_blocks=True, trim_blocks=True)507        tmpl = env.from_string('''\508<%# I'm a comment, I'm not interesting %>509    <%+ for item in seq -%>510        <%= item %>511    <%- endfor %>''')512        assert tmpl.render(seq=range(5)) == '    01234'513    def test_comment_syntax(self, env):514        env = Environment('<!--', '-->', '${', '}', '<!--#', '-->',515                          lstrip_blocks=True, trim_blocks=True)516        tmpl = env.from_string('''\517<!--# I'm a comment, I'm not interesting -->\518<!-- for item in seq --->519    ${item}520<!--- endfor -->''')...test_filters.py
Source:test_filters.py  
1# -*- coding: utf-8 -*-2"""3    jinja2.testsuite.filters4    ~~~~~~~~~~~~~~~~~~~~~~~~5    Tests for the jinja filters.6    :copyright: (c) 2017 by the Jinja Team.7    :license: BSD, see LICENSE for more details.8"""9import pytest10from jinja2 import Markup, Environment11from jinja2._compat import text_type, implements_to_string12@pytest.mark.filter13class TestFilter(object):14    def test_filter_calling(self, env):15        rv = env.call_filter('sum', [1, 2, 3])16        assert rv == 617    def test_capitalize(self, env):18        tmpl = env.from_string('{{ "foo bar"|capitalize }}')19        assert tmpl.render() == 'Foo bar'20    def test_center(self, env):21        tmpl = env.from_string('{{ "foo"|center(9) }}')22        assert tmpl.render() == '   foo   '23    def test_default(self, env):24        tmpl = env.from_string(25            "{{ missing|default('no') }}|{{ false|default('no') }}|"26            "{{ false|default('no', true) }}|{{ given|default('no') }}"27        )28        assert tmpl.render(given='yes') == 'no|False|no|yes'29    def test_dictsort(self, env):30        tmpl = env.from_string(31            '{{ foo|dictsort }}|'32            '{{ foo|dictsort(true) }}|'33            '{{ foo|dictsort(false, "value") }}'34        )35        out = tmpl.render(foo={"aa": 0, "b": 1, "c": 2, "AB": 3})36        assert out == ("[('aa', 0), ('AB', 3), ('b', 1), ('c', 2)]|"37                       "[('AB', 3), ('aa', 0), ('b', 1), ('c', 2)]|"38                       "[('aa', 0), ('b', 1), ('c', 2), ('AB', 3)]")39    def test_batch(self, env):40        tmpl = env.from_string("{{ foo|batch(3)|list }}|"41                               "{{ foo|batch(3, 'X')|list }}")42        out = tmpl.render(foo=list(range(10)))43        assert out == ("[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]|"44                       "[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 'X', 'X']]")45    def test_slice(self, env):46        tmpl = env.from_string('{{ foo|slice(3)|list }}|'47                               '{{ foo|slice(3, "X")|list }}')48        out = tmpl.render(foo=list(range(10)))49        assert out == ("[[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]|"50                       "[[0, 1, 2, 3], [4, 5, 6, 'X'], [7, 8, 9, 'X']]")51    def test_escape(self, env):52        tmpl = env.from_string('''{{ '<">&'|escape }}''')53        out = tmpl.render()54        assert out == '<">&'55    def test_striptags(self, env):56        tmpl = env.from_string('''{{ foo|striptags }}''')57        out = tmpl.render(foo='  <p>just a small   \n <a href="#">'58                          'example</a> link</p>\n<p>to a webpage</p> '59                          '<!-- <p>and some commented stuff</p> -->')60        assert out == 'just a small example link to a webpage'61    def test_filesizeformat(self, env):62        tmpl = env.from_string(63            '{{ 100|filesizeformat }}|'64            '{{ 1000|filesizeformat }}|'65            '{{ 1000000|filesizeformat }}|'66            '{{ 1000000000|filesizeformat }}|'67            '{{ 1000000000000|filesizeformat }}|'68            '{{ 100|filesizeformat(true) }}|'69            '{{ 1000|filesizeformat(true) }}|'70            '{{ 1000000|filesizeformat(true) }}|'71            '{{ 1000000000|filesizeformat(true) }}|'72            '{{ 1000000000000|filesizeformat(true) }}'73        )74        out = tmpl.render()75        assert out == (76            '100 Bytes|1.0 kB|1.0 MB|1.0 GB|1.0 TB|100 Bytes|'77            '1000 Bytes|976.6 KiB|953.7 MiB|931.3 GiB'78        )79    def test_filesizeformat_issue59(self, env):80        tmpl = env.from_string(81            '{{ 300|filesizeformat }}|'82            '{{ 3000|filesizeformat }}|'83            '{{ 3000000|filesizeformat }}|'84            '{{ 3000000000|filesizeformat }}|'85            '{{ 3000000000000|filesizeformat }}|'86            '{{ 300|filesizeformat(true) }}|'87            '{{ 3000|filesizeformat(true) }}|'88            '{{ 3000000|filesizeformat(true) }}'89        )90        out = tmpl.render()91        assert out == (92            '300 Bytes|3.0 kB|3.0 MB|3.0 GB|3.0 TB|300 Bytes|'93            '2.9 KiB|2.9 MiB'94        )95    def test_first(self, env):96        tmpl = env.from_string('{{ foo|first }}')97        out = tmpl.render(foo=list(range(10)))98        assert out == '0'99    def test_float(self, env):100        tmpl = env.from_string('{{ "42"|float }}|'101                               '{{ "ajsghasjgd"|float }}|'102                               '{{ "32.32"|float }}')103        out = tmpl.render()104        assert out == '42.0|0.0|32.32'105    def test_format(self, env):106        tmpl = env.from_string('''{{ "%s|%s"|format("a", "b") }}''')107        out = tmpl.render()108        assert out == 'a|b'109    def test_indent(self, env):110        tmpl = env.from_string('{{ foo|indent(2) }}|{{ foo|indent(2, true) }}')111        text = '\n'.join([' '.join(['foo', 'bar'] * 2)] * 2)112        out = tmpl.render(foo=text)113        assert out == ('foo bar foo bar\n  foo bar foo bar|  '114                       'foo bar foo bar\n  foo bar foo bar')115    def test_int(self, env):116        class IntIsh(object):117            def __int__(self):118                return 42119        tmpl = env.from_string('{{ "42"|int }}|{{ "ajsghasjgd"|int }}|'120                               '{{ "32.32"|int }}|{{ "0x4d32"|int(0, 16) }}|'121                               '{{ "011"|int(0, 8)}}|{{ "0x33FU"|int(0, 16) }}|'122                               '{{ obj|int }}')123        out = tmpl.render(obj=IntIsh())124        assert out == '42|0|32|19762|9|0|42'125    def test_join(self, env):126        tmpl = env.from_string('{{ [1, 2, 3]|join("|") }}')127        out = tmpl.render()128        assert out == '1|2|3'129        env2 = Environment(autoescape=True)130        tmpl = env2.from_string(131            '{{ ["<foo>", "<span>foo</span>"|safe]|join }}')132        assert tmpl.render() == '<foo><span>foo</span>'133    def test_join_attribute(self, env):134        class User(object):135            def __init__(self, username):136                self.username = username137        tmpl = env.from_string('''{{ users|join(', ', 'username') }}''')138        assert tmpl.render(users=map(User, ['foo', 'bar'])) == 'foo, bar'139    def test_last(self, env):140        tmpl = env.from_string('''{{ foo|last }}''')141        out = tmpl.render(foo=list(range(10)))142        assert out == '9'143    def test_length(self, env):144        tmpl = env.from_string('''{{ "hello world"|length }}''')145        out = tmpl.render()146        assert out == '11'147    def test_lower(self, env):148        tmpl = env.from_string('''{{ "FOO"|lower }}''')149        out = tmpl.render()150        assert out == 'foo'151    def test_pprint(self, env):152        from pprint import pformat153        tmpl = env.from_string('''{{ data|pprint }}''')154        data = list(range(1000))155        assert tmpl.render(data=data) == pformat(data)156    def test_random(self, env):157        tmpl = env.from_string('''{{ seq|random }}''')158        seq = list(range(100))159        for _ in range(10):160            assert int(tmpl.render(seq=seq)) in seq161    def test_reverse(self, env):162        tmpl = env.from_string('{{ "foobar"|reverse|join }}|'163                               '{{ [1, 2, 3]|reverse|list }}')164        assert tmpl.render() == 'raboof|[3, 2, 1]'165    def test_string(self, env):166        x = [1, 2, 3, 4, 5]167        tmpl = env.from_string('''{{ obj|string }}''')168        assert tmpl.render(obj=x) == text_type(x)169    def test_title(self, env):170        tmpl = env.from_string('''{{ "foo bar"|title }}''')171        assert tmpl.render() == "Foo Bar"172        tmpl = env.from_string('''{{ "foo's bar"|title }}''')173        assert tmpl.render() == "Foo's Bar"174        tmpl = env.from_string('''{{ "foo   bar"|title }}''')175        assert tmpl.render() == "Foo   Bar"176        tmpl = env.from_string('''{{ "f bar f"|title }}''')177        assert tmpl.render() == "F Bar F"178        tmpl = env.from_string('''{{ "foo-bar"|title }}''')179        assert tmpl.render() == "Foo-Bar"180        tmpl = env.from_string('''{{ "foo\tbar"|title }}''')181        assert tmpl.render() == "Foo\tBar"182        tmpl = env.from_string('''{{ "FOO\tBAR"|title }}''')183        assert tmpl.render() == "Foo\tBar"184        tmpl = env.from_string('''{{ "foo (bar)"|title }}''')185        assert tmpl.render() == "Foo (Bar)"186        tmpl = env.from_string('''{{ "foo {bar}"|title }}''')187        assert tmpl.render() == "Foo {Bar}"188        tmpl = env.from_string('''{{ "foo [bar]"|title }}''')189        assert tmpl.render() == "Foo [Bar]"190        tmpl = env.from_string('''{{ "foo <bar>"|title }}''')191        assert tmpl.render() == "Foo <Bar>"192        class Foo:193            def __str__(self):194                return 'foo-bar'195        tmpl = env.from_string('''{{ data|title }}''')196        out = tmpl.render(data=Foo())197        assert out == 'Foo-Bar'198    def test_truncate(self, env):199        tmpl = env.from_string(200            '{{ data|truncate(15, true, ">>>") }}|'201            '{{ data|truncate(15, false, ">>>") }}|'202            '{{ smalldata|truncate(15) }}'203        )204        out = tmpl.render(data='foobar baz bar' * 1000,205                          smalldata='foobar baz bar')206        msg = 'Current output: %s' % out207        assert out == 'foobar baz b>>>|foobar baz>>>|foobar baz bar', msg208    def test_truncate_very_short(self, env):209        tmpl = env.from_string(210            '{{ "foo bar baz"|truncate(9) }}|'211            '{{ "foo bar baz"|truncate(9, true) }}'212        )213        out = tmpl.render()214        assert out == 'foo bar baz|foo bar baz', out215    def test_truncate_end_length(self, env):216        tmpl = env.from_string('{{ "Joel is a slug"|truncate(7, true) }}')217        out = tmpl.render()218        assert out == 'Joel...', 'Current output: %s' % out219    def test_upper(self, env):220        tmpl = env.from_string('{{ "foo"|upper }}')221        assert tmpl.render() == 'FOO'222    def test_urlize(self, env):223        tmpl = env.from_string(224            '{{ "foo http://www.example.com/ bar"|urlize }}')225        assert tmpl.render() == (226            'foo <a href="http://www.example.com/" rel="noopener">'227            'http://www.example.com/</a> bar'228        )229    def test_urlize_rel_policy(self):230        env = Environment()231        env.policies['urlize.rel'] = None232        tmpl = env.from_string(233            '{{ "foo http://www.example.com/ bar"|urlize }}')234        assert tmpl.render() == (235            'foo <a href="http://www.example.com/">'236            'http://www.example.com/</a> bar'237        )238    def test_urlize_target_parameter(self, env):239        tmpl = env.from_string(240            '{{ "foo http://www.example.com/ bar"|urlize(target="_blank") }}'241        )242        assert tmpl.render() \243            == 'foo <a href="http://www.example.com/" rel="noopener" target="_blank">'\244            'http://www.example.com/</a> bar'245    def test_wordcount(self, env):246        tmpl = env.from_string('{{ "foo bar baz"|wordcount }}')247        assert tmpl.render() == '3'248    def test_block(self, env):249        tmpl = env.from_string(250            '{% filter lower|escape %}<HEHE>{% endfilter %}'251        )252        assert tmpl.render() == '<hehe>'253    def test_chaining(self, env):254        tmpl = env.from_string(255            '''{{ ['<foo>', '<bar>']|first|upper|escape }}'''256        )257        assert tmpl.render() == '<FOO>'258    def test_sum(self, env):259        tmpl = env.from_string('''{{ [1, 2, 3, 4, 5, 6]|sum }}''')260        assert tmpl.render() == '21'261    def test_sum_attributes(self, env):262        tmpl = env.from_string('''{{ values|sum('value') }}''')263        assert tmpl.render(values=[264            {'value': 23},265            {'value': 1},266            {'value': 18},267        ]) == '42'268    def test_sum_attributes_nested(self, env):269        tmpl = env.from_string('''{{ values|sum('real.value') }}''')270        assert tmpl.render(values=[271            {'real': {'value': 23}},272            {'real': {'value': 1}},273            {'real': {'value': 18}},274        ]) == '42'275    def test_sum_attributes_tuple(self, env):276        tmpl = env.from_string('''{{ values.items()|sum('1') }}''')277        assert tmpl.render(values={278            'foo': 23,279            'bar': 1,280            'baz': 18,281        }) == '42'282    def test_abs(self, env):283        tmpl = env.from_string('''{{ -1|abs }}|{{ 1|abs }}''')284        assert tmpl.render() == '1|1', tmpl.render()285    def test_round_positive(self, env):286        tmpl = env.from_string('{{ 2.7|round }}|{{ 2.1|round }}|'287                               "{{ 2.1234|round(3, 'floor') }}|"288                               "{{ 2.1|round(0, 'ceil') }}")289        assert tmpl.render() == '3.0|2.0|2.123|3.0', tmpl.render()290    def test_round_negative(self, env):291        tmpl = env.from_string('{{ 21.3|round(-1)}}|'292                               "{{ 21.3|round(-1, 'ceil')}}|"293                               "{{ 21.3|round(-1, 'floor')}}")294        assert tmpl.render() == '20.0|30.0|20.0', tmpl.render()295    def test_xmlattr(self, env):296        tmpl = env.from_string(297            "{{ {'foo': 42, 'bar': 23, 'fish': none, "298            "'spam': missing, 'blub:blub': '<?>'}|xmlattr }}")299        out = tmpl.render().split()300        assert len(out) == 3301        assert 'foo="42"' in out302        assert 'bar="23"' in out303        assert 'blub:blub="<?>"' in out304    def test_sort1(self, env):305        tmpl = env.from_string(306            '{{ [2, 3, 1]|sort }}|{{ [2, 3, 1]|sort(true) }}')307        assert tmpl.render() == '[1, 2, 3]|[3, 2, 1]'308    def test_sort2(self, env):309        tmpl = env.from_string('{{ "".join(["c", "A", "b", "D"]|sort) }}')310        assert tmpl.render() == 'AbcD'311    def test_sort3(self, env):312        tmpl = env.from_string('''{{ ['foo', 'Bar', 'blah']|sort }}''')313        assert tmpl.render() == "['Bar', 'blah', 'foo']"314    def test_sort4(self, env):315        @implements_to_string316        class Magic(object):317            def __init__(self, value):318                self.value = value319            def __str__(self):320                return text_type(self.value)321        tmpl = env.from_string('''{{ items|sort(attribute='value')|join }}''')322        assert tmpl.render(items=map(Magic, [3, 2, 4, 1])) == '1234'323    def test_groupby(self, env):324        tmpl = env.from_string('''325        {%- for grouper, list in [{'foo': 1, 'bar': 2},326                                  {'foo': 2, 'bar': 3},327                                  {'foo': 1, 'bar': 1},328                                  {'foo': 3, 'bar': 4}]|groupby('foo') -%}329            {{ grouper }}{% for x in list %}: {{ x.foo }}, {{ x.bar }}{% endfor %}|330        {%- endfor %}''')331        assert tmpl.render().split('|') == [332            "1: 1, 2: 1, 1",333            "2: 2, 3",334            "3: 3, 4",335            ""336        ]337    def test_groupby_tuple_index(self, env):338        tmpl = env.from_string('''339        {%- for grouper, list in [('a', 1), ('a', 2), ('b', 1)]|groupby(0) -%}340            {{ grouper }}{% for x in list %}:{{ x.1 }}{% endfor %}|341        {%- endfor %}''')342        assert tmpl.render() == 'a:1:2|b:1|'343    def test_groupby_multidot(self, env):344        class Date(object):345            def __init__(self, day, month, year):346                self.day = day347                self.month = month348                self.year = year349        class Article(object):350            def __init__(self, title, *date):351                self.date = Date(*date)352                self.title = title353        articles = [354            Article('aha', 1, 1, 1970),355            Article('interesting', 2, 1, 1970),356            Article('really?', 3, 1, 1970),357            Article('totally not', 1, 1, 1971)358        ]359        tmpl = env.from_string('''360        {%- for year, list in articles|groupby('date.year') -%}361            {{ year }}{% for x in list %}[{{ x.title }}]{% endfor %}|362        {%- endfor %}''')363        assert tmpl.render(articles=articles).split('|') == [364            '1970[aha][interesting][really?]',365            '1971[totally not]',366            ''367        ]368    def test_filtertag(self, env):369        tmpl = env.from_string("{% filter upper|replace('FOO', 'foo') %}"370                               "foobar{% endfilter %}")371        assert tmpl.render() == 'fooBAR'372    def test_replace(self, env):373        env = Environment()374        tmpl = env.from_string('{{ string|replace("o", 42) }}')375        assert tmpl.render(string='<foo>') == '<f4242>'376        env = Environment(autoescape=True)377        tmpl = env.from_string('{{ string|replace("o", 42) }}')378        assert tmpl.render(string='<foo>') == '<f4242>'379        tmpl = env.from_string('{{ string|replace("<", 42) }}')380        assert tmpl.render(string='<foo>') == '42foo>'381        tmpl = env.from_string('{{ string|replace("o", ">x<") }}')382        assert tmpl.render(string=Markup('foo')) == 'f>x<>x<'383    def test_forceescape(self, env):384        tmpl = env.from_string('{{ x|forceescape }}')385        assert tmpl.render(x=Markup('<div />')) == u'<div />'386    def test_safe(self, env):387        env = Environment(autoescape=True)388        tmpl = env.from_string('{{ "<div>foo</div>"|safe }}')389        assert tmpl.render() == '<div>foo</div>'390        tmpl = env.from_string('{{ "<div>foo</div>" }}')391        assert tmpl.render() == '<div>foo</div>'392    def test_urlencode(self, env):393        env = Environment(autoescape=True)394        tmpl = env.from_string('{{ "Hello, world!"|urlencode }}')395        assert tmpl.render() == 'Hello%2C%20world%21'396        tmpl = env.from_string('{{ o|urlencode }}')397        assert tmpl.render(o=u"Hello, world\u203d") \398            == "Hello%2C%20world%E2%80%BD"399        assert tmpl.render(o=(("f", 1),)) == "f=1"400        assert tmpl.render(o=(('f', 1), ("z", 2))) == "f=1&z=2"401        assert tmpl.render(o=((u"\u203d", 1),)) == "%E2%80%BD=1"402        assert tmpl.render(o={u"\u203d": 1}) == "%E2%80%BD=1"403        assert tmpl.render(o={0: 1}) == "0=1"404    def test_simple_map(self, env):405        env = Environment()406        tmpl = env.from_string('{{ ["1", "2", "3"]|map("int")|sum }}')407        assert tmpl.render() == '6'408    def test_attribute_map(self, env):409        class User(object):410            def __init__(self, name):411                self.name = name412        env = Environment()413        users = [414            User('john'),415            User('jane'),416            User('mike'),417        ]418        tmpl = env.from_string('{{ users|map(attribute="name")|join("|") }}')419        assert tmpl.render(users=users) == 'john|jane|mike'420    def test_empty_map(self, env):421        env = Environment()422        tmpl = env.from_string('{{ none|map("upper")|list }}')423        assert tmpl.render() == '[]'424    def test_simple_select(self, env):425        env = Environment()426        tmpl = env.from_string('{{ [1, 2, 3, 4, 5]|select("odd")|join("|") }}')427        assert tmpl.render() == '1|3|5'428    def test_bool_select(self, env):429        env = Environment()430        tmpl = env.from_string(431            '{{ [none, false, 0, 1, 2, 3, 4, 5]|select|join("|") }}'432        )433        assert tmpl.render() == '1|2|3|4|5'434    def test_simple_reject(self, env):435        env = Environment()436        tmpl = env.from_string('{{ [1, 2, 3, 4, 5]|reject("odd")|join("|") }}')437        assert tmpl.render() == '2|4'438    def test_bool_reject(self, env):439        env = Environment()440        tmpl = env.from_string(441            '{{ [none, false, 0, 1, 2, 3, 4, 5]|reject|join("|") }}'442        )443        assert tmpl.render() == 'None|False|0'444    def test_simple_select_attr(self, env):445        class User(object):446            def __init__(self, name, is_active):447                self.name = name448                self.is_active = is_active449        env = Environment()450        users = [451            User('john', True),452            User('jane', True),453            User('mike', False),454        ]455        tmpl = env.from_string(456            '{{ users|selectattr("is_active")|'457            'map(attribute="name")|join("|") }}'458        )459        assert tmpl.render(users=users) == 'john|jane'460    def test_simple_reject_attr(self, env):461        class User(object):462            def __init__(self, name, is_active):463                self.name = name464                self.is_active = is_active465        env = Environment()466        users = [467            User('john', True),468            User('jane', True),469            User('mike', False),470        ]471        tmpl = env.from_string('{{ users|rejectattr("is_active")|'472                               'map(attribute="name")|join("|") }}')473        assert tmpl.render(users=users) == 'mike'474    def test_func_select_attr(self, env):475        class User(object):476            def __init__(self, id, name):477                self.id = id478                self.name = name479        env = Environment()480        users = [481            User(1, 'john'),482            User(2, 'jane'),483            User(3, 'mike'),484        ]485        tmpl = env.from_string('{{ users|selectattr("id", "odd")|'486                               'map(attribute="name")|join("|") }}')487        assert tmpl.render(users=users) == 'john|mike'488    def test_func_reject_attr(self, env):489        class User(object):490            def __init__(self, id, name):491                self.id = id492                self.name = name493        env = Environment()494        users = [495            User(1, 'john'),496            User(2, 'jane'),497            User(3, 'mike'),498        ]499        tmpl = env.from_string('{{ users|rejectattr("id", "odd")|'500                               'map(attribute="name")|join("|") }}')501        assert tmpl.render(users=users) == 'jane'502    def test_json_dump(self):503        env = Environment(autoescape=True)504        t = env.from_string('{{ x|tojson }}')505        assert t.render(x={'foo': 'bar'}) == '{"foo": "bar"}'506        assert t.render(x='"bar\'') == r'"\"bar\u0027"'507        def my_dumps(value, **options):508            assert options == {'foo': 'bar'}509            return '42'510        env.policies['json.dumps_function'] = my_dumps511        env.policies['json.dumps_kwargs'] = {'foo': 'bar'}...base_env.py
Source:base_env.py  
...68            }, ...69        }70    """71    @staticmethod72    def to_base_env(73            env: EnvType,74            make_env: Callable[[int], EnvType] = None,75            num_envs: int = 1,76            remote_envs: bool = False,77            remote_env_batch_wait_ms: int = 0,78            policy_config: PartialTrainerConfigDict = None,79    ) -> "BaseEnv":80        """Wraps any env type as needed to expose the async interface."""81        from ray.rllib.env.remote_vector_env import RemoteVectorEnv82        if remote_envs and num_envs == 1:83            raise ValueError(84                "Remote envs only make sense to use if num_envs > 1 "85                "(i.e. vectorization is enabled).")86        if not isinstance(env, BaseEnv):87            if isinstance(env, MultiAgentEnv):88                if remote_envs:89                    env = RemoteVectorEnv(90                        make_env,91                        num_envs,92                        multiagent=True,93                        remote_env_batch_wait_ms=remote_env_batch_wait_ms)94                else:95                    env = _MultiAgentEnvToBaseEnv(96                        make_env=make_env,97                        existing_envs=[env],98                        num_envs=num_envs)99            elif isinstance(env, ExternalEnv):100                if num_envs != 1:101                    raise ValueError(102                        "External(MultiAgent)Env does not currently support "103                        "num_envs > 1. One way of solving this would be to "104                        "treat your Env as a MultiAgentEnv hosting only one "105                        "type of agent but with several copies.")106                env = _ExternalEnvToBaseEnv(env)107            elif isinstance(env, VectorEnv):108                env = _VectorEnvToBaseEnv(env)109            else:110                if remote_envs:111                    env = RemoteVectorEnv(112                        make_env,113                        num_envs,114                        multiagent=False,115                        remote_env_batch_wait_ms=remote_env_batch_wait_ms,116                        existing_envs=[env],117                    )118                else:119                    env = VectorEnv.wrap(120                        make_env=make_env,121                        existing_envs=[env],122                        num_envs=num_envs,123                        action_space=env.action_space,124                        observation_space=env.observation_space,125                        policy_config=policy_config,126                    )127                    env = _VectorEnvToBaseEnv(env)128        assert isinstance(env, BaseEnv), env129        return env130    @PublicAPI131    def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,132                            MultiEnvDict, MultiEnvDict]:133        """Returns observations from ready agents.134        The returns are two-level dicts mapping from env_id to a dict of135        agent_id to values. The number of agents and envs can vary over time.136        Returns137        -------138            obs (dict): New observations for each ready agent.139            rewards (dict): Reward values for each ready agent. If the140                episode is just started, the value will be None.141            dones (dict): Done values for each ready agent. The special key142                "__all__" is used to indicate env termination.143            infos (dict): Info values for each ready agent.144            off_policy_actions (dict): Agents may take off-policy actions. When145                that happens, there will be an entry in this dict that contains146                the taken action. There is no need to send_actions() for agents147                that have already chosen off-policy actions.148        """149        raise NotImplementedError150    @PublicAPI151    def send_actions(self, action_dict: MultiEnvDict) -> None:152        """Called to send actions back to running agents in this env.153        Actions should be sent for each ready agent that returned observations154        in the previous poll() call.155        Args:156            action_dict (dict): Actions values keyed by env_id and agent_id.157        """158        raise NotImplementedError159    @PublicAPI160    def try_reset(self,161                  env_id: Optional[EnvID] = None) -> Optional[MultiAgentDict]:162        """Attempt to reset the sub-env with the given id or all sub-envs.163        If the environment does not support synchronous reset, None can be164        returned here.165        Args:166            env_id (Optional[int]): The sub-env ID if applicable. If None,167                reset the entire Env (i.e. all sub-envs).168        Returns:169            Optional[MultiAgentDict]: Resetted (multi-agent) observation dict170                or None if reset is not supported.171        """172        return None173    @PublicAPI174    def get_unwrapped(self) -> List[EnvType]:175        """Return a reference to the underlying gym envs, if any.176        Returns:177            envs (list): Underlying gym envs or [].178        """179        return []180    @PublicAPI181    def try_render(self, env_id: Optional[EnvID] = None) -> None:182        """Tries to render the environment.183        Args:184            env_id (Optional[int]): The sub-env ID if applicable. If None,185                renders the entire Env (i.e. all sub-envs).186        """187        # By default, do nothing.188        pass189    @PublicAPI190    def stop(self) -> None:191        """Releases all resources used."""192        for env in self.get_unwrapped():193            if hasattr(env, "close"):194                env.close()195# Fixed agent identifier when there is only the single agent in the env196_DUMMY_AGENT_ID = "agent0"197def _with_dummy_agent_id(env_id_to_values: Dict[EnvID, Any],198                         dummy_id: "AgentID" = _DUMMY_AGENT_ID199                         ) -> MultiEnvDict:200    return {k: {dummy_id: v} for (k, v) in env_id_to_values.items()}201class _ExternalEnvToBaseEnv(BaseEnv):202    """Internal adapter of ExternalEnv to BaseEnv."""203    def __init__(self,204                 external_env: ExternalEnv,205                 preprocessor: "Preprocessor" = None):206        self.external_env = external_env207        self.prep = preprocessor208        self.multiagent = issubclass(type(external_env), ExternalMultiAgentEnv)209        self.action_space = external_env.action_space210        if preprocessor:211            self.observation_space = preprocessor.observation_space212        else:213            self.observation_space = external_env.observation_space214        external_env.start()215    @override(BaseEnv)216    def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,217                            MultiEnvDict, MultiEnvDict]:218        with self.external_env._results_avail_condition:219            results = self._poll()220            while len(results[0]) == 0:221                self.external_env._results_avail_condition.wait()222                results = self._poll()223                if not self.external_env.isAlive():224                    raise Exception("Serving thread has stopped.")225        limit = self.external_env._max_concurrent_episodes226        assert len(results[0]) < limit, \227            ("Too many concurrent episodes, were some leaked? This "228             "ExternalEnv was created with max_concurrent={}".format(limit))229        return results230    @override(BaseEnv)231    def send_actions(self, action_dict: MultiEnvDict) -> None:232        if self.multiagent:233            for env_id, actions in action_dict.items():234                self.external_env._episodes[env_id].action_queue.put(actions)235        else:236            for env_id, action in action_dict.items():237                self.external_env._episodes[env_id].action_queue.put(238                    action[_DUMMY_AGENT_ID])239    def _poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,240                             MultiEnvDict, MultiEnvDict]:241        all_obs, all_rewards, all_dones, all_infos = {}, {}, {}, {}242        off_policy_actions = {}243        for eid, episode in self.external_env._episodes.copy().items():244            data = episode.get_data()245            cur_done = episode.cur_done_dict[246                "__all__"] if self.multiagent else episode.cur_done247            if cur_done:248                del self.external_env._episodes[eid]249            if data:250                if self.prep:251                    all_obs[eid] = self.prep.transform(data["obs"])252                else:253                    all_obs[eid] = data["obs"]254                all_rewards[eid] = data["reward"]255                all_dones[eid] = data["done"]256                all_infos[eid] = data["info"]257                if "off_policy_action" in data:258                    off_policy_actions[eid] = data["off_policy_action"]259        if self.multiagent:260            # Ensure a consistent set of keys261            # rely on all_obs having all possible keys for now.262            for eid, eid_dict in all_obs.items():263                for agent_id in eid_dict.keys():264                    def fix(d, zero_val):265                        if agent_id not in d[eid]:266                            d[eid][agent_id] = zero_val267                    fix(all_rewards, 0.0)268                    fix(all_dones, False)269                    fix(all_infos, {})270            return (all_obs, all_rewards, all_dones, all_infos,271                    off_policy_actions)272        else:273            return _with_dummy_agent_id(all_obs), \274                _with_dummy_agent_id(all_rewards), \275                _with_dummy_agent_id(all_dones, "__all__"), \276                _with_dummy_agent_id(all_infos), \277                _with_dummy_agent_id(off_policy_actions)278class _VectorEnvToBaseEnv(BaseEnv):279    """Internal adapter of VectorEnv to BaseEnv.280    We assume the caller will always send the full vector of actions in each281    call to send_actions(), and that they call reset_at() on all completed282    environments before calling send_actions().283    """284    def __init__(self, vector_env: VectorEnv):285        self.vector_env = vector_env286        self.action_space = vector_env.action_space287        self.observation_space = vector_env.observation_space288        self.num_envs = vector_env.num_envs289        self.new_obs = None  # lazily initialized290        self.cur_rewards = [None for _ in range(self.num_envs)]291        self.cur_dones = [False for _ in range(self.num_envs)]292        self.cur_infos = [None for _ in range(self.num_envs)]293    @override(BaseEnv)294    def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,295                            MultiEnvDict, MultiEnvDict]:296        if self.new_obs is None:297            self.new_obs = self.vector_env.vector_reset()298        new_obs = dict(enumerate(self.new_obs))299        rewards = dict(enumerate(self.cur_rewards))300        dones = dict(enumerate(self.cur_dones))301        infos = dict(enumerate(self.cur_infos))302        self.new_obs = []303        self.cur_rewards = []304        self.cur_dones = []305        self.cur_infos = []306        return _with_dummy_agent_id(new_obs), \307            _with_dummy_agent_id(rewards), \308            _with_dummy_agent_id(dones, "__all__"), \309            _with_dummy_agent_id(infos), {}310    @override(BaseEnv)311    def send_actions(self, action_dict: MultiEnvDict) -> None:312        action_vector = [None] * self.num_envs313        for i in range(self.num_envs):314            action_vector[i] = action_dict[i][_DUMMY_AGENT_ID]315        self.new_obs, self.cur_rewards, self.cur_dones, self.cur_infos = \316            self.vector_env.vector_step(action_vector)317    @override(BaseEnv)318    def try_reset(self, env_id: Optional[EnvID] = None) -> MultiAgentDict:319        assert env_id is None or isinstance(env_id, int)320        return {_DUMMY_AGENT_ID: self.vector_env.reset_at(env_id)}321    @override(BaseEnv)322    def get_unwrapped(self) -> List[EnvType]:323        return self.vector_env.get_unwrapped()324    @override(BaseEnv)325    def try_render(self, env_id: Optional[EnvID] = None) -> None:326        assert env_id is None or isinstance(env_id, int)327        return self.vector_env.try_render_at(env_id)328class _MultiAgentEnvToBaseEnv(BaseEnv):329    """Internal adapter of MultiAgentEnv to BaseEnv.330    This also supports vectorization if num_envs > 1.331    """332    def __init__(self, make_env: Callable[[int], EnvType],333                 existing_envs: List[MultiAgentEnv], num_envs: int):334        """Wrap existing multi-agent envs.335        Args:336            make_env (func|None): Factory that produces a new multiagent env.337                Must be defined if the number of existing envs is less than338                num_envs.339            existing_envs (list): List of existing multiagent envs.340            num_envs (int): Desired num multiagent envs to keep total.341        """342        self.make_env = make_env343        self.envs = existing_envs344        self.num_envs = num_envs345        self.dones = set()346        while len(self.envs) < self.num_envs:347            self.envs.append(self.make_env(len(self.envs)))348        for env in self.envs:349            assert isinstance(env, MultiAgentEnv)350        self.env_states = [_MultiAgentEnvState(env) for env in self.envs]351    @override(BaseEnv)352    def poll(self) -> Tuple[MultiEnvDict, MultiEnvDict, MultiEnvDict,353                            MultiEnvDict, MultiEnvDict]:354        obs, rewards, dones, infos = {}, {}, {}, {}355        for i, env_state in enumerate(self.env_states):356            obs[i], rewards[i], dones[i], infos[i] = env_state.poll()357        return obs, rewards, dones, infos, {}358    @override(BaseEnv)359    def send_actions(self, action_dict: MultiEnvDict) -> None:360        for env_id, agent_dict in action_dict.items():361            if env_id in self.dones:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
