Stackless Python multi-process channels:

N.Stein - April 2012


Here are some little experiments trying to combine multiprocessing and stackless.
First naïve approach: take a real stackless channel and share it: doesn't seem to be working...


With a (multiprocessing) Queue:
    import multiprocessing as mp
    import stackless,os,time
    from stackless_addins2 import *

    def f(sh,dx):
      """Child-process entry point: spawn a tx or rx tasklet on sh and run the scheduler."""
      Nonblocking.Start()
      worker = tx if dx == 'tx' else rx
      stackless.tasklet(worker)(sh)
      stackless.run()


    def rx(sh):
      """Receiver tasklet: poll the shared Queue and print whatever arrives."""
      while(1):
        Nonblocking.Sleep(.5)
        if(sh.empty()): continue #important for the schedule
        # Bug fix: a multiprocessing.Queue is not subscriptable, so sh[0]
        # raises TypeError; items must be consumed with get().
        tmp=sh.get()
        print('I (%i) received: %s' %(os.getpid(),tmp))


    def tx(sh):
      """Sender tasklet: push a numbered greeting onto the shared Queue every 2 seconds."""
      counter = 0
      while True:
        Nonblocking.Sleep(2)
        sh.put('hello %i from %i' %(counter,os.getpid()))
        counter += 1
        print('TX done!')




    def mainticker():
      """Heartbeat tasklet for the parent process: print a liveness message every 5 seconds."""
      while True:
        Nonblocking.Sleep(5)
        print('Parent alive')


    if __name__ == '__main__':
      # One Queue shared between the sender child and the receiver child.
      shared_q = mp.Queue()

      children = [mp.Process(target=f, args=(shared_q, role)) for role in ('tx', 'rx')]
      for child in children:
        child.start()

      # The parent runs its own stackless scheduler with just a heartbeat tasklet.
      Nonblocking.Start()
      stackless.tasklet(mainticker)()
      stackless.run()
      

With a (multiprocessing) manager, through a list:
    from multiprocessing import Process, Manager, Lock
    import stackless,os,time
    from stackless_addins2 import *

    def f(sh,dx):
      """Child entry point: pull the shared channel out of the manager list and run a tasklet on it."""
      Nonblocking.Start()
      print(sh)
      target = tx if dx == 'tx' else rx
      stackless.tasklet(target)(sh[0])
      stackless.run()

    def tx(ch):
      """Send a greeting on the (shared) channel once per second, forever."""
      print('tx starts')
      while True:
        Nonblocking.Sleep(1)
        print('sending...')
        ch.send('coucou from %i' %os.getpid())

    def rx(ch):
      """Block on the channel and print every value received, forever."""
      print('rx starts')
      while True:
        msg = ch.receive()
        print('%i received: %s' %(os.getpid(),msg))



    if __name__ == '__main__':
      mgr = Manager()

      # Naive attempt: ship a real stackless channel to the children through a manager list.
      mpchannels=mgr.list()
      mpchannels.append(stackless.channel())

      procs = [Process(target=f, args=(mpchannels, role)) for role in ('tx', 'rx')]
      for proc in procs:
        proc.start()

      try:
        while True: pass
      except KeyboardInterrupt:
        print('%i User interrupted !' %os.getpid())
      

So in the end, I came up with this re-implementation of multi-process-aware stackless channels.
For now, it lacks the possibility for a child process to create an mpchannel by itself (mpchannels must be created from the main process and passed down to the children). I hope to find a little time to implement this possibility in a way which is practical for the programmer. In the same vein, overriding the tasklet constructor to add the possibility to choose the process on which to start it might be cool too.

I find the concept exciting, because it lets you communicate and synchronize between tasklets on different processes. All the power of Stackless Python AND being multi-core (effectively bypassing the GIL) whenever you want — it's just a matter of architecting your app properly... :-)


