PYTHON   64

poll

Guest on 30th April 2022 11:53:29 PM

  1. import zmq
  2. import gevent
  3. from gevent import select
  4.  
  5. from zmq import Poller as _original_Poller
  6.  
  7.  
  8. class _Poller(_original_Poller):
  9.     """Replacement for :class:`zmq.Poller`
  10.  
  11.    Ensures that the greened Poller below is used in calls to
  12.    :meth:`zmq.Poller.poll`.
  13.    """
  14.     _gevent_bug_timeout = 1.33 # minimum poll interval, for working around gevent bug
  15.  
  16.     def _get_descriptors(self):
  17.         """Returns three elements tuple with socket descriptors ready
  18.        for gevent.select.select
  19.        """
  20.         rlist = []
  21.         wlist = []
  22.         xlist = []
  23.  
  24.         for socket, flags in self.sockets:
  25.             if isinstance(socket, zmq.Socket):
  26.                 rlist.append(socket.getsockopt(zmq.FD))
  27.                 continue
  28.             elif isinstance(socket, int):
  29.                 fd = socket
  30.             elif hasattr(socket, 'fileno'):
  31.                 try:
  32.                     fd = int(socket.fileno())
  33.                 except:
  34.                     raise ValueError('fileno() must return an valid integer fd')
  35.             else:
  36.                 raise TypeError('Socket must be a 0MQ socket, an integer fd '
  37.                                 'or have a fileno() method: %r' % socket)
  38.  
  39.             if flags & zmq.POLLIN:
  40.                 rlist.append(fd)
  41.             if flags & zmq.POLLOUT:
  42.                 wlist.append(fd)
  43.             if flags & zmq.POLLERR:
  44.                 xlist.append(fd)
  45.  
  46.         return (rlist, wlist, xlist)
  47.  
  48.     def poll(self, timeout=-1):
  49.         """Overridden method to ensure that the green version of
  50.        Poller is used.
  51.  
  52.        Behaves the same as :meth:`zmq.core.Poller.poll`
  53.        """
  54.  
  55.         if timeout is None:
  56.             timeout = -1
  57.  
  58.         if timeout < 0:
  59.             timeout = -1
  60.  
  61.         rlist = None
  62.         wlist = None
  63.         xlist = None
  64.  
  65.         if timeout > 0:
  66.             tout = gevent.Timeout.start_new(timeout/1000.0)
  67.         else:
  68.             tout = None
  69.  
  70.         try:
  71.             # Loop until timeout or events available
  72.             rlist, wlist, xlist = self._get_descriptors()
  73.             while True:
  74.                 events = super(_Poller, self).poll(0)
  75.                 if events or timeout == 0:
  76.                     return events
  77.  
  78.                 # wait for activity on sockets in a green way
  79.                 # set a minimum poll frequency,
  80.                 # because gevent < 1.0 cannot be trusted to catch edge-triggered FD events
  81.                 _bug_timeout = gevent.Timeout.start_new(self._gevent_bug_timeout)
  82.                 try:
  83.                     select.select(rlist, wlist, xlist)
  84.                 except gevent.Timeout as t:
  85.                     if t is not _bug_timeout:
  86.                         raise
  87.                 finally:
  88.                     _bug_timeout.cancel()
  89.  
  90.         except gevent.Timeout as t:
  91.             if t is not tout:
  92.                 raise
  93.             return []
  94.         finally:
  95.             if timeout > 0:
  96.                 tout.cancel()

Raw Paste


Login or Register to edit or fork this paste. It's free.