vendor/zasync/1.1

changeset 6:59012a64bf99 1.1-nux

Modified to work with persistent queue
author rspivak
date Tue, 10 Jan 2006 03:28:45 +0000
parents cde4b66aa53c
children fa907c4566b9
files manager.py
diffstat 1 files changed, 20 insertions(+), 8 deletions(-) [+]
line diff
     1.1 --- a/manager.py
     1.2 +++ b/manager.py
     1.3 @@ -42,6 +42,8 @@
     1.4  from Products.Sessions.BrowserIdManager import BROWSERID_MANAGER_NAME
     1.5  from AccessControl.SecurityInfo import allow_class
     1.6  
     1.7 +from nuxeo.persistentqueue.persistentqueue import PersistentQueue
     1.8 +
     1.9  import permissions, bforests, interfaces
    1.10  
    1.11  UNCALLED = 0
    1.12 @@ -462,9 +464,13 @@
    1.13          return timeout
    1.14  InitializeClass(Deferred)
    1.15  
    1.16 -def getDeferredInfo(context, dictionary, sort_field=None, reverse=False):
    1.17 +def getDeferredInfo(context, info, sort_field=None, reverse=False):
    1.18      res = []
    1.19 -    for d in dictionary.values():
    1.20 +    if isinstance(info, PersistentQueue):
    1.21 +        ditems = info
    1.22 +    else:
    1.23 +        ditems = info.values()
    1.24 +    for d in ditems:
    1.25          d = d.__of__(context)
    1.26          plugin, args, kwargs = d.getSignature()
    1.27          state = d.getState()
    1.28 @@ -536,7 +542,7 @@
    1.29          if id is not None:
    1.30              self.id = id
    1.31          # items to be picked up by zasync
    1.32 -        self._new = OOBTree.OOBTree()
    1.33 +        self._new = PersistentQueue()
    1.34          # items collected by zasync from the queue
    1.35          self._accepted = OOBTree.OOBTree()
    1.36          # long term cache
    1.37 @@ -664,7 +670,7 @@
    1.38          d.key = key
    1.39          d.id = repr(key)
    1.40          d.local_key = randomizer
    1.41 -        self._new[key] = d
    1.42 +        self._new.append(d)
    1.43          wrapped = d.__of__(self)
    1.44          wrapped.manage_fixupOwnershipAfterAdd()
    1.45          user=getSecurityManager().getUser()
    1.46 @@ -689,7 +695,10 @@
    1.47      security.declareProtected(
    1.48          permissions.MakeAsynchronousApplicationCalls, 'getDeferred')
    1.49      def getDeferred(self, d_id, default=None):
    1.50 -        for src in (self._new, self._accepted, self._resolved):
    1.51 +        for src in self._new:
    1.52 +            if src.key == d_id:
    1.53 +                return src.__of__(self)
    1.54 +        for src in (self._accepted, self._resolved):
    1.55              res = src.get(d_id)
    1.56              if res is not None:
    1.57                  return res.__of__(self)
    1.58 @@ -766,9 +775,12 @@
    1.59  
    1.60      security.declarePrivate('acceptAll')
    1.61      def acceptAll(self):
    1.62 -        self._accepted.update(self._new)
    1.63 -        res = self._new.values()
    1.64 -        self._new.clear()
    1.65 +        new = self._new
    1.66 +        for d in new:
    1.67 +            self._accepted[d.key] = d
    1.68 +        res = new[:]
    1.69 +        while new:
    1.70 +            new.pop(0)
    1.71          return res
    1.72  
    1.73      security.declarePrivate('getAcceptedCalls')