Dask distributed Failed to deserialize with numpy.arrays and sparse.matrices

I'm getting the following error multiple times on different tasks along the graph (changes between executions). Possibly when certain tasks return numpy.arrays and scipy.sparse matrices.

distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x04'
Traceback (most recent call last):
  File "/home/user/venv/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
EOFError: Ran out of input
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/user/venv/lib/python3.5/site-packages/distributed/protocol/core.py", line 119, in loads
    value = _deserialize(head, fs)
  File "/home/user/venv/lib/python3.5/site-packages/distributed/protocol/serialize.py", line 158, in deserialize
    return f(header, frames)
  File "/home/user/venv/lib/python3.5/site-packages/distributed/protocol/serialize.py", line 20, in <lambda>
    deserializers = {None: lambda header, frames: pickle.loads(b''.join(frames))}
  File "/home/user/venv/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
EOFError: Ran out of input
distributed.comm.utils - ERROR - truncated data stream (485 bytes): [b'', b"\x92\x83\xa6report\xc2\xa4keys\x91\xd9P('_avro_body-read-block-bag-from-delayed-67c7a9690149de9743ed970f873fa1d6', 283)\xa2op\xabdelete-data\x86\xa8priority\x93\x00\x01\xcc\xce\xa6nbytes\x81\xd9:('bag-from-delayed-67c7a9690149de9743ed970f873fa1d6', 283)\xce\x00 \x86p\xa8duration\xcb@\x18\x16m\x88xX\x00\xa7who_has\x81\xd9:('bag-from-delayed-67c7a9690149de9743ed970f873fa1d6', 283)\x91\xb5tcp://127.0.0.1:38623\xa2op\xaccompute-task\xa3key\xd9K('pluck-map-process_features_sparse-d94d304dc59efb780c39bfb0ca4df37f', 283)", b'\x83\xabbytestrings\x90\xa7headers\x81\x92\x01\xa4task\x83\xabcompression\x91\xc0\xa5count\x01\xa7lengths\x91\x02\xa4keys\x91\x92\x01\xa4task', b'\x80\x04']
distributed.worker - INFO - Connection to scheduler broken. Reregistering
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------

This is always a EOFError: Ran out of input error with differently-sized buffers (sometimes as small as several bytes), with the entire cluster running on a single machine.

Ideally I'd like a solution to the actual problem but ways to investigate the issue and understanding what could possibly go wrong will also be appreciated. Right now I'm kinda stuck without knowing how to fix the issue at hand.

Running client.get_versions(check=True) finishes without an error and this persists after updating all packages (namely numpy, scipy, dask, dask-distributed, cloudpickle)