...140 self.session.rollback()141 database.drop_all()142 database.stop_engine()143 # Returns the last value of a generator and the log entries it yielded. Asserts that the first value is a string, and all other (except maybe the last one) are of type LogEntry.144 def _unwind(self, generator, assert_no_error=False):145 step_description = self.assertTrue(isinstance(step_description, str) or isinstance(step_description, unicode))147 entries = []148 out = None149 entry = None150 try:151 while True:152 entry = if not(isinstance(entry, m.LogEntry)):154 with self.assertRaises(StopIteration):155 out = else:157 entries.append(entry)158 if assert_no_error:159 self.assertNotEquals(entry.severity, m.Severity.ERROR.format())160 except StopIteration:161 out = entry162 return out, entries163 def test_load_configuration(self):164 view = self.session.query(m.DeploymentView).get(1)165 self._unwind(execution.check_configuration(view), assert_no_error=True)166 def test_load_configuration_deactivated_servers(self):167 view = self.session.query(m.DeploymentView).get(2)168 _, entries = self._unwind(execution.check_configuration(view))169 self.assertTrue(any(entry.severity == m.Severity.ERROR.format() for entry in entries))170 # This is a Friday171 @freeze_time('2015-11-28 15:00')172 def test_check_deploy_allowed_friday_afternoon(self):173 authorized, _ = self._unwind(execution.check_deploy_allowed(self.deploy_user, 1, 'prod'))174 self.assertFalse(authorized)175 # This is a Sunday176 @freeze_time('2015-11-28')177 def test_check_deploy_allowed_super_user(self):178 authorized, _ = self._unwind(execution.check_deploy_allowed(self.admin_user, 1, 'prod'))179 self.assertTrue(authorized)180 # This is a Sunday181 @freeze_time('2015-11-28')182 def test_check_deploy_allowed_weekend(self):183 authorized, _ = self._unwind(execution.check_deploy_allowed(self.deploy_user, 1, 'prod'))184 self.assertFalse(authorized)185 # This is a Wednesday186 @freeze_time('2015-11-25 23:00')187 def test_check_deploy_allowed_late(self):188 authorized, _ = self._unwind(execution.check_deploy_allowed(self.deploy_user, 1, 'prod'))189 self.assertFalse(authorized)190 # This is Christmas! Yay!191 @freeze_time('2017-12-25 10:00')192 def test_check_deploy_allowed_bank_holiday(self):193 authorized, _ = self._unwind(execution.check_deploy_allowed(self.deploy_user, 1, 'prod'))194 self.assertFalse(authorized)195 @mock.patch('git.Repo.clone_from')196 def test_clone_repo(self, mock_func):197 self._unwind(execution.clone_repo('/tmp/deploy/my_repo_branch', 'my_repo', 'git'))198 mock_func.assert_called_with('git@git:my_repo', '/tmp/deploy/my_repo_branch')199 def test_run_predeploy(self):200 try:201 repo_path = tempfile.mkdtemp()202 with open(os.path.join(repo_path, ''), 'w') as f:203 f.write('echo -n "it works for env $1 commit $2"')204 f.flush()205 _, entries = self._unwind(execution.run_and_delete_predeploy(repo_path, "dev", "abcde"))206 self.assertEqual(2, len(entries))207 entry = entries[0]208 self.assertEqual(" it works for env dev commit abcde", entry.message)209 finally:210 shutil.rmtree(repo_path)211 @mock.patch('deployment.execution.haproxy_action')212 def test_enable_clusters(self, mock_func):213 servers_1 = [(m.Server(id=1, name='fr-hq-server-01'), "BACKEND,01"), (m.Server(id=2, name='fr-hq-server-02'), "BACKEND,02")]214 servers_2 = [(m.Server(id=3, name='fr-hq-server-03'), "BACKEND,03"), (m.Server(id=4, name='fr-hq-server-04'), "BACKEND,04")]215 cluster_1 = m.Cluster(id=1, name="1", haproxy_host="fr-hq-vip-01")216 cluster_2 = m.Cluster(id=2, name="2", haproxy_host="fr-hq-vip-02")217 asso_1 = [m.ClusterServerAssociation(server_def=server, cluster_def=cluster_1, haproxy_key=haproxy_key)218 for server, haproxy_key in servers_1]219 asso_2 = [m.ClusterServerAssociation(server_def=server, cluster_def=cluster_2, haproxy_key=haproxy_key)220 for server, haproxy_key in servers_2]221 clusters = [cluster_1, cluster_2]222 self._unwind(execution.enable_clusters(clusters, "secret"))223 mock_func.assert_has_calls([224"fr-hq-vip-01", ["BACKEND,01", "BACKEND,02"], "secret", '', execution.HAProxyAction.ENABLE),225"fr-hq-vip-02", ["BACKEND,03", "BACKEND,04"], "secret", '', execution.HAProxyAction.ENABLE)226 ])227 @mock.patch('deployment.execution.haproxy_action')228 def test_disable_clusters(self, mock_func):229 servers_1 = [(m.Server(id=1, name='fr-hq-server-01'), "BACKEND,01"), (m.Server(id=2, name='fr-hq-server-02'), "BACKEND,02")]230 servers_2 = [(m.Server(id=3, name='fr-hq-server-03'), "BACKEND,03"), (m.Server(id=4, name='fr-hq-server-04'), "BACKEND,04")]231 cluster_1 = m.Cluster(id=1, name="1", haproxy_host="fr-hq-vip-01")232 cluster_2 = m.Cluster(id=2, name="2", haproxy_host="fr-hq-vip-02")233 asso_1 = [m.ClusterServerAssociation(server_def=server, cluster_def=cluster_1, haproxy_key=haproxy_key)234 for server, haproxy_key in servers_1]235 asso_2 = [m.ClusterServerAssociation(server_def=server, cluster_def=cluster_2, haproxy_key=haproxy_key)236 for server, haproxy_key in servers_2]237 clusters = [cluster_1, cluster_2]238 self._unwind(execution.disable_clusters(clusters, "secret"))239 mock_func.assert_has_calls([240"fr-hq-vip-01", ["BACKEND,01", "BACKEND,02"], "secret", '', execution.HAProxyAction.DISABLE),241"fr-hq-vip-02", ["BACKEND,03", "BACKEND,04"], "secret", '', execution.HAProxyAction.DISABLE)242 ])243 @mock.patch('deployment.execution.haproxy_action')244 def test_ensure_clusters_up(self, mock_func):245 servers_1 = [(m.Server(id=1, name='fr-hq-server-01'), "BACKEND,01"), (m.Server(id=2, name='fr-hq-server-02'), "BACKEND,02")]246 servers_2 = [(m.Server(id=3, name='fr-hq-server-03'), "BACKEND,03"), (m.Server(id=4, name='fr-hq-server-04'), "BACKEND,04")]247 cluster_1 = m.Cluster(id=1, name="1", haproxy_host="fr-hq-vip-01")248 cluster_2 = m.Cluster(id=2, name="2", haproxy_host="fr-hq-vip-02")249 asso_1 = [m.ClusterServerAssociation(server_def=server, cluster_def=cluster_1, haproxy_key=haproxy_key)250 for server, haproxy_key in servers_1]251 asso_2 = [m.ClusterServerAssociation(server_def=server, cluster_def=cluster_2, haproxy_key=haproxy_key)252 for server, haproxy_key in servers_2]253 clusters = [cluster_1, cluster_2]254 self._unwind(execution.ensure_clusters_up(clusters, "secret"))255 mock_func.assert_has_calls([256"fr-hq-vip-01", ["BACKEND,01", "BACKEND,02"], "secret", 'UP', execution.HAProxyAction.ENABLE),257"fr-hq-vip-02", ["BACKEND,03", "BACKEND,04"], "secret", 'UP', execution.HAProxyAction.ENABLE)258 ])259 @freeze_time('2015-11-25 23:00')260 @mock.patch('deployment.executils.exec_cmd', autospec=True)261 @mock.patch('deployment.execution.exec_cmd', autospec=True)262 def test_sync(self, mock_func, mock_func_2):263 mock_func.side_effect = lambda *args, **kwargs: (0, "stdout", "stderr")264 mock_func_2.side_effect = lambda *args, **kwargs: (0, "stdout", "stderr")265 host = executils.Host("fr-hq-deployment-01", "scaleweb", 22)266 self._unwind(execution.parallel_sync("/home/scaleweb/project", "-cr --delete-after", "master", "abcde", "/home/deploy/project/", [host], 1))267 mock_func.assert_has_calls([268["rsync", "-e", "ssh -p 22", "--exclude=.git", "-cr", "--delete-after", "--exclude=.git_release", "/home/deploy/project/", "scaleweb@fr-hq-deployment-01:/home/scaleweb/project/"]),269 ])270 mock_func_2.assert_has_calls([271['ssh', 'scaleweb@fr-hq-deployment-01', '-p', '22', 'mkdir', '-p', "/home/scaleweb/project/"], timeout=600),272['ssh', 'scaleweb@fr-hq-deployment-01', '-p', '22', 'echo', "'master\nabcde\n2015-11-25T23:00:00.000000\n/home/scaleweb/project/'", '>', '/home/scaleweb/project/.git_release'], timeout=600)273 ], any_order=True) # TODO: investiguate the extra calls without parameters274 @mock.patch('deployment.executils.exec_cmd', autospec=True)275 def test_release_inplace(self, mock_func):276 host = executils.Host("some-server", "scaleweb", 22)277 self._unwind(execution.release(host, "inplace", "/home/scaleweb", "/home/scaleweb/", "project"))278 self.assertItemsEqual(mock_func.call_args_list, [])279 @mock.patch('deployment.executils.exec_cmd', autospec=True)280 def test_release_symlink(self, mock_func):281 mock_func.side_effect = lambda *args, **kwargs: (0, "stdout", "stderr")282 host = executils.Host("fr-hq-deployment-01", "scaleweb", 22)283 self._unwind(execution.release(host, "symlink", "/home/scaleweb/", "production", "/home/scaleweb/production_releases/20151204_prod_abcde/"))284 mock_func.assert_called_with(['ssh', 'scaleweb@fr-hq-deployment-01', '-p', '22', 'cd', '/home/scaleweb/', '&&', 'ln', '-s', "/home/scaleweb/production_releases/20151204_prod_abcde/", 'tmp-link', '&&', 'mv', '-T', 'tmp-link', "/home/scaleweb/production"], timeout=600)285 @mock.patch('deployment.execution.exec_script_remote', autospec=True)286 @mock.patch('deployment.execution.run_cmd_by_ssh', autospec=True)287 def test_run_deploy(self, mock_script_func, mock_ssh_func):288 mock_script_func.side_effect = lambda *args, **kwargs: (0, "stdout", "stderr")289 mock_ssh_func.side_effect = lambda *args, **kwargs: (0, "stdout", "stderr")290 # Just run the method to catch obvious mistakes291 # It's too complex to write a robust non-trivial test against this (very simple) method.292 host = executils.Host("some-server", "scaleweb", 22)293 self._unwind(execution.run_and_delete_deploy(host, '/home/scaleweb/project', 'dev', 'abcde'), assert_no_error=True)294 @mock.patch('deployment.executils.exec_cmd', autospec=True)295 def test_run_tests(self, mock_func):296 mock_func.side_effect = lambda *args, **kwargs: (0, "ok", "still ok")297 env = self.session.query(m.Environment).get(2)298 host = executils.Host.from_server(env.servers[0], "scaleweb")299 # Remote300 report = execution.run_test(env, "master", "abcde", host=host,301 mail_sender="", local=False)302 self.assertEquals(False, report.failed)303 # Local304 report = execution.run_test(env, "master", "abcde", host=host,305 mail_sender="", local=True,306 local_repo_path="/home/deploy/project")307 self.assertIsNone(report)308 def test_check_servers_availability(self):309 servers = [self.session.query(m.Server).get(1)]310 ok, entries = self._unwind(execution.check_servers_availability(self.session, 2, servers, "prod", "prod", "abcde"))311 self.assertFalse(ok)312 servers = [self.session.query(m.Server).get(4)]313 ok, entries = self._unwind(execution.check_servers_availability(self.session, 2, servers, "prod", "prod", "abcde"))314 self.assertTrue(ok)315class TestSeverity(unittest.TestCase):316 def test_severity_format(self):317 self.assertEqual("info", execution.Severity.INFO.format())318 self.assertEqual("warn", execution.Severity.WARN.format())319 self.assertEqual("error", execution.Severity.ERROR.format())320 def test_severity_from_string(self):321 self.assertEqual(execution.Severity.INFO, execution.Severity.from_string("info"))322 self.assertEqual(execution.Severity.WARN, execution.Severity.from_string("warn"))...

