vendor/tramline

changeset 19:23d25b2f5d34 gracinet-fix-range

Merging the progress-bar branch at last
author Georges Racinet on Ishtar.racinet.fr <georges@racinet.fr>
date Thu, 19 Aug 2010 18:56:38 +0200
parents 1954af3381ce 25bd7198a978
children 5b9e1e843689
files
diffstat 4 files changed, 172 insertions(+), 5 deletions(-) [+]
line diff
     1.1 --- a/src/tramline/core.py
     1.2 +++ b/src/tramline/core.py
     1.3 @@ -1,4 +1,6 @@
     1.4  import os, tempfile, random, sys, errno, mimetools
     1.5 +import cgi
     1.6 +from mod_python import apache
     1.7  
     1.8  OPTION_ALLOW_GROUP_WRITE = 'allow_group_write'
     1.9  TRAMLINE_RANGE_HEADER = 'X-Tramline-Original-Range'
    1.10 @@ -30,6 +32,9 @@
    1.11  def tramline_repository_path(req):
    1.12      return os.path.join(tramline_path(req), 'repository')
    1.13  
    1.14 +def tramline_stat_path(req):
    1.15 +    return os.path.join(tramline_path(req), 'stat')
    1.16 +
    1.17  def group_write(req): 
    1.18      op = req.get_options().get(OPTION_ALLOW_GROUP_WRITE) 
    1.19      return sys.platform != 'win32' \
    1.20 @@ -39,6 +44,7 @@
    1.21      base = tramline_path(req)
    1.22      for i, p in enumerate((tramline_path(req), 
    1.23                             tramline_upload_path(req), 
    1.24 +                           tramline_stat_path(req), 
    1.25                             tramline_repository_path(req))):
    1.26          if not os.path.isdir(p):
    1.27              os.mkdir(p)
    1.28 @@ -48,7 +54,7 @@
    1.29                  os.chmod(p, 02775)
    1.30  
    1.31  FILE_CHUNKSIZE = 8 * 1024
    1.32 -
    1.33 +REPORT_BLOCKSIZE = 100 * 1024
    1.34  """
    1.35  inputfilter() and outputfilter() are what is called by Apache.
    1.36  
    1.37 @@ -105,6 +111,7 @@
    1.38          # no id, so create new processor instance and store
    1.39          # away id
    1.40          processor = theProcessorRegistry.createProcessor()
    1.41 +        processor.initFromInputFilter(filter)
    1.42          filter.req.headers_in['tramline_id'] = str(processor.id)
    1.43      else:
    1.44          # reuse existing processor instance based on id
    1.45 @@ -321,15 +328,41 @@
    1.46                  break
    1.47          result = self._processors[id] = Processor(id)
    1.48          return result
    1.49 -
    1.50 +    
    1.51      def removeProcessor(self, processor):
    1.52          del self._processors[processor.id]
    1.53  
    1.54  theProcessorRegistry = ProcessorRegistry()
    1.55  
    1.56 +def get_progress_path(req, progress_id):
    1.57 +    return os.path.join(tramline_stat_path(req), progress_id)
    1.58 +
    1.59  class Processor:
    1.60 +    """Processor for POST requests. 
    1.61 +
    1.62 +    API attributes:
    1.63 +       id: this is the main identifier in the registry. It is used for 
    1.64 +           continuity from one call to the other and from input to output 
    1.65 +           filters (final commit).
    1.66 +
    1.67 +       progress_id: this is the identifier user agents will use to request 
    1.68 +           stats on the current transfer upload. Unfortunately it has to be
    1.69 +           partially set by an <input> from the user agent, because we have no
    1.70 +           means to communicate it back once the process has started. 
    1.71 +           Still some effort is made to avoid malicious garbling of ids.
    1.72 +
    1.73 +       uploaded: this is the total file data upload this processor has seen. 
    1.74 +           there might be more than one file in this request.
    1.75 +
    1.76 +       upload_length: the full length of the POST request, as taken from the
    1.77 +       header
    1.78 +    """
    1.79 +       
    1.80      def __init__(self, id):
    1.81 +        self.uploaded = 0L
    1.82          self.id = id
    1.83 +        self.progress_id = None
    1.84 +        self._uploaded_blocks = 0 
    1.85          self._upload_files = []
    1.86          self._incoming = []
    1.87  	self._isize = 0
    1.88 @@ -339,11 +372,41 @@
    1.89          self.vars_to_handle = []
    1.90          self._enable_vars=''
    1.91  
    1.92 +    def initFromInputFilter(self, filter):
    1.93 +        headers = filter.req.headers_in
    1.94 +        self.upload_length = long(headers['Content-Length'])
    1.95 +	qs = filter.req.parsed_uri[apache.URI_QUERY]
    1.96 +	if qs is not None:
    1.97 +           gu_ids = cgi.parse_qs(qs).get('gp.fileupload.id')
    1.98 +           if gu_ids is not None:
    1.99 +              # parse_qs always produces lists
   1.100 +              self.progress_id = filter.req.connection.remote_ip + '-' + gu_ids[0]
   1.101 +
   1.102      def pushInput(self, data, out):
   1.103          lines = data.splitlines(True)
   1.104          for line in lines:
   1.105              self.pushInputLine(line, out)
   1.106  
   1.107 +    def logProgress(self, req):
   1.108 +        """Log progress upload to a file for async requests to use
   1.109 +
   1.110 +        Report only every REPORT_BLOCKSIZE bytes to avoid too much I/O
   1.111 +        the file is located in the stat directory, and its name is progress_id.
   1.112 +        Race condition is in theory possible, but very unlikely, and not
   1.113 +        critical to avoid.
   1.114 +        """
   1.115 +        blocks = self.uploaded / REPORT_BLOCKSIZE
   1.116 +        if blocks > self._uploaded_blocks:
   1.117 +            self._uploaded_blocks = blocks
   1.118 +            self._storeProgress(req)
   1.119 +
   1.120 +    def _storeProgress(self, req):
   1.121 +        if self.progress_id is not None:
   1.122 +            f = open(get_progress_path(req, self.progress_id), 'w')
   1.123 +            percent = int(self.uploaded*100 / self.upload_length)
   1.124 +            f.write(str({'state': 1, 'percent': int(percent)}))
   1.125 +            f.close()
   1.126 +
   1.127      def pushInputLine(self, data, out):
   1.128          # collect data
   1.129          self._incoming.append(data)
   1.130 @@ -365,6 +428,7 @@
   1.131          self._isize = 0
   1.132  
   1.133          self.handle(line, out)
   1.134 +        self.logProgress(out.req)
   1.135  
   1.136      def finalizeInput(self, out):
   1.137          if self._upload_files:
   1.138 @@ -412,10 +476,15 @@
   1.139          filename = self._disposition_options.get('filename')
   1.140          # if filename is empty, assume no file is submitted and submit
   1.141          # empty file -- don't tramline this special case
   1.142 -        if out.req.get_options().get('explicit_enable') and \
   1.143 -              self._disposition_options.get('name')=='tramline_enable':
   1.144 +        input_name = self._disposition_options.get('name')
   1.145 +        if input_name == 'tramline_enable' and out.req.get_options().get(
   1.146 +            'explicit_enable'):
   1.147              self.handle = self.handle_enable_vars
   1.148              return
   1.149 +        elif input_name == 'tramline_progress_id':
   1.150 +            # identifier used by async user agent requests to get upload stats
   1.151 +            self.handle = self.handle_progress_id
   1.152 +            return
   1.153          elif (filename is None or not filename) or \
   1.154                out.req.get_options().get('explicit_enable') and \
   1.155                self._disposition_options.get('name') not in self.vars_to_handle:
   1.156 @@ -446,6 +515,22 @@
   1.157          else:
   1.158              self._enable_vars+=line
   1.159  
   1.160 +    def handle_progress_id(self, line, out):
   1.161 +        out.write(line)
   1.162 +        if line == self._boundary:
   1.163 +            self.init_headers()
   1.164 +            self.handle = self.handle_headers
   1.165 +        elif line == self._last_boundary:
   1.166 +            # shouldn't happen if client has some consistency 
   1.167 +            self.handle = None # Processing done
   1.168 +        else:
   1.169 +            # GR: full socket info would be preferable, but it'll be different
   1.170 +            # for stat requests, and not sure the user agent can know and store
   1.171 +            # the full socket info of the upload for requesting.
   1.172 +            self.progress_id = out.req.connection.remote_ip + '-' + line.strip()
   1.173 +            # initialisation of the store
   1.174 +            self._storeProgress(out.req)
   1.175 +
   1.176      def handle_data(self, line, out):
   1.177          out.write(line)
   1.178          if line == self._boundary:
   1.179 @@ -459,6 +544,7 @@
   1.180          if line == self._boundary:
   1.181              # write last line, but without \r\n
   1.182              self._f.write(self._previous_line[:-2])
   1.183 +            self.uploaded += len(self._previous_line[:-2])
   1.184              out.write(line)
   1.185              self._f.close()
   1.186              self._f = None
   1.187 @@ -466,6 +552,7 @@
   1.188          elif line == self._last_boundary:
   1.189              # write last line, but without \r\n
   1.190              self._f.write(self._previous_line[:-2])
   1.191 +            self.uploaded += len(self._previous_line[:-2]) # for completeness
   1.192              out.write(line)
   1.193              self._f.close()
   1.194              self._f = None
   1.195 @@ -473,8 +560,18 @@
   1.196          else:
   1.197              if self._previous_line is not None:
   1.198                  self._f.write(self._previous_line)
   1.199 +                self.uploaded += len(self._previous_line)
   1.200              self._previous_line = line
   1.201  
   1.202 +def get_progress(req, progress_id):
   1.203 +    try:
   1.204 +        f = open(get_progress_path(req, progress_id), 'r')
   1.205 +    except IOError: # inaccessible progress log: process not really started
   1.206 +        return str({'state': 0, 'percent': 0})
   1.207 +    s = f.read()
   1.208 +    f.close() # let's be explicit :-)
   1.209 +    return s
   1.210 +
   1.211  def parse_header(s):
   1.212      l = [e.strip() for e in s.split(';')]
   1.213      result_value = l.pop(0).lower()
     2.1 new file mode 100644
     2.2 --- /dev/null
     2.3 +++ b/src/tramline/progress.py
     2.4 @@ -0,0 +1,12 @@
     2.5 +from mod_python import apache
     2.6 +from tramline.core import get_progress
     2.7 +
     2.8 +def handler(req):
     2.9 +    progress_id = req.connection.remote_ip + '-' + req.uri.rsplit('/', 1)[-1]
    2.10 +    progress = get_progress(req, progress_id)
    2.11 +
    2.12 +    req.content_type = "text/plain"
    2.13 +    req.headers_out['Content-length'] = str(len(progress))
    2.14 +    req.write(progress)
    2.15 +
    2.16 +    return apache.OK
     3.1 new file mode 100644
     3.2 --- /dev/null
     3.3 +++ b/src/tramline/tests/data/input8.txt
     3.4 @@ -0,0 +1,16 @@
     3.5 +-----------------------------100323068321119442571506749230
     3.6 +Content-Disposition: form-data; name="tramline_progress_id"
     3.7 +
     3.8 +1234
     3.9 +-----------------------------100323068321119442571506749230
    3.10 +Content-Disposition: form-data; filename="test.txt"; name="test"
    3.11 +Content-Type: application/octet-stream
    3.12 +
    3.13 +first line
    3.14 +second line
    3.15 +
    3.16 +-----------------------------100323068321119442571506749230
    3.17 +Content-Disposition: form-data; name="submit"
    3.18 +
    3.19 +submit data
    3.20 +-----------------------------100323068321119442571506749230--
     4.1 --- a/src/tramline/tests/test_core.py
     4.2 +++ b/src/tramline/tests/test_core.py
     4.3 @@ -9,6 +9,11 @@
     4.4  from tramline.core import OPTION_ALLOW_GROUP_WRITE
     4.5  from tramline.core import TRAMLINE_RANGE_HEADER
     4.6  
     4.7 +from tramline.core import theProcessorRegistry
     4.8 +
     4.9 +from tramline import core as tramcore
    4.10 +tramcore.REPORT_BLOCKSIZE = 5
    4.11 +
    4.12  tramline_path = '/tmp/trampath'
    4.13  
    4.14  class StringTable(dict):
    4.15 @@ -17,13 +22,18 @@
    4.16              raise ValueError("Table values must be strings")
    4.17          dict.__setitem__(self, key, value)
    4.18  
    4.19 +class Connection:
    4.20 +    remote_ip = 'TESTIP'
    4.21 +
    4.22  class Request:
    4.23      def __init__(self, method):
    4.24 -        self.headers_in = StringTable({'Content-Type' : 'multipart/form-data'})
    4.25 +        self.headers_in = StringTable({'Content-Type' : 'multipart/form-data',
    4.26 +                                       'Content-Length': '56'})
    4.27          self.headers_out = StringTable()
    4.28          self.main = None
    4.29          self.method = method
    4.30          self.options = {'tramline_path': tramline_path}
    4.31 +        self.connection = Connection()
    4.32  
    4.33      def get_options(self):
    4.34          return self.options
    4.35 @@ -136,6 +146,38 @@
    4.36          self.assertEquals(
    4.37              'first line\nsecond line', data)
    4.38          
    4.39 +    def test_inputfilter_progress_id(self):
    4.40 +        input = open(get_data_path('input8.txt'), 'rb')
    4.41 +        progress_id = 'TESTIP-1234'
    4.42 +        output = StringIO()
    4.43 +        filter = Filter(input, output)
    4.44 +        
    4.45 +        inputfilter(filter)
    4.46 +
    4.47 +        input.close()
    4.48 +
    4.49 +        from tramline.core import get_progress
    4.50 +        progress = get_progress(filter.req, progress_id)
    4.51 +        self.assertFalse(progress is None)
    4.52 +
    4.53 +        output_data = output.getvalue()
    4.54 +
    4.55 +        file_id = self.file_id(output_data)
    4.56 +        f = open(os.path.join(tramline_upload_path(filter.req), file_id), 'rb')
    4.57 +        
    4.58 +        data = f.read()
    4.59 +        f.close()
    4.60 +        expected = 'first line\nsecond line\n'
    4.61 +        self.assertEquals(23, len(expected)) # update this if needed
    4.62 +        self.assertEquals(expected, data)
    4.63 +        # process has finished, so uploaded should be the entire length
    4.64 +        # but the fake request is supposed to be 56 bytes long
    4.65 +        self.assertEquals("{'state': 1, 'percent': 41}", progress)
    4.66 +
    4.67 +        # Now requesting progress info with an unknown ID
    4.68 +        self.assertEquals("{'state': 0, 'percent': 0}",
    4.69 +                          get_progress(filter.req, 'TESTIP-4567'))
    4.70 +        
    4.71      def test_split_filter(self):
    4.72          f = open(get_data_path('input2.txt'), 'rb')
    4.73          data = f.read()