mpchannels.py (beta)
    # -*- coding: utf-8 -*-
    import os
    from multiprocessing import Process, Manager, Lock
    import stackless


    class sharedQ(object):
      """Multiprocess-shared FIFO of pending channel operations.

      Backed by a Manager list proxy so every process sees the same state.
      Each entry is a 5-item list [pid, direction, data_present, value, exception],
      with direction 'S' for a waiting sender and 'R' for a waiting receiver.
      """

      def __init__(self,mgr):
        # mgr must be a multiprocessing.Manager(); its list() proxy is shareable.
        self.q=mgr.list()
        self.lock = Lock()

      def put(self,pid,direction,data_present,v,exception=False):
        """Append one entry; a single append on the list proxy needs no extra lock."""
        self.q.append([pid,direction,data_present,v,exception])

      def get(self):
        """Pop and return the head entry, or None when the queue is empty."""
        with self.lock:
          if(len(self.q)>0): return(self.q.pop(0)) # lock closes the check-then-pop race
          return(None)

      def read_info(self,idx=0):
        """Return (pid, direction, data_present) of entry idx, or None if absent."""
        with self.lock:
          if(len(self.q)>idx): return((self.q[idx][0],self.q[idx][1],self.q[idx][2],)) # lock closes the check-then-read race
          return(None)

      def add_data(self,data,exception=False):
        """Attach data (or an exception payload) to the head entry, marking it served.

        Raises RuntimeError if the head entry already carries data.
        """
        with self.lock:
          # Safety net, should not happen: never overwrite data already present.
          # Bug fix: 'raise E, msg' is Python-2-only syntax; the call form is
          # equivalent and also valid on Python 3.
          if(self.q[0][2]): raise RuntimeError('Trying to add_data to shared queue when data already present !')
          # Mutate a local copy and write it back: in-place mutation of an item
          # inside a manager list proxy is not propagated to other processes.
          x=self.q[0]
          x[2]=True
          x[3]=data
          x[4]=exception
          self.q[0]=x

      def has_pid(self,searched_pid):
        """Return True if any queued entry belongs to searched_pid."""
        with self.lock:
          for (pid,direction,data_present,v,e) in self.q:
            if(searched_pid==pid): return(True)
          return(False)

      def empty(self):
        """Return True when no entries are queued (unlocked snapshot)."""
        return(len(self.q)==0)

      def __len__(self):
        return(len(self.q))

      def __str__(self):
        # Accumulator renamed so it no longer shadows the builtin str().
        out=''
        for e in self.q: out+='\n PID=%i DIR=%s DP=%s EXCEPTION=%s DATA=%s ' %(e[0],e[1],e[2],e[4],e[3])
        return(out)




    class mpchannel(object):

      def __init__(self,mgr):
        """An mpchannel object is the multiprocessing equivalent of a stackless channel,
        in a multiprocessing environment. It is used for communication between tasklets,
        on the same process or across different processes.
        By sending on a channel, a tasklet that is waiting to receive
        is resumed. If there is no waiting receiver, the sender is suspended.
        By receiving from a channel, a tasklet that is waiting to send
        is resumed. If there is no waiting sender, the receiver is suspended.

        mgr must be a multiprocessing.Manager() created in the parent process.
        """

        self.shQ=sharedQ(mgr) # Shared queue of pending senders/receivers
        self._closing=mgr.Value('B',0) ##Unsigned char 0 =False, 1=True
        self._closed=mgr.Value('B',0)
        self.lock=Lock()
        print('mpChannel instantiated from pid %i' %os.getpid())


      def send(self,data): 
        """channel.send(value) -- send a value over the channel.
        If no other tasklet is already receiving on the channel,
        the sender will be blocked. Otherwise, both receiver and sender will
        be activated immediately. This is not like the original stackless channels
        where the sender is put at the end of the runnables list.
        """
        self._send(data,False)

      def send_sequence(self,seq):
        """channel.send_sequence(seq) -- send a stream of values
        over the channel.
        """
        for item in seq: self._send(item,False)

      def send_exception(self,ex):
        """channel.send_exception(exc) -- send an exception over the channel.
        exc must be a subclass of Exception.
        Behavior is like channel.send, but the receiver gets an exception.
        """
        self._send(ex,True)

      def _send(self,data,exception=False):
        # Either hand the data directly to a waiting receiver, or enqueue
        # ourselves as a sender and spin (yielding to the scheduler) until served.
        with self.lock:
          if(self._closed.value==1): raise StopIteration
          if(self.shQ.empty()): ## No senders, no receivers, just put me as the only sender in the Q
            if(self._closing.value==1): raise StopIteration # No receivers, and closing: you can't send ! (should not happen as empty Q on closing switches closed together with closing)
            self.shQ.put(os.getpid(),'S',True,data,exception)
            # Now block until my pid disappears from Q, but outside the lock!
          else:
            (pid,dir,dp)=self.shQ.read_info()
            if((dir=='S') or (dp)): ## only senders, or a receiver freshly served, just put me in the back of the Q
              if(self._closing.value==1): raise StopIteration # No receivers, and closing: you can't send !
              self.shQ.put(os.getpid(),'S',True,data,exception)
              # Now block until my pid disappears from Q, but outside the lock!
            else: ## Receiver in first row, without data, let's talk to him !
              self.shQ.add_data(data,exception)
              if((self._closing.value==1) and self.shQ.empty()): self._closed.value=1 # We just served the last receiver, we're now closed!
              return()

        # My entry vanishes from the Q once a receiver has picked it up.
        while(self.shQ.has_pid(os.getpid())): stackless.schedule()


      def receive(self):
        """mpchannel.receive() -- receive a value over the channel.
        If no other tasklet in any of the processes is already sending on the channel,
        the receiver will be blocked. Otherwise, the receiver will
        continue immediately, and the sender (in a possibly different process) resumes immediately.
        This is not like the original stackless channels where the sender is put at the end of
        the runnables list. On mpchannels, there is no channel flag to change this policy.
        """
        with self.lock:
          if(self._closed.value==1): raise StopIteration      
          if(self.shQ.empty()): ## No senders, no receivers, just put me as the only receiver in the Q
            if(self._closing.value==1): raise StopIteration # No senders, and closing: you can't receive ! (should not happen as empty Q on closing switches closed together with closing)
            self.shQ.put(os.getpid(),'R',False,None)
            # Now block until some data is present for me, but outside the lock!
          else:
            (pid,dir,dp)=self.shQ.read_info()
            if(dir=='R'): ## only receivers, (a sender without data present should not happen), just put me in the back of the Q
              if(self._closing.value==1): raise StopIteration # No senders, and closing: you can't receive !
              self.shQ.put(os.getpid(),'R',False,None)
              # Now block until some data is present for me, but outside the lock!
            else: ## Sender in first row, pick his data ! (a sender without data present should not happen, but would end up here, returning None)
              (pid,dir,dp,exc_data,exc_flag)=self.shQ.get()  # fixed: 'execption' typo in the unpacked names
              if(exc_flag): raise exc_data
              if((self._closing.value==1) and self.shQ.empty()): self._closed.value=1 # We just received from the last sender, we're now closed!
              return(exc_data)

        # Spin (yielding to the scheduler) until my entry reaches the head of the
        # Q with its data_present flag set by some sender.
        gotmydata=False
        while(not gotmydata):
          stackless.schedule()
          (pid,dir,dp)=self.shQ.read_info()
          gotmydata=((pid==os.getpid()) and (dir=='R') and (dp))
        (pid,dir,dp,exc_data,exc_flag)=self.shQ.get()
        if(exc_flag): raise exc_data
        else: return(exc_data)

      def close(self):
        """mpchannel.close() -- stops the channel from enlarging its queue.
        If the channel is not empty, the flag 'closing' becomes true.
        If the channel is empty, the flag 'closed' becomes true.
        """
        if(self._closed.value==1): return
        with self.lock:    
          self._closing.value=1
          if(self.shQ.empty()): self._closed.value=1 #We're lucky, no senders, no receiver, let's close immediately !

      def open(self):
        """reopen a channel. See channel.close.
        """
        if(self._closed.value!=1): return
        with self.lock:
          self._closed.value=0
          self._closing.value=0

      def _get_closing(self): return(self._closing.value==1)
          
      def _get_closed(self): return(self._closed.value==1)

      def _get_balance(self):  # fixed: 'self' was missing, so reading .balance raised TypeError
        """Channel balance: >0 means senders are waiting, <0 means receivers are.
        The Q normally holds only senders or only receivers; the one exception is
        a Q of senders with ONE receiver in the first row, holding data present
        but not yet picked up by its receiver.
        """
        with self.lock:
          if(self.shQ.empty()): return(0)  # fixed: empty is a method; the unbound reference was always truthy
          (pid,dir,dp)=self.shQ.read_info()
          if(dp):
            if(len(self.shQ)>1): (pid,dir,dp)=self.shQ.read_info(1)
            else: return(0)
            if(dir=='R'): return(-len(self.shQ))
            return(len(self.shQ))
          # fixed: the original fell through (returning None) here; a head entry
          # without data present means only unserved receivers are queued.
          return(-len(self.shQ))

      balance=property(_get_balance,None,None,"")
      closing=property(_get_closing,None,None,"")
      closed=property(_get_closed,None,None,"")

    


