Best Python code snippet using dbt-osmosis_python
ddl.py
Source:ddl.py  
1#!/usr/bin/env python2# -*- coding: utf-8; tab-width: 4; indent-tabs-mode: t -*-3#4# NetProfile: Custom DDL constructs for SQLAlchemy5# © Copyright 2013-2014 Alex 'Unik' Unigovsky6#7# This file is part of NetProfile.8# NetProfile is free software: you can redistribute it and/or9# modify it under the terms of the GNU Affero General Public10# License as published by the Free Software Foundation, either11# version 3 of the License, or (at your option) any later12# version.13#14# NetProfile is distributed in the hope that it will be useful,15# but WITHOUT ANY WARRANTY; without even the implied warranty of16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the17# GNU Affero General Public License for more details.18#19# You should have received a copy of the GNU Affero General20# Public License along with NetProfile. If not, see21# <http://www.gnu.org/licenses/>.22from __future__ import (23	unicode_literals,24	print_function,25	absolute_import,26	division27)28import sys29import datetime as dt30from sqlalchemy.schema import (31	Column,32	DefaultClause,33	DDLElement,34	SchemaItem,35	Table36)37from sqlalchemy import (38	event,39	text40)41from sqlalchemy.sql import sqltypes42from sqlalchemy.sql.expression import ClauseElement43from sqlalchemy.sql.functions import FunctionElement44from sqlalchemy.ext.compiler import compiles45from sqlalchemy.ext.declarative import DeclarativeMeta46from sqlalchemy.orm import Query47from pyramid.renderers import render48from netprofile.ext.data import _table_to_class49from .connection import Base50class CurrentTimestampDefaultItem(ClauseElement):51	def __init__(self, on_update=False):52		self.on_update = on_update53@compiles(CurrentTimestampDefaultItem, 'mysql')54def visit_timestamp_default_mysql(element, compiler, **kw):55	ddl = 'CURRENT_TIMESTAMP'56	if element.on_update:57		ddl += ' ON UPDATE CURRENT_TIMESTAMP'58	return ddl59@compiles(CurrentTimestampDefaultItem)60def visit_timestamp_default(element, compiler, **kw):61	return 'CURRENT_TIMESTAMP'62class CurrentTimestampDefault(DefaultClause):63	def __init__(self, on_update=False):64		self.on_update = on_update65		super(CurrentTimestampDefault, self).__init__(66			CurrentTimestampDefaultItem(on_update),67			for_update=on_update68		)69	def _set_parent(self, column):70		self.column = column71		self.column.server_default = self72		if self.on_update:73			self.column.server_onupdate = self74class SQLFunctionArgument(DDLElement):75	def __init__(self, name, arg_type, arg_dir=None):76		self.name = name77		self.type = arg_type78		self.dir = arg_dir79class InArgument(SQLFunctionArgument):80	def __init__(self, name, arg_type):81		super(InArgument, self).__init__(name, arg_type, 'IN')82class OutArgument(SQLFunctionArgument):83	def __init__(self, name, arg_type):84		super(OutArgument, self).__init__(name, arg_type, 'OUT')85class InOutArgument(SQLFunctionArgument):86	def __init__(self, name, arg_type):87		super(InOutArgument, self).__init__(name, arg_type, 'INOUT')88@compiles(SQLFunctionArgument, 'mysql')89def visit_sql_function_arg(element, compiler, **kw):90	return '%s %s %s' % (91		element.dir if element.dir else '',92		compiler.sql_compiler.preparer.quote(element.name),93		compiler.dialect.type_compiler.process(element.type)94	)95class AlterTableAlterColumn(DDLElement):96	def __init__(self, table, column):97		self.table = table98		self.column = column99@compiles(AlterTableAlterColumn, 'mysql')100def visit_alter_table_alter_column_mysql(element, compiler, **kw):101	table = element.table102	col = element.column103	spec = compiler.get_column_specification(col, first_pk=col.primary_key)104	const = " ".join(compiler.process(constraint) \105			for constraint in col.constraints)106	if const:107		spec += " " + const108	if col.comment:109		spec += " COMMENT " + compiler.sql_compiler.render_literal_value(col.comment, sqltypes.STRINGTYPE)110	return 'ALTER TABLE %s CHANGE COLUMN %s %s' % (111		compiler.sql_compiler.preparer.format_table(table),112		compiler.sql_compiler.preparer.format_column(col),113		spec114	)115class SetTableComment(DDLElement):116	def __init__(self, table, comment):117		self.table = table118		self.text = comment119class SetColumnComment(DDLElement):120	def __init__(self, column, comment):121		self.column = column122		self.text = comment123class Comment(SchemaItem):124	"""125	Represents a table comment DDL.126	"""127	def __init__(self, ctext):128		self.text = ctext129	def _set_parent(self, parent):130		self.parent = parent131		parent.comment = self.text132		if isinstance(parent, Table):133			SetTableComment(parent, self.text).execute_at('after_create', parent)134		elif isinstance(parent, Column):135			text = self.text136			if not parent.doc:137				parent.doc = text138			def _set_col_comment(column, meta):139				SetColumnComment(column, text).execute_at('after_create', column.table)140			event.listen(141				parent,142				'after_parent_attach',143				_set_col_comment144			)145@compiles(SetColumnComment, 'mysql')146def visit_set_column_comment_mysql(element, compiler, **kw):147	spec = compiler.get_column_specification(element.column, first_pk=element.column.primary_key)148	const = " ".join(compiler.process(constraint) \149			for constraint in element.column.constraints)150	if const:151		spec += " " + const152	return 'ALTER TABLE %s MODIFY COLUMN %s COMMENT %s' % (153		compiler.sql_compiler.preparer.format_table(element.column.table),154		spec,155		compiler.sql_compiler.render_literal_value(element.text, sqltypes.STRINGTYPE)156	)157@compiles(SetColumnComment, 'postgresql')158@compiles(SetColumnComment, 'oracle')159def visit_set_column_comment_pgsql(element, compiler, **kw):160	return 'COMMENT ON COLUMN %s IS %s' % (161		compiler.sql_compiler.process(element.column),162		compiler.sql_compiler.render_literal_value(element.text, sqltypes.STRINGTYPE)163	)164@compiles(SetColumnComment)165def visit_set_column_comment(element, compiler, **kw):166	pass167@compiles(SetTableComment, 'mysql')168def visit_set_table_comment_mysql(element, compiler, **kw):169	return 'ALTER TABLE %s COMMENT=%s' % (170		compiler.sql_compiler.preparer.format_table(element.table),171		compiler.sql_compiler.render_literal_value(element.text, sqltypes.STRINGTYPE)172	)173@compiles(SetTableComment, 'postgresql')174@compiles(SetTableComment, 'oracle')175def visit_set_table_comment_pgsql(element, compiler, **kw):176	return 'COMMENT ON TABLE %s IS %s' % (177		compiler.sql_compiler.preparer.format_table(element.table),178		compiler.sql_compiler.render_literal_value(element.text, sqltypes.STRINGTYPE)179	)180@compiles(SetTableComment)181def visit_set_table_comment(element, compiler, **kw):182	pass183def ddl_fmt(ctx, obj):184	compiler = ctx['compiler']185	if isinstance(obj, (Trigger, SQLFunction, SQLEvent)):186		return compiler.sql_compiler.preparer.quote(obj.name)187	if isinstance(obj, Table):188		return compiler.sql_compiler.preparer.format_table(obj)189	if isinstance(obj, DeclarativeMeta):190		return compiler.sql_compiler.preparer.format_table(obj.__table__)191	if isinstance(obj, DDLElement):192		return compiler.process(obj)193	if isinstance(obj, sqltypes.TypeEngine):194		return compiler.dialect.type_compiler.process(obj)195	if isinstance(obj, (FunctionElement, ClauseElement)):196		return compiler.sql_compiler.process(obj)197	if isinstance(obj, dt.datetime):198		dname = compiler.dialect.name199		date = obj200		if dname in ('mysql', 'postgresql', 'sqlite'):201			date = date.strftime('%Y-%m-%d %H:%M:%S')202		return compiler.sql_compiler.render_literal_value(date, sqltypes.STRINGTYPE)203	if isinstance(obj, str):204		return compiler.sql_compiler.render_literal_value(obj, sqltypes.STRINGTYPE)205	if isinstance(obj, int):206		return compiler.sql_compiler.render_literal_value(obj, sqltypes.INTEGERTYPE)207	raise ValueError('Unable to format value for DDL')208class CreateTrigger(DDLElement):209	def __init__(self, table, trigger):210		self.table = table211		self.trigger = trigger212class DropTrigger(DDLElement):213	def __init__(self, table, trigger):214		self.table = table215		self.trigger = trigger216@compiles(CreateTrigger)217def visit_create_trigger(element, compiler, **kw):218	table = element.table219	cls = _table_to_class(table.name)220	module = cls.__module__.split('.')[0]221	trigger = element.trigger222	tpldef = {223		'table'    : table,224		'class'    : cls,225		'module'   : module,226		'compiler' : compiler,227		'dialect'  : compiler.dialect,228		'trigger'  : trigger,229		'raw'      : text230	}231	tpldef.update(Base._decl_class_registry.items())232	tplname = '%s:templates/sql/%s/triggers/%s.mak' % (233		module,234		compiler.dialect.name,235		trigger.name236	)237	return render(tplname, tpldef, package=sys.modules[module])238@compiles(DropTrigger, 'postgresql')239def visit_drop_trigger_mysql(element, compiler, **kw):240	return 'DROP TRIGGER %s ON %s' % (241		compiler.sql_compiler.preparer.quote(element.trigger.name),242		compiler.sql_compiler.preparer.format_table(element.parent)243	)244@compiles(DropTrigger)245def visit_drop_trigger(element, compiler, **kw):246	return 'DROP TRIGGER %s' % (247		compiler.sql_compiler.preparer.quote(element.trigger.name),248	)249class Trigger(SchemaItem):250	"""251	Schema element that attaches a trigger template to an object.252	"""253	def __init__(self, when='before', event='insert', name=None):254		self.when = when255		self.event = event256		self.name = name257	def _set_parent(self, parent):258		if isinstance(parent, Table):259			self.parent = parent260			CreateTrigger(parent, self).execute_at('after_create', parent)261			DropTrigger(parent, self).execute_at('before_drop', parent)262class CreateFunction(DDLElement):263	"""264	SQL function template DDL object.265	"""266	def __init__(self, func, module):267		self.func = func268		self.module = module269@compiles(CreateFunction)270def visit_create_function(element, compiler, **kw):271	func = element.func272	name = func.name273	module = 'netprofile_' + element.module274	tpldef = {275		'function' : func,276		'name'     : name,277		'module'   : module,278		'compiler' : compiler,279		'dialect'  : compiler.dialect,280		'raw'      : text281	}282	tpldef.update(Base._decl_class_registry.items())283	tplname = '%s:templates/sql/%s/functions/%s.mak' % (284		module,285		compiler.dialect.name,286		name287	)288	return render(tplname, tpldef, package=sys.modules[module])289class DropFunction(DDLElement):290	"""291	SQL DROP FUNCTION DDL object.292	"""293	def __init__(self, func):294		self.func = func295@compiles(DropFunction, 'postgresql')296def visit_drop_function_pgsql(element, compiler, **kw):297	func = element.func298	name = func.name299	return 'DROP FUNCTION %s' % (300		compiler.sql_compiler.preparer.quote(name),301	)302@compiles(DropFunction)303def visit_drop_function(element, compiler, **kw):304	func = element.func305	name = func.name306	is_proc = func.is_procedure307	return 'DROP %s %s' % (308		'PROCEDURE' if is_proc else 'FUNCTION',309		compiler.sql_compiler.preparer.quote(name)310	)311class SQLFunction(object):312	"""313	Schema element that defines an SQL function or procedure.314	"""315	def __init__(self, name, args=(), returns=None, comment=None, reads_sql=True, writes_sql=True, is_procedure=False, label=None):316		self.name = name317		self.args = args318		self.returns = returns319		self.comment = comment320		self.reads_sql = reads_sql321		self.writes_sql = writes_sql322		self.is_procedure = is_procedure323		self.label = label324	def create(self, modname):325		return CreateFunction(self, modname)326	def drop(self):327		return DropFunction(self)328class CreateEvent(DDLElement):329	"""330	SQL event template DDL object.331	"""332	def __init__(self, evt, module):333		self.event = evt334		self.module = module335@compiles(CreateEvent, 'mysql')336def visit_create_event_mysql(element, compiler, **kw):337	evt = element.event338	name = evt.name339	module = 'netprofile_' + element.module340	tpldef = {341		'event'    : evt,342		'name'     : name,343		'module'   : module,344		'compiler' : compiler,345		'dialect'  : compiler.dialect,346		'raw'      : text347	}348	tpldef.update(Base._decl_class_registry.items())349	tplname = '%s:templates/sql/%s/events/%s.mak' % (350		module,351		compiler.dialect.name,352		name353	)354	return render(tplname, tpldef, package=sys.modules[module])355class DropEvent(DDLElement):356	"""357	SQL DROP EVENT DDL object.358	"""359	def __init__(self, evt):360		self.event = evt361@compiles(DropEvent, 'mysql')362def visit_drop_event_mysql(element, compiler, **kw):363	evt = element.event364	return 'DROP EVENT %s' % (365		compiler.sql_compiler.preparer.quote(evt.name),366	)367class SQLEvent(object):368	"""369	Schema element that defines some periodically executed SQL code.370	"""371	def __init__(self, name, sched_unit='month', sched_interval=1, starts=None, preserve=True, enabled=True, comment=None):372		self.name = name373		self.preserve = preserve374		self.enabled = enabled375		self.comment = comment376		self.sched_unit = sched_unit377		self.sched_interval = sched_interval378		self.starts = starts379	def create(self, modname):380		return CreateEvent(self, modname)381	def drop(self):382		return DropEvent(self)383class CreateView(DDLElement):384	"""385	SQL create view DDL object.386	"""387	def __init__(self, name, select, check_option=None):388		self.name = name389		self.select = select390		self.check = check_option391class DropView(DDLElement):392	"""393	SQL drop view DDL object.394	"""395	def __init__(self, name):396		self.name = name397@compiles(CreateView)398def visit_create_view(element, compiler, **kw):399	sel = element.select400	co = ''401	if callable(sel):402		sel = sel()403	if isinstance(sel, Query):404		ctx = sel._compile_context()405		ctx.statement.use_labels = True406		conn = sel.session.connection(mapper=sel._mapper_zero_or_none(), clause=ctx.statement)407		sel = ctx.statement.compile(conn, compile_kwargs={ 'literal_binds': True })408	else:409		sel = compiler.sql_compiler.process(sel, literal_binds=True)410	if element.check:411		if isinstance(element.check, str):412			co = ' WITH %s CHECK OPTION' % (element.check.upper(),)413		else:414			co = ' WITH CHECK OPTION'415	return 'CREATE VIEW %s AS %s%s' % (416		compiler.sql_compiler.preparer.quote(element.name),417		sel, co418	)419@compiles(DropView)420def visit_drop_view(element, compiler, **kw):421	return 'DROP VIEW %s' % (422		compiler.sql_compiler.preparer.quote(element.name),423	)424class View(SchemaItem):425	"""426	Schema element that attaches a view with a predefined query to an object.427	"""428	def __init__(self, name, select, check_option=None):429		self.name = name430		self.select = select431		self.check = check_option432	def _set_parent(self, parent):433		if isinstance(parent, Table):434			self.parent = parent435			CreateView(self.name, self.select, check_option=self.check).execute_at('after_create', parent)436			DropView(self.name).execute_at('before_drop', parent)437	def create(self):438		return CreateView(self.name, self.select, check_option=self.check)439	def drop(self):...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
