vendor/tramline

changeset 8:b97646337e10 gracinet-progress-bar

Starting to implement server side upload stats
author gracinet
date Sat, 21 Feb 2009 18:50:48 +0000
parents d17bf871c609
children 0844398c9e18
files src/tramline/core.py src/tramline/tests/data/input8.txt src/tramline/tests/test_core.py
diffstat 3 files changed, 107 insertions(+), 4 deletions(-) [+]
line diff
     1.1 --- a/src/tramline/core.py
     1.2 +++ b/src/tramline/core.py
     1.3 @@ -307,6 +307,7 @@
     1.4  class ProcessorRegistry:
     1.5      def __init__(self):
     1.6          self._processors = {}
     1.7 +        self._processors_by_progress_id = {}
     1.8  
     1.9      def getProcessor(self, id):
    1.10          return self._processors[id]
    1.11 @@ -319,15 +320,47 @@
    1.12                  break
    1.13          result = self._processors[id] = Processor(id)
    1.14          return result
    1.15 +    
    1.16 +    def registerProgressId(self, processor):
    1.17 +        self._processors_by_progress_id[processor.progress_id] = processor
    1.18 +        
    1.19 +    def removeProcessor(self, processor):
    1.20 +        prog_id = processor.progress_id
    1.21 +        if prog_id is not None:
    1.22 +            del self._processors_by_progress_id[prog_id]
    1.23 +        del self._processors[processor.id]
    1.24  
    1.25 -    def removeProcessor(self, processor):
    1.26 -        del self._processors[processor.id]
    1.27 +    def getUploaded(self, progress_id):
    1.28 +        proc = self._processors_by_progress_id.get(progress_id)
    1.29 +        if proc is None:
    1.30 +            return
    1.31 +        return proc.uploaded
    1.32  
    1.33  theProcessorRegistry = ProcessorRegistry()
    1.34  
    1.35  class Processor:
    1.36 +    """Processor for Post requests. 
    1.37 +
    1.38 +    API atributes:
    1.39 +       id: this is the main identifier in the registry. It is used for 
    1.40 +           continuity from one call to the other and from input to ouput 
    1.41 +           filters (final commit).
    1.42 +
    1.43 +       progress_id: this is the identifier user agents will use to request 
    1.44 +           stats on the current transfer upload. Unfortunately it has to be
    1.45 +           partially set by an <input> from the user agent, because we have no
    1.46 +           means to communicate it back once the process has started. 
    1.47 +           Still some effort is made to avoid malicious garbling of ids.
    1.48 +
    1.49 +       upload: this is the total file data upload this processor has seen. 
    1.50 +           there might be more than one file in this request.
    1.51 +    """
    1.52 +       
    1.53 +
    1.54      def __init__(self, id):
    1.55 +        self.uploaded = 0L
    1.56          self.id = id
    1.57 +        self.progress_id = None
    1.58          self._upload_files = []
    1.59          self._incoming = []
    1.60  	self._isize = 0
    1.61 @@ -410,10 +443,15 @@
    1.62          filename = self._disposition_options.get('filename')
    1.63          # if filename is empty, assume no file is submitted and submit
    1.64          # empty file -- don't tramline this special case
    1.65 -        if out.req.get_options().get('explicit_enable') and \
    1.66 -              self._disposition_options.get('name')=='tramline_enable':
    1.67 +        input_name = self._disposition_options.get('name')
    1.68 +        if input_name == 'tramline_enable' and out.req.get_options().get(
    1.69 +            'explicit_enable'):
    1.70              self.handle = self.handle_enable_vars
    1.71              return
    1.72 +        elif input_name == 'tramline_progress_id':
    1.73 +            # identifier used by async user agent requests to get upload stats
    1.74 +            self.handle = self.handle_progress_id
    1.75 +            return
    1.76          elif (filename is None or not filename) or \
    1.77                out.req.get_options().get('explicit_enable') and \
    1.78                self._disposition_options.get('name') not in self.vars_to_handle:
    1.79 @@ -444,6 +482,21 @@
    1.80          else:
    1.81              self._enable_vars+=line
    1.82  
    1.83 +    def handle_progress_id(self, line, out):
    1.84 +        out.write(line)
    1.85 +        if line == self._boundary:
    1.86 +            self.init_headers()
    1.87 +            self.handle = self.handle_headers
    1.88 +        elif line == self._last_boundary:
    1.89 +            # shouldn't happen if client has some consistency 
    1.90 +            self.handle = None # Processing done
    1.91 +        else:
    1.92 +            # GR: full socket info would be preferable, but it'll be different
    1.93 +            # for stat requests, and not sure the user agent can know and store
    1.94 +            # the full socket info of the upload for requesting.
    1.95 +            self.progress_id = str(out.req.connection.ip) + '-' + line.strip()
    1.96 +            theProcessorRegistry.registerProgressId(self)
    1.97 +
    1.98      def handle_data(self, line, out):
    1.99          out.write(line)
   1.100          if line == self._boundary:
   1.101 @@ -457,6 +510,7 @@
   1.102          if line == self._boundary:
   1.103              # write last line, but without \r\n
   1.104              self._f.write(self._previous_line[:-2])
   1.105 +            self.uploaded += len(self._previous_line[:-2])
   1.106              out.write(line)
   1.107              self._f.close()
   1.108              self._f = None
   1.109 @@ -464,6 +518,7 @@
   1.110          elif line == self._last_boundary:
   1.111              # write last line, but without \r\n
   1.112              self._f.write(self._previous_line[:-2])
   1.113 +            self.uploaded += len(self._previous_line[:-2]) # for completeness
   1.114              out.write(line)
   1.115              self._f.close()
   1.116              self._f = None
   1.117 @@ -471,6 +526,7 @@
   1.118          else:
   1.119              if self._previous_line is not None:
   1.120                  self._f.write(self._previous_line)
   1.121 +                self.uploaded += len(self._previous_line)
   1.122              self._previous_line = line
   1.123  
   1.124  def parse_header(s):
     2.1 new file mode 100644
     2.2 --- /dev/null
     2.3 +++ b/src/tramline/tests/data/input8.txt
     2.4 @@ -0,0 +1,16 @@
     2.5 +-----------------------------100323068321119442571506749230
     2.6 +Content-Disposition: form-data; name="tramline_progress_id"
     2.7 +
     2.8 +1234
     2.9 +-----------------------------100323068321119442571506749230
    2.10 +Content-Disposition: form-data; filename="test.txt"; name="test"
    2.11 +Content-Type: application/octet-stream
    2.12 +
    2.13 +first line
    2.14 +second line
    2.15 +
    2.16 +-----------------------------100323068321119442571506749230
    2.17 +Content-Disposition: form-data; name="submit"
    2.18 +
    2.19 +submit data
    2.20 +-----------------------------100323068321119442571506749230--
     3.1 --- a/src/tramline/tests/test_core.py
     3.2 +++ b/src/tramline/tests/test_core.py
     3.3 @@ -9,6 +9,8 @@
     3.4  from tramline.core import OPTION_ALLOW_GROUP_WRITE
     3.5  from tramline.core import TRAMLINE_RANGE_HEADER
     3.6  
     3.7 +from tramline.core import theProcessorRegistry
     3.8 +
     3.9  tramline_path = '/tmp/trampath'
    3.10  
    3.11  class StringTable(dict):
    3.12 @@ -17,6 +19,9 @@
    3.13              raise ValueError("Table values must be strings")
    3.14          dict.__setitem__(self, key, value)
    3.15  
    3.16 +class Connection:
    3.17 +    ip = 'TESTIP'
    3.18 +
    3.19  class Request:
    3.20      def __init__(self, method):
    3.21          self.headers_in = StringTable({'Content-Type' : 'multipart/form-data'})
    3.22 @@ -24,6 +29,7 @@
    3.23          self.main = None
    3.24          self.method = method
    3.25          self.options = {'tramline_path': tramline_path}
    3.26 +        self.connection = Connection()
    3.27  
    3.28      def get_options(self):
    3.29          return self.options
    3.30 @@ -136,6 +142,31 @@
    3.31          self.assertEquals(
    3.32              'first line\nsecond line', data)
    3.33          
    3.34 +    def test_inputfilter_progress_id(self):
    3.35 +        input = open(get_data_path('input8.txt'), 'rb')
    3.36 +        progress_id = 'TESTIP-1234'
    3.37 +        output = StringIO()
    3.38 +        filter = Filter(input, output)
    3.39 +        
    3.40 +        inputfilter(filter)
    3.41 +
    3.42 +        input.close()
    3.43 +
    3.44 +        uploaded = theProcessorRegistry.getUploaded(progress_id)
    3.45 +        self.assertFalse(uploaded is None)
    3.46 +
    3.47 +        output_data = output.getvalue()
    3.48 +
    3.49 +        file_id = self.file_id(output_data)
    3.50 +        f = open(os.path.join(tramline_upload_path(filter.req), file_id), 'rb')
    3.51 +        
    3.52 +        data = f.read()
    3.53 +        f.close()
    3.54 +        expected = 'first line\nsecond line\n'
    3.55 +        self.assertEquals('first line\nsecond line\n', data)
    3.56 +        # process has finished, so uploaded should be the entire length
    3.57 +        self.assertEquals(len(expected), uploaded)
    3.58 +        
    3.59      def test_split_filter(self):
    3.60          f = open(get_data_path('input2.txt'), 'rb')
    3.61          data = f.read()