Here is a quick example.
("stackless_addins" is just a little package containing a number of useful idioms from the stackless website)

tokenring.py
    import os,time
    from multiprocessing import Process, Manager, Lock
    import stackless
    from stackless_addins import *
    from mpchannels import *



    def f(ch,dx):
      """Child entry: spawn the generator tasklet matching dx, then run the scheduler."""
      Nonblocking.Start()
      dispatch = {'tx': ('G1:%i', g1), 'rx': ('G2:%i', g2), 'rx2': ('G3:%i', g3)}
      entry = dispatch.get(dx)
      if entry is not None:
        banner, func = entry
        print(banner %os.getpid())
        stackless.tasklet(func)(ch)

      try:
        stackless.run()
      except KeyboardInterrupt:
        print('%i User interrupted !' %os.getpid())




    def g1(ch):
      """Send a greeting on the channel every 3 seconds, forever."""
      while True:
        Nonblocking.Sleep(3)
        ch.send('hello from %i' %os.getpid())


    def g2(ch):
      """Print every value received on the channel, forever."""
      while True:
        msg = ch.receive()
        print('%i received %s' %(os.getpid(),msg))


    def g3(ch):
      """Send a greeting on the channel every second, forever."""
      while True:
        Nonblocking.Sleep(1)
        ch.send('hello from %i' %os.getpid())



    if __name__ == '__main__':
      manager = Manager()

      # Bug fix: the class defined in mpchannels.py is 'mpchannel' (all lower
      # case); 'mpChannel(manager)' raises NameError.
      ch=mpchannel(manager)

      p1 = Process(target=f, args=(ch,'tx'))
      p2 = Process(target=f, args=(ch,'rx'))
      p3 = Process(target=f, args=(ch,'rx2'))
      p1.start()
      p2.start()
      p3.start()

      try: 
        while(1): pass
      except KeyboardInterrupt:
        print('%i User interrupted !' %os.getpid())
    