1"""2D rendering contexts"""2from xml.sax.saxutils import escape, quoteattr3__all__ = "SVG TikZ".split()4class _ContextBase(object):5 def __init__(self):6 # Font size in points7 self.font_size = 108 self._unwind = []9 self._save = ['font_size', '_unwind']10 def __enter__(self):11 state = {field: getattr(self, field) for field in self._save}12 def restore():13 for k, v in state.iteritems():14 setattr(self, k, v)15 self._unwind = [restore]16 def __exit__(self, *args):17 for unwind in reversed(self._unwind):18 unwind()19 def translate(self, x, y):20 raise NotImplementedError21 def rect(self, x, y, w, h, fill=None, stroke=None, stroke_width=None):22 raise NotImplementedError23 def circle(self, x, y, r, fill=None, stroke=None, stroke_width=None):24 raise NotImplementedError25 def path(self, points, fill=None, stroke=None, stroke_width=None):26 raise NotImplementedError27 def clip(self, points):28 raise NotImplementedError29 def _canonPath(self, points):30 o = []31 lx, ly = 0, 032 for pt in points:33 if pt == 'Z':34 o.append('Z')35 elif pt[0] == 'M':36 o.append(pt)37 lx, ly = pt[1:]38 else:39 if pt[0] == 'L':40 px, py = pt[1:]41 elif pt[0] == 'l':42 px, py = lx + pt[1], ly + pt[2]43 elif pt[0] == 'H':44 px, py = pt[1], ly45 elif pt[0] == 'V':46 px, py = lx, pt[1]47 if py == ly and px == lx:48 continue49 o.append(('L', px, py))50 lx, ly = px, py51 return o52 def pathBounds(self, points):53 t, r, b, l = (None,)*454 for pt in self._canonPath(points):55 if pt == 'Z':56 pass57 elif t is None:58 r, t, l, b = pt[1:] + pt[1:]59 else:60 x, y = pt[1:]61 t, b = min(t, y), max(b, y)62 l, r = min(l, x), max(r, x)63 if t is None:64 return None65 return t, r, b, l66class SVG(_ContextBase):67 def __init__(self, **attrs):68 super(SVG, self).__init__()69 self.__gattrs = attrs70 self.__elts = []71 self.__defs = []72 self.__bounds = (0, 0, 0, 0)73 self._offset = (0, 0)74 self._save.append('_offset')75 self.__clipid = 076 self.__enter__()77 def __bound(self, x, y):78 l, r, t, b = self.__bounds79 x += self._offset[0]; y += self._offset[1]80 self.__bounds = (min(x, l), max(x, r), min(y, t), max(y, b))81 def __fsAttrs(self, fill=None, stroke=None, stroke_width=None):82 ats = ''83 if fill is None:84 ats += ' fill="none"'85 else:86 ats += ' fill="%s"' % self.__rgb2css(fill)87 if len(fill) == 4:88 ats += ' fill-opacity="%g"' % fill[3]89 if stroke is not None:90 ats += ' stroke="%s"' % self.__rgb2css(stroke)91 if len(stroke) == 4:92 ats += ' stroke-opacity="%g"' % stroke[3]93 if stroke_width is not None:94 ats += ' stroke-width="%s"' % stroke_width95 return ats96 def __rgb2css(self, rgb):97 r, g, b = rgb[:3]98 return '#%02x%02x%02x' % (int(r * 255), int(g * 255), int(b * 255))99 def translate(self, x, y):100 self.__elts.append('<g transform="translate(%g,%g)">' % (x, y))101 self._offset = (self._offset[0] + x, self._offset[1] + y)102 self._unwind.append(lambda: self.__elts.append('</g>'))103 def rect(self, x, y, w, h, **kw):104 e = '<rect x="%g" y="%g" width="%g" height="%g"%s />' % \105 (x, y, w, h, self.__fsAttrs(**kw))106 self.__elts.append(e)107 self.__bound(x, y)108 self.__bound(x+w, y+h)109 def circle(self, x, y, r, **kw):110 e = '<circle cx="%g" cy="%g" r="%g"%s />' % \111 (x, y, r, self.__fsAttrs(**kw))112 self.__elts.append(e)113 self.__bound(x + r, y)114 self.__bound(x - r, y)115 self.__bound(x, y + r)116 self.__bound(x, y - r)117 def __mkD(self, points):118 d = []119 lx, ly = 0, 0120 for op in self._canonPath(points):121 if op == 'Z':122 d.append('Z')123 else:124 px, py = op[1], op[2]125 if op[0] == 'L' and px == lx:126 d.append('V%g' % py)127 elif op[0] == 'L' and py == ly:128 d.append('H%g' % px)129 else:130 d.append('%s%g %g' % op)131 lx, ly = px, py132 self.__bound(px, py)133 return ' '.join(d)134 def path(self, points, **kw):135 e = '<path d="%s"%s />' % (self.__mkD(points), self.__fsAttrs(**kw))136 self.__elts.append(e)137 def clip(self, points):138 cid = 'clip%d' % self.__clipid139 self.__clipid += 1140 self.__defs.extend(['<clipPath id="%s">' % cid,141 ' <path d="%s" />' % self.__mkD(points),142 '</clipPath>'])143 self.__elts.append('<g clip-path="url(#%s)">' % cid)144 self._unwind.append(lambda: self.__elts.append('</g>'))145 def text(self, text, x, y, align, rotate=0, **kw):146 # Unfortunately, the various baseline adjustment properties147 # are not well supported, so we fake it by manually adjusting148 # the position relative to the line-height/font-size.149 # baseline = {'t': 'text-before-edge', 'c': 'central',150 # 'b': 'text-after-edge'}[align[0]]151 text_anchor = {'l': 'start', 'm': 'middle', 'r': 'end'}[align[1]]152 e = escape(text).encode('ascii', 'xmlcharrefreplace')153 extra = ''154 if align[0] == 't':155 extra += ' dy="%gpt"' % (self.font_size * .66)156 elif align[0] == 'c':157 extra += ' dy="%gpt"' % (self.font_size * .33)158 elif align[0] == 'b':159 pass160 else:161 raise ValueError('Unknown alignment %r' % align)162 if rotate:163 extra += ' transform="rotate(%g %g,%g)"' % (-rotate, x, y)164 kw.setdefault('fill', (0,0,0))165 e = '<text x="%g" y="%g" text-anchor="%s" font-size="%gpt"%s%s>%s</text>' % \166 (x, y, text_anchor, self.font_size, extra, self.__fsAttrs(**kw), e)167 self.__elts.append(e)168 # Guess bounds (more foolishness)169 w = 1.25 * (len(text) * self.font_size * 0.66)170 h = 1.25 * self.font_size171 wadj = {'l': 0, 'm': -w/2, 'r': -w}[align[1]]172 hadj = {'b': 0, 'c': h/2, 't': h}[align[0]]173 if rotate == 90:174 self.__bound(x + hadj, y + wadj)175 self.__bound(x + hadj - h, y + wadj - w)176 else:177 self.__bound(x + wadj, y + hadj)178 self.__bound(x + wadj + w, y + hadj - h)179 def write_to(self, fp):180 """Write the SVG output to fp."""181 self.__exit__()182 extra = ' '.join('%s=%s' % (k, quoteattr(v))183 for k,v in self.__gattrs.items())184 l, r, t, b = self.__bounds185 print >>fp, '<svg version="1.1" width="%dpx" height="%dpx" viewBox="%g %g %g %g" %s>\n' % \186 (r - l, b - t, l, t, r - l, b - t, extra)187 if self.__defs:188 print >>fp, '<defs>'189 for elt in self.__defs:190 print >>fp, elt191 print >>fp, '</defs>'192 for elt in self.__elts:193 print >>fp, elt194 print >>fp, '</svg>'195class TikZ(_ContextBase):196 def __init__(self, x='%gin' % (1/90.0), y='%gin' % (1/90.0),197 **attrs):198 """Create a new TikZ document.199 x and y specify the size of the x and y units. Their default200 values match the physical size of SVG pixels.201 """202 super(TikZ, self).__init__()203 self.__gattrs = attrs.copy()204 self.__gattrs.update(x=str(x), y='-' + str(y))205 self.__o = []206 self.font_extra = ''207 self._save.append('font_extra')208 self.__enter__()209 def __mkColor(self, attr_name, spec):210 self.o(r'\definecolor{tmp%s}{rgb}{%g,%g,%g}' %211 ((attr_name,) + tuple(spec[:3])))212 attr = '%s=tmp%s' % (attr_name, attr_name)213 if len(spec) == 4:214 attr += ',%s opacity=%g' % (attr_name, spec[3])215 return attr216 def __fsOpts(self, fill=None, stroke=None, stroke_width=None):217 attrs = []218 if fill is not None:219 attrs.append(self.__mkColor('fill', fill))220 if stroke is not None:221 attrs.append(self.__mkColor('draw', stroke))222 if stroke_width is not None:223 attrs.append('line width=%s' % stroke_width)224 return ','.join(attrs)225 def o(self, code):226 self.__o.append(code)227 def translate(self, x, y):228 self.o(r'\begin{scope}[shift={(%g,%g)}]' % (x, y))229 self._unwind.append(lambda: self.o(r'\end{scope}'))230 def rect(self, x, y, w, h, **kw):231 self.o(r'\path[%s] (%g,%g) rectangle +(%g,%g);' %232 (self.__fsOpts(**kw), x, y, w, h))233 def circle(self, x, y, r, **kw):234 self.o(r'\path[%s] (%g,%g) circle (%g);' %235 (self.__fsOpts(**kw), x, y, r))236 def __mkPath(self, points):237 d = []238 for op in self._canonPath(points):239 if op == 'Z':240 d.append('-- cycle')241 else:242 if op[0] == 'L':243 d.append('--')244 d.append('(%g,%g)' % op[1:])245 return ' '.join(d)246 def path(self, points, **kw):247 self.o(r'\path[%s] %s;' % (self.__fsOpts(**kw), self.__mkPath(points)))248 def clip(self, points):249 self.o(r'\begin{scope}')250 self.o(r'\clip %s;' % self.__mkPath(points))251 self._unwind.append(lambda: self.o(r'\end{scope}'))252 def text(self, text, x, y, align, rotate=0, fill=None):253 attrs = [r'font=\fontsize{%g}{%g}\selectfont%s' %254 (self.font_size, self.font_size*1.2, self.font_extra)]255 if align != 'cm':256 attrs.append('inner sep=0')257 attrs.append('anchor=%s%s' %258 ({'t': 'north', 'c': 'mid', 'b': 'south'}[align[0]],259 {'l': ' west', 'm': '', 'r': ' east'}[align[1]]))260 if rotate:261 attrs.append('rotate=%g' % rotate)262 if fill is not None:263 attrs.append(self.__mkColor('text', fill))264 text = text.replace('%', '\\%')265 self.o(r'\path (%g,%g) node[%s] {%s};' % (x, y, ','.join(attrs), text))266 def write_to(self, fp):267 """Write the TikZ output to fp."""268 self.__exit__()269 print >>fp, r'\begin{tikzpicture}[%s]' % (270 ','.join('%s=%s' % kv for kv in self.__gattrs.items()))271 for line in self.__o:272 print >>fp, line...

