vendor/tramline

changeset 52:811fdddf96dc xsendfile

POC for mod_xsendfile integration. Must check PUT etc
author Georges Racinet on purity.racinet.fr <georges@racinet.fr>
date Sun, 30 Sep 2012 22:02:17 +0200
parents 9eb5c502df01
children 692f926b7530
files src/tramline/core.py
diffstat 1 files changed, 47 insertions(+), 238 deletions(-) [+]
line diff
     1.1 --- a/src/tramline/core.py
     1.2 +++ b/src/tramline/core.py
     1.3 @@ -1,6 +1,6 @@
     1.4  import os, tempfile, random, sys, errno, mimetools
     1.5  import cgi
     1.6 -from mod_python import apache, Cookie
     1.7 +from mod_python import apache
     1.8  
     1.9  OPTION_ALLOW_GROUP_WRITE = 'allow_group_write'
    1.10  TRAMLINE_RANGE_HEADER = 'X-Tramline-Original-Range'
    1.11 @@ -48,9 +48,7 @@
    1.12                             tramline_repository_path(req))):
    1.13          if not os.path.isdir(p):
    1.14              os.mkdir(p)
    1.15 -            import sys
    1.16 -            sys.stderr.write('creating path: %r\n' % p)
    1.17 -	    if i == 3 and group_write(req):
    1.18 +            if i == 3 and group_write(req):
    1.19                  # drwxrwsr-x
    1.20                  # (set-group-ID-on-execution bit)
    1.21                  os.chmod(p, 02775)
    1.22 @@ -87,36 +85,16 @@
    1.23          pass_on(filter)
    1.24          return
    1.25  
    1.26 -    req_method = filter.req.method
    1.27 -    if req_method == 'GET':
    1.28 -        hin = filter.req.headers_in
    1.29 -        range = hin.get('Range')
    1.30 -        if range is not None:
    1.31 -            hin[TRAMLINE_RANGE_HEADER] = range
    1.32 -            # hoping that most backing apps issue 206, not 200 for that
    1.33 -            hin['Range'] = 'bytes=0-' 
    1.34 -
    1.35 -    
    1.36 -    # we only affect POST and PUT requests from now on
    1.37 -    if filter.req.method not in ('POST', 'PUT'):
    1.38 +    # we only handle POST requests
    1.39 +    if filter.req.method != 'POST':
    1.40          pass_on(filter)
    1.41          return
    1.42  
    1.43 -    if req_method == 'POST':
    1.44 -        # we only handle multipart/form-data
    1.45 -        enctype = filter.req.headers_in.get('Content-Type')
    1.46 -        if enctype[:19] != 'multipart/form-data':
    1.47 -            pass_on(filter)
    1.48 -            return
    1.49 -    elif req_method == 'PUT':
    1.50 -        # we only handle request that have the 'X-Tramline-Enable' header
    1.51 -        # or 'tramline_enable' cookie. 
    1.52 -        # Cookie enabling is here to support those agents that
    1.53 -        # can't set a custom header (e.g., zopeedit)
    1.54 -        if filter.req.headers_in.get('X-Tramline-Enable') is None \
    1.55 -                and Cookie.get_cookie(filter.req, 'tramline_enable') is None:
    1.56 -            pass_on(filter)
    1.57 -            return
    1.58 +    # we only handle multipart/form-data
    1.59 +    enctype = filter.req.headers_in.get('Content-Type')
    1.60 +    if enctype[:19] != 'multipart/form-data':
    1.61 +        pass_on(filter)
    1.62 +        return
    1.63  
    1.64      # check whether we have an id already
    1.65      id = filter.req.headers_in.get('tramline_id')
    1.66 @@ -124,7 +102,7 @@
    1.67      if id is None:
    1.68          # no id, so create new processor instance and store
    1.69          # away id
    1.70 -        processor = theProcessorRegistry.createProcessor(req_method=req_method)
    1.71 +        processor = theProcessorRegistry.createProcessor()
    1.72          processor.initFromInputFilter(filter)
    1.73          filter.req.headers_in['tramline_id'] = str(processor.id)
    1.74      else:
    1.75 @@ -160,20 +138,15 @@
    1.76  def outputfilter(filter):
    1.77      # we're done if we're in a subrequset
    1.78      if filter.req.main is not None:
    1.79 +        log('subrequest', filter.rec) 
    1.80          pass_on(filter)
    1.81          return
    1.82  
    1.83      # in case of post request, we may need to do a commit/abort
    1.84      # of previous input round
    1.85 -    if filter.req.method in ('POST', 'PUT'):
    1.86 +    if filter.req.method == 'POST':
    1.87          outputfilter_post(filter)
    1.88          return
    1.89 -    # in case of a get request, we may need to serve up files,
    1.90 -    # depending on what's in the headers
    1.91 -    elif filter.req.method == 'GET':
    1.92 -        outputfilter_get(filter)
    1.93 -        return
    1.94 -    
    1.95      pass_on(filter)
    1.96      
    1.97  def outputfilter_post(filter):
    1.98 @@ -203,157 +176,25 @@
    1.99      del filter.req.headers_in['tramline_id']
   1.100  
   1.101      # now pass along the data.
   1.102 -    pass_on(filter)
   1.103 -
   1.104 -def outputfilter_get(filter):
   1.105 -    # check whether we want to do file serving using tramline
   1.106 -    if not filter.req.headers_out.has_key('tramline_file'):
   1.107 -        pass_on(filter)
   1.108 -        return
   1.109 -
   1.110 -    status = filter.req.status
   1.111 -
   1.112 -    # now read file id
   1.113 -    data = []
   1.114 -    s = filter.read()
   1.115 -    while s:
   1.116 -        data.append(s)
   1.117 -        s = filter.read()
   1.118 -    data = ''.join(data)
   1.119 -
   1.120 -    # if multiple lines, the id is the last, first lines have to be forwarded
   1.121 -    split = data.rsplit('\n', 1)
   1.122 -    if len(split) == 1:
   1.123 -        file_id = data
   1.124 -        prepend = ''
   1.125 -    else:
   1.126 -        file_id = split[1]
   1.127 -        prepend = split[0] + '\n'
   1.128 -
   1.129 -    p = id_to_path(tramline_path(filter.req), file_id)
   1.130 -
   1.131 -    if not file_id:
   1.132 -        return
   1.133 -    # Range request
   1.134 -    hin = filter.req.headers_in
   1.135 -    h = hin.get(TRAMLINE_RANGE_HEADER)
   1.136 -
   1.137 -    if h and status in [206, 416]:
   1.138 -        # Range was requested and app server agreed
   1.139 -	ranges = parse_range(h)
   1.140 -	if ranges: 
   1.141 -	   serve_ranges(p, ranges[1], filter)
   1.142 -	else:
   1.143 -           log("Unparsable Range header " + h, filter.req)
   1.144 -    elif status == 200:
   1.145 -	serve_file(p, filter, prepend=prepend)
   1.146 -
   1.147 -def serve_file(p, filter, prepend=''):
   1.148 -    """Serve a whole file."""
   1.149 -    # XXX what if file doesn't exist? 404?
   1.150 -    size = os.stat(p).st_size
   1.151 -    if prepend:
   1.152 -        filter.write(prepend)
   1.153 -    filter.req.headers_out['content-length'] = str(size + len(prepend))
   1.154 -    f = open(p, 'rb')
   1.155 -    dump_file_range(f, 0, size-1, filter)
   1.156 -    f.close()
   1.157 -    filter.close()
   1.158 -
   1.159 -def serve_ranges(path, ranges, filter, boundary=None):
   1.160 -    size = os.stat(path).st_size
   1.161 -
   1.162 -    # TODO support suffix notation RFC 2616 p 138
   1.163 -    # do a subroutine for that
   1.164 -
   1.165 -    expanded = [start is None and (size-end, size-1) or 
   1.166 -                (start, end is None and size-1 or end)
   1.167 -                for start, end in ranges]
   1.168 -
   1.169 -    hout = filter.req.headers_out
   1.170 -    fd = open(path, 'rb')
   1.171 -    if len(ranges) == 1:
   1.172 -	start, end = expanded[0]
   1.173 -	if end < start:
   1.174 -             hout['Content-Range'] = 'bytes */%d' % size
   1.175 -             hout['Content-Length'] = str(size)
   1.176 -             filter.req.status = 416
   1.177 -             filter.close()
   1.178 -             return
   1.179 -
   1.180 -  	hout['Content-Length'] = str(end - start + 1)
   1.181 -        hout['Content-Range'] = 'bytes %d-%d/%d' % (start, end, size)
   1.182 -	dump_file_range(fd, start, end, filter)
   1.183 -        filter.close()
   1.184 -        return	
   1.185 -
   1.186 -    del hout['Content-Range'] # If present, was from app server and can't be ok
   1.187 -    if boundary is None:
   1.188 -        boundary = mimetools.choose_boundary()
   1.189 -
   1.190 -    content_type = hout['Content-Type']
   1.191 -    # This takes precedence over hout
   1.192 -    filter.req.content_type = 'multipart/byteranges; boundary=%s' % boundary;
   1.193 -
   1.194 -    length = (8 + len(boundary) + # End marker length             
   1.195 -       len(ranges) * (      # Constant lenght per set       
   1.196 -       49 + len(boundary) + len(content_type) + len('%d' % size)))
   1.197 -    for start, end in expanded:
   1.198 -       # Variable length per set                               
   1.199 -       length += len('%d%d' % (start, end)) + 1 + end - start 
   1.200 -    hout['Content-Length'] = str(length)
   1.201 -
   1.202 -    for start, end in expanded:
   1.203 -        filter.write('\r\n--%s\r\n' % boundary)
   1.204 -        filter.write('Content-Type: %s\r\n' % content_type)
   1.205 -        filter.write('Content-Range: bytes %d-%d/%d\r\n\r\n' % (start, end, size))
   1.206 -        dump_file_range(fd, start, end, filter, chunksize=FILE_CHUNKSIZE) # TODO tweak chunksize
   1.207 -    filter.write('\r\n--%s--\r\n' % boundary)
   1.208 -    filter.close()
   1.209 -    fd.close()
   1.210 -
   1.211 -def dump_file_range(fd, start, end, filter, chunksize=FILE_CHUNKSIZE):
   1.212 -    """Perform raw dump of a single file range in filter.
   1.213 -
   1.214 -    Takes chunk size into account."""
   1.215 -    fd.seek(start)
   1.216 -    while end is None or fd.tell() < end+1:
   1.217 -        if end is None:
   1.218 -            data = fd.read(chunksize)
   1.219 -        else:
   1.220 -            data = fd.read(min(chunksize, end+1-fd.tell()))
   1.221 -        if not data:
   1.222 -            break
   1.223 -        filter.write(data)
   1.224 -        # flush the data out as soon as possible, so we don't
   1.225 -        # waste memory
   1.226 -        try:
   1.227 -	    filter.flush()
   1.228 -	except IOError:
   1.229 -	    # Happens if user cancels download
   1.230 -	    return
   1.231 +    #pass_on(filter)
   1.232 +    filter.pass_on()
   1.233  
   1.234      
   1.235  class ProcessorRegistry:
   1.236 -
   1.237 -    _processor_classes = {}
   1.238 -
   1.239      def __init__(self):
   1.240          self._processors = {}
   1.241  
   1.242      def getProcessor(self, id):
   1.243          return self._processors[id]
   1.244  
   1.245 -    def createProcessor(self, req_method='POST'):
   1.246 +    def createProcessor(self):
   1.247          # XXX thread issues?
   1.248          while True:
   1.249              id = random.randrange(sys.maxint)
   1.250              if id not in self._processors:
   1.251                  break
   1.252 -
   1.253 -        ProcessorClass = self._processor_classes.get(req_method)
   1.254 -        proc = self._processors[id] = ProcessorClass(id)
   1.255 -        return proc
   1.256 +        result = self._processors[id] = Processor(id)
   1.257 +        return result
   1.258      
   1.259      def removeProcessor(self, processor):
   1.260          del self._processors[processor.id]
   1.261 @@ -364,7 +205,7 @@
   1.262      return os.path.join(tramline_stat_path(req), progress_id)
   1.263  
   1.264  class Processor:
   1.265 -    """Base class for processors.
   1.266 +    """Processor for Post requests. 
   1.267  
   1.268      API atributes:
   1.269         id: this is the main identifier in the registry. It is used for 
   1.270 @@ -380,9 +221,10 @@
   1.271         uploaded: this is the total file data upload this processor has seen. 
   1.272             there might be more than one file in this request.
   1.273  
   1.274 -       upload_length: the full length of the input request, as taken from the
   1.275 +       upload_length: the full length of the POST request, as taken from the
   1.276         header
   1.277      """
   1.278 +       
   1.279      def __init__(self, id):
   1.280          self.uploaded = 0L
   1.281          self.id = id
   1.282 @@ -391,7 +233,11 @@
   1.283          self._upload_files = []
   1.284          self._incoming = []
   1.285  	self._isize = 0
   1.286 -        self._f = None # output file object
   1.287 +        # we use a state pattern where the handle method gets
   1.288 +        # replaced by the current handle method for this state.
   1.289 +        self.handle = self.handle_first_boundary
   1.290 +        self.vars_to_handle = []
   1.291 +        self._enable_vars=''
   1.292  
   1.293      def initFromInputFilter(self, filter):
   1.294          headers = filter.req.headers_in
   1.295 @@ -403,6 +249,11 @@
   1.296                # parse_qs always produces lists
   1.297                self.progress_id = filter.req.connection.remote_ip + '-' + gu_ids[0]
   1.298  
   1.299 +    def pushInput(self, data, out):
   1.300 +        lines = data.splitlines(True)
   1.301 +        for line in lines:
   1.302 +            self.pushInputLine(line, out)
   1.303 +
   1.304      def logProgress(self, req):
   1.305          """Log progress upload to a file for async requests to use
   1.306  
   1.307 @@ -423,66 +274,12 @@
   1.308              f.write(str({'state': 1, 'percent': int(percent)}))
   1.309              f.close()
   1.310  
   1.311 -    def commit(self, req):
   1.312 -        # XXX works under the assumption that the last segment of 
   1.313 -        # file path is the tramline id
   1.314 -        for upload_file in self._upload_files:
   1.315 -            dummy, filename = os.path.split(upload_file)
   1.316 -            os.rename(upload_file, id_to_path(tramline_path(req), filename))
   1.317 -
   1.318 -    def abort(self):
   1.319 -        for upload_file in self._upload_files:
   1.320 -            os.remove(upload_file)
   1.321 -
   1.322 -    def initUploadFile(self, out, with_newline=True):
   1.323 -        """Create the dump file, write id to out and keep needed references."""
   1.324 -        fd, pathname, file_id = createUniqueFile(out.req)
   1.325 -
   1.326 -        self._f = os.fdopen(fd, 'wb')
   1.327 -        self._upload_files.append(pathname)
   1.328 -        out.write(file_id)
   1.329 -        if with_newline:
   1.330 -            out.write('\r\n')
   1.331 -
   1.332 -class PutRequestProcessor(Processor):
   1.333 -   """Directly dump inconditionaly incoming data."""
   1.334 -
   1.335 -   def pushInput(self, data, out):
   1.336 -       if self._f is None:
   1.337 -           self.initUploadFile(out, with_newline=False)
   1.338 -       self._f.write(data)
   1.339 -
   1.340 -   def finalizeInput(self, out):
   1.341 -       out.req.headers_in['tramline'] = ''
   1.342 -       if self._f is not None: # one never knows
   1.343 -           self._f.close()
   1.344 -
   1.345 -class PostRequestProcessor(Processor):
   1.346 -    """Processor for Post requests. 
   1.347 -
   1.348 -    Handles multipart/form-data content, looks for enabling info in the form
   1.349 -    data, and if enabled, intercept those parts that are file uploads.
   1.350 -    """
   1.351 -
   1.352 -    def __init__(self, pid):
   1.353 -        Processor.__init__(self, pid)
   1.354 -        # we use a state pattern where the handle method gets
   1.355 -        # replaced by the current handle method for this state.
   1.356 -        self.handle = self.handle_first_boundary
   1.357 -        self.vars_to_handle = []
   1.358 -        self._enable_vars=''
   1.359 -
   1.360 -    def pushInput(self, data, out):
   1.361 -        lines = data.splitlines(True)
   1.362 -        for line in lines:
   1.363 -            self.pushInputLine(line, out)
   1.364 -
   1.365      def pushInputLine(self, data, out):
   1.366          # collect data
   1.367          self._incoming.append(data)
   1.368  	self._isize += len(data)
   1.369  
   1.370 -       # if we're not at the end of the line, input was broken
   1.371 +        # if we're not at the end of the line, input was broken
   1.372          # somewhere, unless we are handling file data which might be binary.
   1.373  	# We return to collect more first (also for file data if too small)
   1.374          if data[-1] != '\n' and (
   1.375 @@ -504,6 +301,17 @@
   1.376          if self._upload_files:
   1.377              out.req.headers_in['tramline'] = ''
   1.378  
   1.379 +    def commit(self, req):
   1.380 +        # XXX works under the assumption that the last segment of 
   1.381 +        # file path is the tramline id
   1.382 +        for upload_file in self._upload_files:
   1.383 +            dummy, filename = os.path.split(upload_file)
   1.384 +            os.rename(upload_file, id_to_path(tramline_path(req), filename))
   1.385 +
   1.386 +    def abort(self):
   1.387 +        for upload_file in self._upload_files:
   1.388 +            os.remove(upload_file)
   1.389 +    
   1.390      def handle_first_boundary(self, line, out):
   1.391          self._boundary = line
   1.392          self._last_boundary = self._boundary.rstrip() + '--\r\n'
   1.393 @@ -549,8 +357,12 @@
   1.394                self._disposition_options.get('name') not in self.vars_to_handle:
   1.395              self.handle = self.handle_data
   1.396              return
   1.397 +        fd, pathname, file_id = createUniqueFile(out.req)
   1.398  
   1.399 -        self.initUploadFile(out)
   1.400 +        self._f = os.fdopen(fd, 'wb')
   1.401 +        self._upload_files.append(pathname)
   1.402 +        out.write(file_id)
   1.403 +        out.write('\r\n')
   1.404          
   1.405          self._previous_line = None
   1.406          self.handle = self.handle_file_data
   1.407 @@ -671,9 +483,6 @@
   1.408                  continue # try again
   1.409              raise
   1.410  
   1.411 -ProcessorRegistry._processor_classes = dict(PUT=PutRequestProcessor,
   1.412 -                                            POST=PostRequestProcessor)
   1.413 -
   1.414  def log(data, req):
   1.415      f = open(os.path.join(tramline_path(req), 'tramline.log'), 'ab')
   1.416      f.write(data)