Provides receivers that store data from detectors in HDF5 files, with optional parallel data transformation and processing. Data is also forwarded on a secondary zmq port for arbitrary processing.
The streaming receiver package covers a broad range of streaming interfaces while providing a common pattern for writing to file. To support a new detector or streaming interface, add a child class that inherits from the default detector class in detector.py.
Receivers for the general case support the following detectors:
- Teledyne PI-MTE3 (Picam)
- Dhyana
- Ximea
- Fastcam Nova
- Rigaku
- Xsp3
- Andor Zyla
Additional receivers support the following detectors:
- Dectris Eiger
- Dectris Pilatus with frame ordering and cbf -> bitshuffle lz4 compression
- Hamamatsu Orca Lightning
- Xspectrum Lambda
The configuration of the different detectors is stored in the detectors.yaml file.

    streaming-receiver detectors.yaml nanomax-eiger

Docker images are automatically created in the GitLab CI when a new release is tagged. Docker Compose files are provided for the different detectors and beamlines.
When running with the DAQ Helm chart, an additional environment variable can be provided to enable DEBUG output:

    daq:
      # Pipeline Template
      pipeline:
        template: streaming_receiver
        override: |
          extraEnv:
            - name: LOG_LEVEL
              value: DEBUG

An instance of the streaming receiver can be deployed on the daq2024 Kubernetes cluster by doing two things:
- Creating a repository under scisw/deployments on GitLab.
- Creating a namespace in the daq2024 cluster.
The namespace has to be linked with the deployment repository for the GitLab CI to work. This is done under Labels & Annotations in the namespace config, where a key-value pair with the following structure has to be added:

| Key | Value |
| --- | --- |
| project-id.tokenizer.gitlab.maxiv.lu.se/PROJECT_ID | enabled |

PROJECT_ID is the project ID of your deployment repository, found by pressing the three vertically aligned dots in the top right corner of your repository. If something goes wrong, compare your setup with the other deployments and namespaces.
The streaming-receiver has an HTTP REST interface to get the status, the number of received frames and the most recent frame for live viewing. The API is built with FastAPI and has autogenerated OpenAPI documentation.
The secondary zmq data port for live processing is a zmq PUSH socket.
There are 3 different types of messages. The header message comes once at the start of the scan, followed by the image messages and a final series_end message at the end of the scan. The messages are JSON encoded. The msg_number field in the messages is a monotonically increasing value that helps to order the messages correctly.
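Because msg_number increases over a scan, messages that were collected out of order (for example by several workers pulling from the same PUSH socket) can be put back into sequence by sorting on that field. The following is a minimal sketch rather than code from the package: the function name `order_messages` and the sample payloads are made up for illustration.

```python
import json


def order_messages(raw_headers):
    """Decode JSON headers and return them sorted by msg_number."""
    headers = [json.loads(raw) for raw in raw_headers]
    return sorted(headers, key=lambda h: h["msg_number"])


# Hypothetical headers, listed in the wrong order on purpose.
received = [
    b'{"htype": "series_end", "msg_number": 3}',
    b'{"htype": "image", "msg_number": 2, "frame": 1}',
    b'{"htype": "header", "msg_number": 0, "filename": "/tmp/testfile.h5"}',
    b'{"htype": "image", "msg_number": 1, "frame": 0}',
]

ordered = order_messages(received)
print([h["htype"] for h in ordered])
# prints ['header', 'image', 'image', 'series_end']
```

Only the JSON headers are sorted here; in a real consumer the binary part of each image message would have to be kept together with its header.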
The header message has the following format:

    {
        "htype": "header",
        "msg_number": 0,
        "filename": "/tmp/testfile.h5"
    }

The image message is a multipart zmq message with 2 parts. The first part is a JSON header in the following format:

    {
        "htype": "image",
        "msg_number": 1,
        "frame": 0,
        "shape": [100, 100],
        "type": "uint32",
        "compression": "bslz4"
    }

The second part is the binary blob of the array described by the header.
compression can be:
- "bslz4" (bitshuffle lz4 compression)
- "none"
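For uncompressed frames, the size of the binary part can be checked against the header before reshaping. The following is a minimal sketch and not part of the streaming-receiver package: the helper name `expected_nbytes` and the `ITEMSIZE` table are made up for illustration, and it assumes that the type field holds a NumPy-style dtype name such as "uint32".

```python
import json
from math import prod

# Bytes per element for a few common dtype names (illustrative, not exhaustive).
ITEMSIZE = {"uint8": 1, "uint16": 2, "uint32": 4, "int32": 4, "float32": 4, "float64": 8}


def expected_nbytes(header):
    """Return the size in bytes of the uncompressed frame described by header."""
    if header["compression"] != "none":
        raise ValueError("size can only be predicted for uncompressed frames")
    return prod(header["shape"]) * ITEMSIZE[header["type"]]


# Hypothetical first part of an image message, as it would arrive on the socket.
raw_header = (
    b'{"htype": "image", "msg_number": 1, "frame": 0, '
    b'"shape": [100, 100], "type": "uint32", "compression": "none"}'
)
header = json.loads(raw_header)
blob = bytes(100 * 100 * 4)  # stand-in for the second message part

if len(blob) != expected_nbytes(header):
    raise ValueError("binary part does not match the header description")
```

A mismatch usually points to a wrong dtype assumption or to a frame that is compressed after all, so failing early is easier to debug than a reshape error later on.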
The series_end message has the following format:

    {
        "htype": "series_end",
        "msg_number": 2
    }

Here is some sample Python code showing how to read the secondary zmq stream:

    import json

    import numpy as np
    import zmq
    from bitshuffle import decompress_lz4

    hostname = 'localhost'
    context = zmq.Context()
    pull = context.socket(zmq.PULL)
    pull.connect('tcp://%s:9999' % hostname)

    while True:
        # Every message starts with a JSON header; image messages carry a second, binary part.
        parts = pull.recv_multipart(copy=False)
        header = json.loads(parts[0].bytes)
        print(header)
        if header['htype'] == 'image':
            if header['compression'] == 'bslz4':
                # Wrap the raw buffer in a uint8 array before handing it to bitshuffle.
                img = decompress_lz4(np.frombuffer(parts[1].buffer, dtype=np.uint8),
                                     header['shape'],
                                     np.dtype(header['type']))
            else:
                # Uncompressed frames only need to be viewed with the right dtype and shape.
                img = np.frombuffer(parts[1].buffer, dtype=header['type'])
                img = img.reshape(header['shape'])

Documentation can be found in Internals.md.
