#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""Minimal keep-alive HTTP/1.1 client built on Twisted and GObject signals.

``ClientFactory`` opens one TCP connection to the host named in a URL and
re-emits what its ``ClientProtocol`` receives as GObject signals
('connection-made', 'data-receiving', 'data-received', 'error',
'connection-lost').
"""
import os, sys
if __name__ == '__main__':
    # The glib2 reactor has to be installed before twisted.internet.reactor
    # is imported below.
    from twisted.internet import glib2reactor as reactor
    reactor.install()
from twisted.internet import defer, reactor
from twisted.internet.protocol import Protocol, ClientFactory
import time, datetime
from gzip import GzipFile
from StringIO import StringIO
import gobject
import gc
#
from util import debug

DEBUG = 2


class ClientProtocol(Protocol):
    """Accumulates one HTTP response at a time and reports it to the factory."""

    def __init__(self):
        self._recv_data = ''         # bytes buffered for the current response
        self._header = ''            # raw header block once fully received
        self.content_encoding = ''   # value of the Content-Encoding header
        self.content_size = 0        # value of Content-Length / Content-Size

    def _reset_response(self):
        """Forget everything known about the response being assembled."""
        self._recv_data = ''
        self._header = ''
        self.content_encoding = ''
        self.content_size = 0

    def connectionMade(self):
        debug(DEBUG+1, '%s connectionMade with: %s', self, self.transport.getPeer())
        #self.transport.setTcpKeepAlive(1)
        self.factory.connectionMade(self.transport.getPeer().host)

    def makeRequest(self, path, byterange=''):
        """Write a GET request for ``path`` to the transport.

        ``byterange`` is an optional value for a ``Range: bytes=`` header;
        the header is omitted when it is the empty string.
        """
        debug(DEBUG+1, 'makeRequest: %s %s:%d', path, self.factory.host,
              self.factory.port)
        # The port is only spelled out in the Host header when it is not 80.
        if self.factory.port == 80:
            host = self.factory.host
        else:
            host = '%s:%d' % (self.factory.host, self.factory.port)
        lines = [
            'GET %s HTTP/1.1\r\n' % (path),
            'Host: %s\r\n' % (host),
            'Connection: keep-alive\r\n',
            'Cache-Control: no-cache\r\n',
            'Pragma: no-cache\r\n',
        ]
        if byterange != '':
            lines.append('Range: bytes=' + byterange + '\r\n')
        lines.append('User-Agent:Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/28.0.1500.71 Safari/537.36\r\n')
        lines.append('\r\n')
        self.transport.write(''.join(lines))

    def _parse_header(self):
        """Extract body size and encoding from ``self._header``.

        Header names are matched case-insensitively. Raises ValueError when
        a size header does not hold an integer.
        """
        for line in self._header.split('\r\n'):
            name, sep, value = line.partition(':')
            if not sep:
                # status line or malformed line: nothing to extract
                continue
            name = name.strip().lower()
            value = value.strip()
            if name in ('content-length', 'content-size'):
                self.content_size = int(value)
            elif name == 'content-encoding':
                self.content_encoding = value

    def dataReceived(self, data):
        """Buffer ``data`` and emit factory signals as the response progresses.

        Emits 'error' (and drops the connection) on a 404 status line,
        'data-receiving' while the body is still incomplete, and
        'data-received' with the (gunzipped if needed) body once the buffered
        body length equals the announced content size.
        """
        if data.startswith('HTTP/1.1 404'):
            debug(DEBUG, '%s error 404', self)
            self.factory.emit('error', 'HTTP/1.1 404 response')
            self.transport.loseConnection()
            return
        if data.startswith('HTTP/1.1 200 OK'):
            # A new response begins: drop any half-assembled previous one,
            # including its header state, so the new header is parsed.
            self._reset_response()
            self._recv_data = data
        else:
            self._recv_data += data
        if not self._recv_data:
            return
        added_size = len(data)
        if not self._header:
            head, sep, body = self._recv_data.partition('\r\n\r\n')
            if not sep:
                # header block not complete yet, wait for more data
                return
            self._header, self._recv_data = head, body
            # only the body part of this chunk counts as progress
            added_size = len(self._recv_data)
            try:
                self._parse_header()
            except ValueError:
                return
        if self.content_size == len(self._recv_data):
            body = self._recv_data
            if self.content_encoding == 'gzip':
                body = GzipFile('', 'rb', 9, StringIO(body)).read()
            self.factory.emit('data-received', body)
            self._reset_response()
        elif added_size > 0:
            self.factory.emit('data-receiving', added_size,
                              self.content_size - len(self._recv_data))

    def connectionLost(self, reason):
        debug(DEBUG+1, '%s connectionLost: %s', self, reason)
        self._reset_response()
        self._close_socket()
        self.factory.emit('connection-lost')

    def _close_socket(self):
        """Close the transport and drop its buffers (best effort)."""
        if not self.transport:
            return
        self.transport.loseConnection()
        try:
            self.transport.getHandle().close()
        except Exception as e:
            # Best effort only: the handle may already be closed.
            debug(DEBUG+1, '%s closing socket handle failed: %s', self, e)
        # remove internal buffers
        # NOTE(review): relies on private transport attributes; verify against
        # the Twisted version in use.
        self.transport._tempDataBuffer[:] = ''
        self.transport.dataBuffer = ''
        self.transport = None


class ClientFactory(ClientFactory, gobject.GObject):
    """Connects to the host of ``url`` and exposes protocol events as signals."""

    protocol = ClientProtocol
    noisy = False
    __gsignals__ = {
        'connection-made': (
            gobject.SIGNAL_RUN_LAST,
            gobject.TYPE_NONE,  # return
            (
                gobject.TYPE_PYOBJECT,  # data
            )  # args
        ),
        'error': (
            gobject.SIGNAL_RUN_LAST,
            gobject.TYPE_NONE,  # return
            (str, )  # args
        ),
        'connection-lost': (
            gobject.SIGNAL_RUN_LAST,
            gobject.TYPE_NONE,  # return
            ()  # args
        ),
        'data-received': (
            gobject.SIGNAL_RUN_LAST,
            gobject.TYPE_NONE,  # return
            (
                gobject.TYPE_PYOBJECT,  # data
            )  # args
        ),
        'data-receiving': (
            gobject.SIGNAL_RUN_LAST,
            gobject.TYPE_NONE,  # return
            (
                int,  # data diff received
                int,  # remaining data size
            )  # args
        ),
    }

    def __init__(self, url):
        gobject.GObject.__init__(self)
        #
        debug(DEBUG+1, '%s init %s', self, url)
        self.url = url
        self.host, self.port, self.path = parse_url(url)
        self.client = None
        self.connector = reactor.connectTCP(self.host, self.port, self)

    def connectionMade(self, host):
        """Called by the protocol once connected; emits 'connection-made'."""
        self.client = self.connector.transport.protocol
        self.emit('connection-made', host)

    def makeRequest(self, path, byterange=''):
        """Forward a GET request to the connected protocol instance."""
        self.client.makeRequest(path, byterange)

    def stopFactory(self):
        self.stop()

    def stop(self):
        """Disconnect and release the connector and protocol references."""
        debug(DEBUG+1, '%s stop', self)
        self.host, self.port = None, 0
        if self.connector:
            self.connector.disconnect()
            self.connector = None
        self.client = None
        gc.collect()


def parse_url(url):
    """Split ``scheme://host[:port]/path`` into ``(host, port, path)``.

    The port defaults to 80 and the returned path always starts with '/'.
    A URL without a path component yields '/'. Raises ValueError when the
    URL has no ``//`` authority part or the port is not an integer.
    """
    parts = url.split('/', 3)
    if len(parts) < 3:
        raise ValueError('not an absolute URL: %r' % (url,))
    address = parts[2]
    path = parts[3] if len(parts) > 3 else ''
    host, sep, port = address.partition(':')
    port = int(port) if sep else 80
    return host, port, '/' + path


if __name__ == '__main__':
    # NOTE(review): the demo URL literal was empty in the source, so it is
    # read from the command line here.
    if len(sys.argv) < 2:
        sys.exit('usage: %s URL' % sys.argv[0])
    url = sys.argv[1]
    host, port, path = parse_url(url)

    def on_connection(c, peer_host):
        # 'connection-made' carries the peer host as its single argument.
        print(c)
        c.makeRequest(path)

    def on_data(c, data):
        print('%s %s' % (c, len(data)))
        reactor.callLater(1, c.makeRequest, '/test/0ccf2c94c951880cd2456f4fb2db2b9d/4_00000.ts')

    c = ClientFactory(url)
    c.connect('connection-made', on_connection)
    c.connect('data-received', on_data)
    # NOTE(review): the original demo is truncated after the signal hookups;
    # starting the reactor here is assumed -- confirm against the full file.
    reactor.run()

# -*- Mode: Python; tab-width: 4 -*-
# Copyright (c) 2005-2010 Slide, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
#       copyright notice, this list of conditions and the following
#       disclaimer in the documentation and/or other materials provided
#       with the distribution.
#     * Neither the name of the author nor the names of other
#       contributors may be used to endorse or promote products derived
#       from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Wire protocol to read/write entire commands
"""
import socket
import select
import struct
import errno

