Piping numpy arrays to other processes in Python

To pipe data from one process to another as a stream in Python, we need to pickle the object and write it to the pipe.
In this example I’ve used Numpy arrays, but this could be applied to any object that can be pickled in Python.
This took far too long to get working, and I could find little information online on how to put it all together, so here it is.
This code is Python 3 only, and I’ve only run it on a Mac.

I’ve used a binary stream rather than text, purely for efficiency: Numpy arrays can get huge! This means readline() is not
going to work. Instead, I send a single control byte: 1 for data and 0 for stop. This could be extended to include other control operations.
I then send the length of the data as an 8-byte int, followed by the data itself.
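
Just to make the framing concrete, here’s what a single frame looks like as raw bytes (the variable names are only for illustration):

import pickle
import numpy as np

arr = np.ones((2, 2))
payload = pickle.dumps(arr)
#a frame is: control byte (1 = data), payload length as an 8 byte big-endian int, then the payload
frame = bytes([1]) + len(payload).to_bytes(8, byteorder='big') + payload
#frames are written back to back, and a single 0 byte ends the stream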

simpleSend.py

import numpy as np
import pickle
import sys
import io
import time

#define some control bytes
control_data=bytes([1])
control_stop=bytes([0])

def send_data(arr):
    dataStr=pickle.dumps(arr)  #pickle the data array into a byte string
    dlen=len(dataStr).to_bytes(8, byteorder='big') #encode its length as an 8 byte int
    print(control_data.decode('latin-1'),end='',flush=True)  #write the control byte
    print(dlen.decode('latin-1'), end='', flush=True)   #write the data length
    print(dataStr.decode('latin-1'), end='', flush=True)  # end='' stops print adding an extra \r\n

def send_stop():
    print(control_stop.decode('latin-1'), end='', flush=True) 

#set stdout to print in latin-1; sys.stdout.detach() is the underlying binary stream
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='latin-1')

for p in range(10):
    arr=np.ones((5000,500))*p  #generate some data
    send_data(arr)
    #the sleep is purely for testing (does the reader fall over after a long delay?) and can be removed
    time.sleep(.1)
send_stop()        

simpleReceiver.py

import numpy as np
import sys
import pickle

#define some control bytes
control_data=bytes([1])
control_stop=bytes([0])

while True:
    data=sys.stdin.buffer.read(1)   #read the control byte
    if data==control_data:
        data=sys.stdin.buffer.read(8)  #read the data length
        dlen=int.from_bytes(data, byteorder='big')
        print('data length %d'%dlen)
        data=sys.stdin.buffer.read(dlen) #read the data        
        npd=pickle.loads(data)  #unpickle
        print(npd.shape)
        print(npd.max())
    elif data==control_stop:
        print('stopped')
        break
    else:
        print('unexpected control byte or EOF')
        break  #an empty read means the sender closed the pipe without sending stop

To run this:
python simpleSend.py | python simpleReceiver.py

If we want to use Python’s subprocess module to start simpleReceiver.py, we basically need to write to the subprocess’s stdin instead of using print:

import numpy as np
import pickle
import sys
import subprocess as sp

#define some control bytes
control_data=bytes([1])
control_stop=bytes([0])

def send_data(arr,mp):
    dataStr=pickle.dumps(arr)  #pickle the data array into a byte string
    dlen=len(dataStr).to_bytes(8, byteorder='big') #encode its length as an 8 byte int
    mp.stdin.write(control_data)
    mp.stdin.write(dlen)
    mp.stdin.write(dataStr)
    mp.stdin.flush() #not sure this is needed
     
def send_stop(mp):
    mp.stdin.write(control_stop)
    mp.stdin.flush()
     
try:
    mp = sp.Popen("python3 simpleReceiver.py", shell=True, stdin=sp.PIPE)
except OSError as err:  #Popen raises OSError, not CalledProcessError, if the shell can't be started
    print('ERROR:', err)
    sys.exit(-1)

for p in range(10):
    arr=np.ones((5000,5000))*p  #generate some data
    send_data(arr,mp)
send_stop(mp)
mp.stdin.close()  #close the pipe so the receiver sees EOF
mp.wait()  #wait for the receiver to finish
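
Note that with stdin=sp.PIPE alone the receiver’s prints still go straight to the terminal. If the parent should collect them instead, Popen can pipe the child’s stdout too. A minimal sketch, assuming the same simpleReceiver.py as above:

import subprocess as sp
import sys

#also pipe the receiver's stdout back to the parent
mp = sp.Popen([sys.executable, 'simpleReceiver.py'], stdin=sp.PIPE, stdout=sp.PIPE)
#...send the frames and the stop byte exactly as above...
out, _ = mp.communicate()  #closes stdin, reads everything the receiver printed, waits for it to exit
print(out.decode())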

With such a large array (5000×5000) this takes some time. Running it through the Python profiler indicates that about 75% of the time is taken by pickle.dumps, and most of the remaining 25% by the write operation. Numpy’s own methods give a speed increase: replacing dataStr=pickle.dumps(arr) with dataStr=arr.tobytes(), and npd=pickle.loads(data) with npd=np.frombuffer(data), more than halves the time taken, but loses the shape and dtype information. This would have to be sent along with the data.
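
A minimal sketch of one way to do that, using hypothetical pack_array/unpack_array helpers (not part of the scripts above): pickle only a tiny header carrying the shape and dtype, and ship the raw bytes separately, each part framed with an 8-byte length as before.

import pickle
import numpy as np

def pack_array(arr):
    #a small pickled header carries the shape and dtype that tobytes() discards
    header = pickle.dumps((arr.shape, arr.dtype.str))
    return header, arr.tobytes()

def unpack_array(header, body):
    shape, dtype = pickle.loads(header)
    return np.frombuffer(body, dtype=dtype).reshape(shape)

arr = np.arange(12, dtype=np.float64).reshape(3, 4)
header, body = pack_array(arr)
assert (unpack_array(header, body) == arr).all()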

Adaptive-particle-filter – Particle filter – Google Project Hosting

The code used for our adaptive particle filter.


For this paper: Hassan, Waqas; Bangalore Manjunathamurthy, Nagachetan; Birch, Philip; Young, Rupert and Chatwin, Chris (2012) Adaptive Sample Count Particle Filter. Computer Vision and Image Understanding, 116 (12). pp. 1208-1222. ISSN 1077-3142

http://dx.doi.org/10.1016/j.cviu.2012.09.001