
Streaming receiver

Provides receivers that store data from detectors into HDF5 files, with optional parallel data transformation and processing. Data is also forwarded on a secondary zmq port for arbitrary downstream processing.

The streaming receiver package covers a broad range of streaming interfaces while providing a common pattern for writing to file. To add support for a new detector or streaming interface, a child class that inherits from the default detector class must be added inside detector.py, as sketched below.
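A minimal sketch of such a child class, assuming the base class is named Detector and exposes a frame-parsing hook; the class and method names here are assumptions, so check detector.py for the actual interface:

import json
import numpy as np
from detector import Detector   # hypothetical import; the base class lives in detector.py

class MyDetector(Detector):
    # Convert the detector-specific zmq message parts into the common
    # (header, image) representation used by the file writer
    def parse_frame(self, parts):
        header = json.loads(parts[0])
        image = np.frombuffer(parts[1], dtype=header['type'])
        return header, image.reshape(header['shape'])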

Receivers for the general case support the following detectors:

  • Teledyne PI-MTE3 (Picam)
  • Dhyana
  • Ximea
  • Fastcam Nova
  • Rigaku
  • Xsp3
  • Andor Zyla

Additional receivers support the following detectors:

  • Dectris Eiger
  • Dectris Pilatus with frame ordering and cbf -> bitshuffle lz4 compression
  • Hamamatsu Orca Lightning
  • Xspectrum Lambda

Run

Configuration

The configuration of the different detectors is stored in the detectors.yaml file.
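A hypothetical entry might look as follows; the field names are illustrative only, so consult the detectors.yaml shipped in the repository for the actual schema:

nanomax-eiger:
  class: EigerDetector                    # illustrative receiver class name
  data_port: tcp://b-nanomax-eiger:9999   # illustrative detector stream to pull from
  forward_port: 9999                      # secondary zmq PUSH port for live processing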

Native

streaming-receiver detectors.yaml nanomax-eiger

Docker

Docker images are automatically created in the gitlab CI when a new release is tagged. Docker compose files are provided for the different detectors and beamlines.
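For example, a receiver can be started from one of the provided compose files (the filename below is illustrative; pick the file for your detector and beamline):

docker compose -f docker-compose-nanomax-eiger.yml up -d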

K8s Helm

When running with the DAQ Helm chart, it is possible to provide an additional environment variable to enable DEBUG output:

daq:
  # Pipeline Template
  pipeline:
    template: streaming_receiver
    override: |
      extraEnv:
        - name: LOG_LEVEL
          value: DEBUG

DAQ

(Figure: DAQ overview)

(Figure: Pipeline overview)

DAQ deployment

An instance of the streaming receiver can be deployed on the daq2024 Kubernetes cluster by doing two things:

  • Creating a repository under scisw/deployments on gitlab.
  • Creating a namespace in the daq2024 cluster.

The namespace has to be linked with the deployment repository in order for the gitlab CI to work. This is done under Labels & Annotations in the namespace config, where a key-value pair with the following structure has to be added:

Key                                                        Value
project-id.tokenizer.gitlab.maxiv.lu.se/PROJECT_ID         enabled

Here PROJECT_ID is the project id of your deployment repository, found by clicking the three vertically aligned dots in the top right corner of your repository page. If something goes wrong, compare with the other deployments/namespaces.
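For example, with a purely illustrative project id of 1234, the annotation would read:

project-id.tokenizer.gitlab.maxiv.lu.se/1234: enabled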

HTTP interface

The streaming-receiver has an HTTP REST interface to query the status, the number of received frames, and the most recent frame for live viewing. The API is built with FastAPI and has autogenerated OpenAPI documentation.
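A minimal sketch of polling the interface, assuming the receiver listens on port 5000 and exposes a /status endpoint; both the port and the route are assumptions here, and the authoritative list of endpoints is in the autogenerated OpenAPI documentation (served at /docs by FastAPI):

import requests

base_url = 'http://localhost:5000'                   # hypothetical host and port
status = requests.get(f'{base_url}/status').json()   # hypothetical route
print(status)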

Secondary zmq data port for live processing

The secondary zmq data port for live processing is a zmq PUSH socket.

There are 3 different types of messages. The header message comes once at the start of the scan, followed by the image messages and a final series_end message at the end of the scan. The messages are json encoded. The msg_number field in the messages is a monotonically increasing value that helps to order the messages appropriately.

header message

{
    "htype": "header",
    "msg_number": 0,
    "filename": "/tmp/testfile.h5"
}

image message

The image message is a multipart zmq message with 2 parts. First part is a json header in the following format:

{
    "htype": "image",
    "msg_number": 1,
    "frame": 0,
    "shape": (100, 100),
    "type": "uint32",
    "compression": "bslz4"
}

The second part is the binary blob of the array with the above description.

compression can be:

  • "bslz4" (bitshuffle lz4 compression)
  • "none"

series_end message

{
    "htype": "series_end",
    "msg_number": 2
}

Example code

Here is some sample Python code showing how to read the secondary zmq stream:

import zmq
import json
import numpy as np
from bitshuffle import decompress_lz4

hostname = 'localhost'
context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.connect('tcp://%s:9999' % hostname)
while True:
    parts = pull.recv_multipart(copy=False)
    header = json.loads(parts[0].bytes)
    print(header)
    if header['htype'] == 'image':
        if header['compression'] == 'bslz4':
            # decompress_lz4 expects the compressed payload as a uint8 array
            compressed = np.frombuffer(parts[1].buffer, dtype=np.uint8)
            img = decompress_lz4(compressed,
                                 tuple(header['shape']),
                                 np.dtype(header['type']))
        else:
            # uncompressed payload: reinterpret the raw buffer directly
            img = np.frombuffer(parts[1].buffer, dtype=header['type'])
            img = img.reshape(header['shape'])
    elif header['htype'] == 'series_end':
        # last message of the scan, stop reading
        break

Documentation

Documentation can be found in Internals.md.