import message

COMMAND_READ_SIZE = 32*1024

# Every command on the wire is preceded by a 4-byte big-endian length.
_SIZE_PREFIX_LEN = 4


class ConnectionError(RuntimeError):
    """Raised when the underlying connection fails or is closed by the peer."""
    pass


class ReadWriter(object):
    """Frames serialized commands with a 4-byte length prefix over a socket.

    ``connection`` is a socket-like object; extra keyword arguments are
    passed to ``message.BinSerializer``.
    """

    def __init__(self, connection, **kwargs):
        self.connection = connection
        self._recv_data = b''   # received bytes not yet handed out
        self._recv_size = 0     # payload size of the pending command, 0 if unknown
        self._send_data = b''   # queued bytes not yet sent
        self.serializer = message.BinSerializer(**kwargs)

    def send_size(self):
        """Return the number of queued bytes still waiting to be sent."""
        return len(self._send_data)

    def recv_size(self):
        """Return the number of received bytes currently buffered."""
        return len(self._recv_data)

    def _take_size_prefix(self):
        """Move the next length prefix, if fully buffered, into ``_recv_size``.

        Leaves ``_recv_size`` at 0 when fewer than four bytes are buffered.
        """
        if len(self._recv_data) < _SIZE_PREFIX_LEN:
            self._recv_size = 0
            return
        # NOTE(review): decoded as signed ('!i') while write_int() emits an
        # unsigned value; the two agree for sizes below 2**31.
        self._recv_size = struct.unpack(
            '!i', self._recv_data[:_SIZE_PREFIX_LEN])[0]
        self._recv_data = self._recv_data[_SIZE_PREFIX_LEN:]

    def read_command(self):
        """Block until one whole command is buffered and return it deserialized.

        Raises ConnectionError when recv() fails with anything other than
        EINTR, or returns no data (reported as ``(0, 'empty recv')``).
        """
        # NOTE(review): a size of 0 doubles as "prefix not read yet", so a
        # zero-length payload would be skipped -- confirm none are ever sent.
        while not self._recv_size or self._recv_size > len(self._recv_data):
            try:
                partial = self.connection.recv(COMMAND_READ_SIZE)
            except socket.error as e:
                if e.args and e.args[0] == errno.EINTR:
                    # interrupted system call: just retry the read
                    continue
                raise ConnectionError(*e.args)
            if not partial:
                raise ConnectionError(0, 'empty recv')

            self._recv_data += partial
            if not self._recv_size:
                self._take_size_prefix()

        payload = self._recv_data[:self._recv_size]
        result = self.serializer.deserialize(payload)
        self._recv_data = self._recv_data[self._recv_size:]
        # Pre-read the prefix of the next command when it is already buffered.
        self._take_size_prefix()
        return result

    def write(self):
        """Send everything queued, retrying on EINTR.

        Raises ConnectionError for any other socket error.
        """
        while self._send_data:
            try:
                size = self.connection.send(self._send_data)
            except socket.error as e:
                if e.args and e.args[0] == errno.EINTR:
                    continue
                raise ConnectionError(*e.args)
            self._send_data = self._send_data[size:]

    def write_int(self, i):
        """Return the low 32 bits of ``i`` as four big-endian bytes."""
        return struct.pack('!I', int(i) & 0xffffffff)

    def queue_command(self, command):
        """Serialize ``command`` and append it, length-prefixed, to the send queue."""
        response = self.serializer.serialize(command)
        self._send_data += self.write_int(len(response)) + response

    def write_command(self, command):
        """Queue ``command`` and send the whole queue immediately."""
        self.queue_command(command)
        self.write()

    def flush(self):
        self.write()

    def close(self):
        self.connection.close()

    def wake(self):
        # only support interrupted read, if we have it.
        if hasattr(self.connection, 'wake'):
            self.connection.wake(select.POLLIN)

    def settimeout(self, timeout):
        self.connection.settimeout(timeout)

    def gettimeout(self):
        return self.connection.gettimeout()

    def setsockopt(self, level, option, value):
        return self.connection.setsockopt(level, option, value)

    def getsockopt(self, level, option):
        return self.connection.getsockopt(level, option)

    def fileno(self):
        return self.connection.fileno()