Here is another quick example with 7 processes sending each other a token that was launched by the parent. Using 8 processes (7 children + one parent) is of course no coincidence, as I tested this on an i7 (8 detected cores - 4 real + 4 virtual).
Benchmarking with this piece of code on my i7 @ 3.5GHz gives me around 14ms round-trip, thus around 2ms between sending and receiving. This is of course awful compared to a normal channel transmission time on a single core, which is expressed in µs (/1000 !), but still, here's a neat way of communicating between processes, with the different schedulers being aware of blocking calls as usual.

tokenring.py
    import os,time
    from multiprocessing import Process, Manager, Lock
    import stackless
    from mpchannels import *


    def f(chin,chout):
      """Child-process entry point.

      Wire a single pass_token tasklet between the inbound and outbound
      channels, then run the stackless scheduler until interrupted.
      """
      stackless.tasklet(pass_token)(chin,chout)
      try:
        stackless.run()
      except KeyboardInterrupt:
        print('%i User interrupted !' %os.getpid())




    def pass_token(chin,chout):
      """Relay the token 100 times, appending a per-hop timestamp; print it on the last hop."""
      last = time.time()
      for hop in range(0, 100):
        token = chin.receive()
        dt = (time.time() - last) * 1000
        last = time.time()
        if hop < 99:
          chout.send('%s\n%i:%0.3f' %(token,os.getpid(),dt))
        else:
          print(token)
      

    ## Token-ring test

    if __name__ == '__main__':
      manager = Manager()

      # Bug fix: the class defined in mpchannels.py is 'mpchannel' (all lower
      # case); 'mpChannel(manager)' raises NameError.
      # Ring topology: process k receives on channel k and sends on channel
      # (k+1) mod 7, so the token travels around all 7 children.
      channels = [mpchannel(manager) for _ in range(7)]

      procs = [Process(target=f, args=(channels[k], channels[(k + 1) % 7]))
               for k in range(7)]
      for p in procs:
        p.start()

      #Launch the token
      channels[0].send('%i ' %os.getpid())

      try: 
        while(1): pass
      except KeyboardInterrupt:
        print('%i User interrupted !' %os.getpid())