To stream data from one process to another in Python, we need to serialise each object to bytes (here with pickle) and write those bytes to the pipe.
In this example I’ve used NumPy arrays, but the same approach applies to any object that can be pickled in Python.
This took far too long to get working and I could find little information online on how to put it all together, so here it is.
This code is Python 3 only, and I’ve only run it on a Mac.
I’ve used a binary stream rather than text purely for efficiency. NumPy arrays can get huge! This means readline() is not going to work, because the pickled bytes can contain newline bytes anywhere. Instead, I send a single control byte: 1 for data and 0 for stop. This could be extended to include other control operations.
I then send the length of the data as an 8-byte big-endian integer, followed by the data itself.
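The byte layout described above can be sketched in memory before any pipes are involved. This is a minimal illustration, not part of the scripts below: the names build_frame and parse_frame are my own, and the payload is an arbitrary byte string rather than a pickled array.

```python
# Control bytes, matching the values used in this post.
CONTROL_DATA = bytes([1])
CONTROL_STOP = bytes([0])

def build_frame(payload: bytes) -> bytes:
    """Return control byte + 8-byte big-endian length + payload."""
    return CONTROL_DATA + len(payload).to_bytes(8, byteorder="big") + payload

def parse_frame(frame: bytes) -> bytes:
    """Split one complete data frame back into its payload."""
    if frame[:1] != CONTROL_DATA:
        raise ValueError("not a data frame")
    length = int.from_bytes(frame[1:9], byteorder="big")
    payload = frame[9:9 + length]
    if len(payload) != length:
        raise ValueError("frame is truncated")
    return payload

# The payload contains a newline, which is why readline() cannot find frame edges.
frame = build_frame(b"line one\nline two")
print(frame[:9].hex())     # control byte followed by the 8 length bytes
print(parse_frame(frame))  # the original payload, newline included
```

Because the length is sent up front, the reader never has to search the payload for a delimiter.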
# simpleSend.py
import numpy as np
import pickle
import sys
import io
import time

# define some control bytes
control_data = bytes([1])
control_stop = bytes([0])

def send_data(arr):
    dataStr = pickle.dumps(arr)  # pickle the data array into a byte array
    # find the length of the array and convert this to a byte array
    dlen = len(dataStr).to_bytes(8, byteorder='big')
    # latin-1 maps bytes 0-255 one-to-one onto characters, so decode() here and the
    # latin-1 stdout wrapper below pass the raw bytes through unchanged
    print(control_data.decode('latin-1'), end='', flush=True)  # write the control byte
    print(dlen.decode('latin-1'), end='', flush=True)  # write the length
    print(dataStr.decode('latin-1'), end='', flush=True)  # write the data; end='' suppresses the newline print() would add

def send_stop():
    print(control_stop.decode('latin-1'), end='', flush=True)

# set the stdout such that it prints in latin-1, sys.stdout.detach() is a binary stream
# newline='' stops '\n' characters in the data being translated on output
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='latin-1', newline='')

for p in range(10):
    arr = np.ones((5000, 500)) * p  # generate some data
    send_data(arr)
    # the sleep is purely for testing and can be removed, ie does the reader fall over after a long delay
    time.sleep(.1)
send_stop()
# simpleReceiver.py
import numpy as np
import sys
import pickle

# define some control bytes
control_data = bytes([1])
control_stop = bytes([0])

while True:
    data = sys.stdin.buffer.read(1)  # read the control byte
    if data == control_data:
        data = sys.stdin.buffer.read(8)  # read the data length
        dlen = int.from_bytes(data, byteorder='big')
        print('data length %d' % dlen)
        data = sys.stdin.buffer.read(dlen)  # read the data
        npd = pickle.loads(data)  # unpickle
        print(npd.shape)
        print(npd.max())
    elif data == control_stop:
        print('stopped')
        break
    elif data == b'':
        # read() returns b'' at end of stream: the sender exited without sending stop
        print('stream closed without a stop byte')
        break
    else:
        print('Oh no')
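The receiver loop can also be wrapped in a generator that fails loudly if the stream ends part-way through a frame. This is only a sketch: read_exact and iter_payloads are names I have made up, and it assumes a buffered binary stream such as sys.stdin.buffer, where read(n) returns fewer than n bytes only at end of stream. The demo uses an in-memory io.BytesIO in place of a real pipe.

```python
import io

CONTROL_DATA = bytes([1])
CONTROL_STOP = bytes([0])

def read_exact(stream, n):
    """Read exactly n bytes, or raise EOFError if the stream ends early."""
    data = stream.read(n)
    if len(data) != n:
        raise EOFError(f"expected {n} bytes, got {len(data)}")
    return data

def iter_payloads(stream):
    """Yield each data payload until a stop byte arrives."""
    while True:
        control = read_exact(stream, 1)
        if control == CONTROL_STOP:
            return
        if control != CONTROL_DATA:
            raise ValueError(f"unknown control byte {control!r}")
        length = int.from_bytes(read_exact(stream, 8), byteorder="big")
        yield read_exact(stream, length)

# Demo on an in-memory stream instead of sys.stdin.buffer.
demo = io.BytesIO()
for payload in (b"first", b"second"):
    demo.write(CONTROL_DATA + len(payload).to_bytes(8, byteorder="big") + payload)
demo.write(CONTROL_STOP)
demo.seek(0)
print(list(iter_payloads(demo)))
```

In the receiver, each yielded payload would be passed to pickle.loads, for example: for payload in iter_payloads(sys.stdin.buffer).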
To run this:
python simpleSend.py | python simpleReceiver.py
If we want to use Python’s subprocess module to start simpleReceiver.py, we need to write to the child process’s stdin instead of using print:
import numpy as np
import pickle
import sys
import subprocess as sp

# define some control bytes
control_data = bytes([1])
control_stop = bytes([0])

def send_data(arr, mp):
    dataStr = pickle.dumps(arr)  # pickle the data array into a byte array
    # find the length of the array and convert this to a byte array
    dlen = len(dataStr).to_bytes(8, byteorder='big')
    mp.stdin.write(control_data)
    mp.stdin.write(dlen)
    mp.stdin.write(dataStr)
    mp.stdin.flush()  # the pipe is buffered, so flush to push the bytes to the child now

def send_stop(mp):
    mp.stdin.write(control_stop)
    mp.stdin.flush()

try:
    mp = sp.Popen("python3 simpleReceiver.py", shell=True, stdin=sp.PIPE)
except OSError as err:  # Popen raises OSError if the process cannot be started
    print('ERROR:', err)
    sys.exit(-1)

for p in range(10):
    arr = np.ones((5000, 5000)) * p  # generate some data
    send_data(arr, mp)
send_stop(mp)
mp.stdin.close()  # signal end of stream to the child
mp.wait()  # wait for the receiver to finish
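For trying out the subprocess approach without NumPy or a separate receiver file, here is a small self-contained sketch. It is a variation, not the script above: it starts the child with sys.executable and an argument list instead of a shell string, and CHILD_SOURCE is a hypothetical stand-in for simpleReceiver.py that just prints each payload length. The run_demo name is also my own.

```python
import subprocess as sp
import sys

CONTROL_DATA = bytes([1])
CONTROL_STOP = bytes([0])

# Hypothetical stand-in for simpleReceiver.py: prints the length of each payload.
CHILD_SOURCE = r"""
import sys
stream = sys.stdin.buffer
while True:
    control = stream.read(1)
    if control != bytes([1]):
        break  # stop byte, end of stream, or anything unexpected
    length = int.from_bytes(stream.read(8), byteorder='big')
    payload = stream.read(length)
    print(len(payload), flush=True)
"""

def run_demo(payloads):
    """Send each payload to a child process and return the lines it printed."""
    child = sp.Popen([sys.executable, "-c", CHILD_SOURCE],
                     stdin=sp.PIPE, stdout=sp.PIPE)
    for payload in payloads:
        child.stdin.write(CONTROL_DATA)
        child.stdin.write(len(payload).to_bytes(8, byteorder="big"))
        child.stdin.write(payload)
        child.stdin.flush()
    child.stdin.write(CONTROL_STOP)
    child.stdin.close()           # flushes, then signals end of stream
    output = child.stdout.read()  # the child prints very little, so this pipe cannot fill up
    child.wait()
    return output.decode().split()

if __name__ == "__main__":
    print(run_demo([b"a" * 10, b"b" * 1000]))
```

Reading the child's stdout only after all the writes is safe here because the output is tiny. A child that prints a lot would need its output read while the parent is still sending, otherwise both sides can block.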
With an array as large as 5000×5000, the subprocess version takes some time. Running it through the Python profiler indicates that about 75% of the time is taken by pickle.dumps, and most of the remaining 25% by the write operation. NumPy’s own methods give a speed increase. Replacing dataStr=pickle.dumps(arr) with dataStr=arr.tobytes(), and npd=pickle.loads(data) with npd=np.frombuffer(data), more than halves the time taken, but we lose the shape and dtype information: np.frombuffer returns a flat float64 array unless told otherwise. This would have to be sent along with the data.
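One way to send the shape and dtype along with the raw bytes is a small header in front of the array data. The sketch below is an assumption of mine, not something I have profiled: the header format (a 4-byte length followed by JSON) and the names pack_array_message and unpack_array_message are invented for illustration. It uses only the standard library, so the array bytes are faked with zeros.

```python
import json

def pack_array_message(dtype_str, shape, raw):
    """Prefix raw array bytes with a JSON header holding dtype and shape."""
    header = json.dumps({"dtype": dtype_str, "shape": list(shape)}).encode("ascii")
    return len(header).to_bytes(4, byteorder="big") + header + raw

def unpack_array_message(message):
    """Return (dtype_str, shape, raw) from a packed message."""
    header_len = int.from_bytes(message[:4], byteorder="big")
    header = json.loads(message[4:4 + header_len].decode("ascii"))
    raw = message[4 + header_len:]
    return header["dtype"], tuple(header["shape"]), raw

# Stand-in values: with NumPy these would be arr.dtype.str, arr.shape and arr.tobytes().
message = pack_array_message("<f8", (2, 3), bytes(2 * 3 * 8))
dtype_str, shape, raw = unpack_array_message(message)
print(dtype_str, shape, len(raw))
```

On the receiving side the array could then be rebuilt with np.frombuffer(raw, dtype=np.dtype(dtype_str)).reshape(shape). The packed message would be sent as the payload of a data frame in place of the pickled bytes. This only covers plain numeric dtypes; arrays of Python objects would still need pickle.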