import numpy as np
from warnings import warn
import ctypes
from sashimi.utilities import SpeedyArrayBuffer
from sashimi.hardware.cameras.interface import (
AbstractCamera,
TriggerMode,
CameraException,
CameraWarning,
)
from sashimi.hardware.cameras.hamamatsu.sdk import (
DCAMAPI_INIT,
DCAMDEV_OPEN,
DCAMWAIT_OPEN,
DCAMCAP_TRANSFERKIND_FRAME,
DCAMBUF_ATTACHKIND_FRAME,
DCAMCAP_START_SEQUENCE,
DCAMWAIT_CAPEVENT_FRAMEREADY,
DCAMPROP_OPTION_NEXT,
DCAMPROP_TYPE_MODE,
DCAMPROP_TYPE_REAL,
DCAMPROP_TYPE_LONG,
DCAMPROP_OPTION_NEAREST,
DCAMERR_ERROR,
DCAMCAP_STATUS_BUSY,
DCAMERR_NOERROR,
DCAMPROP_ATTR_HASVALUETEXT,
DCAMPROP_ATTR,
DCAMPROP_TYPE_MASK,
DCAMBUF_ATTACH,
DCAM_DEFAULT_ARG,
DCAMWAIT_START,
DCAMWAIT_CAPEVENT_STOPPED,
DCAMCAP_TRANSFERINFO,
DCAMPROP_VALUETEXT,
)
from sashimi.config import read_config
conf = read_config()
class HamamatsuCamera(AbstractCamera):
def __init__(self, camera_id, max_sensor_resolution):
    """Initialise the DCAM API, open the camera and a wait handle, and set defaults.

    Both arguments are forwarded unchanged to ``AbstractCamera.__init__``.
    ``self.camera_id`` and ``self.max_sensor_resolution`` are read back below
    (presumably stored by the base class -- confirm in ``AbstractCamera``).
    """
    super().__init__(camera_id, max_sensor_resolution)

    # The DCAM library is loaded through ctypes' Windows DLL loader.
    self.dcam = ctypes.windll.dcamapi
    api_init = DCAMAPI_INIT(0, 0, 0, 0, None, None)
    api_init.size = ctypes.sizeof(api_init)
    # NOTE(review): this status is stored but, unlike the calls below, it is
    # never passed through check_status.
    self.error_code = self.dcam.dcamapi_init(ctypes.byref(api_init))

    # Open the camera device.
    device_open = DCAMDEV_OPEN(0, self.camera_id, None)
    device_open.size = ctypes.sizeof(device_open)
    open_status = self.dcam.dcamdev_open(ctypes.byref(device_open))
    self.check_status(open_status, "dcamdev_open")
    self.camera_handle = ctypes.c_void_p(device_open.hdcam)

    # Open the wait handle that get_frames() uses to wait for capture events.
    wait_open = DCAMWAIT_OPEN(0, 0, None, self.camera_handle)
    wait_open.size = ctypes.sizeof(wait_open)
    wait_status = self.dcam.dcamwait_open(ctypes.byref(wait_open))
    self.check_status(wait_status, "dcamwait_open")
    self.wait_handle = ctypes.c_void_p(wait_open.hwait)

    # The property table must exist before any property is set.
    self.properties = self.get_camera_properties()
    self.exposure_time = conf["camera"]["default_exposure"]

    # Cached settings (not pushed to the camera here).
    self._roi = (0, 0) + self.max_sensor_resolution
    self._trigger_mode = TriggerMode.EXTERNAL_TRIGGER

    # Frame-buffer bookkeeping; the buffers are allocated in start_acquisition().
    self._frame_bytes = 0
    self.old_frame_bytes = -1
    self.number_image_buffers = 0
    self.hcam_data = []
    self.hcam_ptr = False

    # Acquisition progress counters used by get_frames().
    self.buffer_index = 0
    self.last_frame_number = 0
    self.max_backlog = 0
    self.number_frames = 0
@property
def binning(self):
    """Current value of the camera's ``"binning"`` property."""
    current = self.get_property_value("binning")
    return current


@binning.setter
def binning(self, n_bin):
    # The property is set through its text form, e.g. "2x2" for n_bin == 2.
    label = f"{n_bin}x{n_bin}"
    self.set_property_value("binning", label)
@property
def exposure_time(self):
    """Camera ``"exposure_time"`` property multiplied by 1000.

    Presumably a seconds-to-milliseconds conversion -- confirm the unit the
    camera reports.
    """
    camera_value = self.get_property_value("exposure_time")
    return camera_value * 1000


@exposure_time.setter
def exposure_time(self, exp_val):
    # Inverse scaling of the getter: the camera receives 0.001 * exp_val.
    camera_value = 0.001 * exp_val
    self.set_property_value("exposure_time", camera_value)
@property
def frame_rate(self):
    """Reciprocal of ``self.exposure_time``.

    NOTE(review): the ``exposure_time`` getter scales the camera value by 1000,
    so this is frames per that scaled unit, not necessarily Hz -- confirm the
    units callers expect.
    """
    exposure = self.exposure_time
    return 1 / exposure
@property
def roi(self):
    """Last ROI stored by the setter (or the initial value set in ``__init__``)."""
    return self._roi


@roi.setter
def roi(self, exp_val: tuple):
    """Store the ROI and push it to the camera's subarray properties.

    The ROI is set in "maximum resolution of the sensor units". Therefore, it
    should not change with the binning.

    Each component of ``exp_val`` is multiplied by the current binning value
    and rounded down to a multiple of 4. The four resulting numbers are written,
    in order, to ``subarray_vpos``, ``subarray_hpos``, ``subarray_vsize`` and
    ``subarray_hsize``.
    """
    # Reading self.binning queries the camera, so do it once rather than once
    # per ROI component.
    binning = self.binning
    self._roi = [(i * binning // 4) * 4 for i in exp_val]
    vpos, hpos, vsize, hsize = self._roi[0], self._roi[1], self._roi[2], self._roi[3]
    self.set_property_value("subarray_vpos", vpos)
    self.set_property_value("subarray_hpos", hpos)
    self.set_property_value("subarray_vsize", vsize)
    self.set_property_value("subarray_hsize", hsize)
@property
def trigger_mode(self):
    """Trigger mode most recently assigned through this property."""
    mode = self._trigger_mode
    return mode


@trigger_mode.setter
def trigger_mode(self, exp_val: TriggerMode):
    # Cache the mode first, then write its ``.value`` to the camera's
    # "trigger_source" property.
    self._trigger_mode = exp_val
    source_value = exp_val.value
    self.set_property_value("trigger_source", source_value)
@property
def frame_bytes(self):
    """Value of ``self._frame_bytes`` (refreshed from ``"image_framebytes"`` in ``start_acquisition``)."""
    n_bytes = self._frame_bytes
    return n_bytes
@property
def frame_shape(self):
    """``(image_height, image_width)`` as currently reported by the camera.

    Both values are queried from the camera on every access.
    """
    # TODO these get_property value can get cleaned up in another property
    height = self.get_property_value("image_height")
    width = self.get_property_value("image_width")
    return (height, width)
def get_frames(self):
    """Return copies of all frames that arrived since the previous call.

    If the camera reports a busy capture status, this first waits on the wait
    handle for a frame-ready or stopped event (the ``100`` passed to
    ``DCAMWAIT_START`` is presumably a timeout in milliseconds -- confirm
    against the DCAM documentation). It then reads the transfer info, updates
    the backlog bookkeeping and copies out every buffer filled since
    ``self.buffer_index``.

    Returns
    -------
    list
        One array per new frame, reshaped to ``self.frame_shape`` and copied,
        in buffer order. Empty when there are no new frames.

    Warns
    -----
    CameraWarning
        If more frames were counted since the last call than there are image
        buffers, meaning some frames were probably overwritten.
    """
    capture_status = ctypes.c_int32(0)
    self.check_status(
        self.dcam.dcamcap_status(self.camera_handle, ctypes.byref(capture_status)),
        "dcamcap_status",
    )

    # Wait for a new frame if the camera is acquiring.
    if capture_status.value == DCAMCAP_STATUS_BUSY:
        param_start = DCAMWAIT_START(
            0,
            0,
            DCAMWAIT_CAPEVENT_FRAMEREADY | DCAMWAIT_CAPEVENT_STOPPED,
            100,
        )
        param_start.size = ctypes.sizeof(param_start)
        self.check_status(
            self.dcam.dcamwait_start(self.wait_handle, ctypes.byref(param_start)),
            "dcamwait_start",
        )

    # Check how many new frames there are.
    paramtransfer = DCAMCAP_TRANSFERINFO(0, DCAMCAP_TRANSFERKIND_FRAME, 0, 0)
    paramtransfer.size = ctypes.sizeof(paramtransfer)
    self.check_status(
        self.dcam.dcamcap_transferinfo(
            self.camera_handle, ctypes.byref(paramtransfer)
        ),
        "dcamcap_transferinfo",
    )
    cur_buffer_index = paramtransfer.nNewestFrameIndex
    cur_frame_number = paramtransfer.nFrameCount

    # Check that we have not acquired more frames than we can store in our
    # buffers, and keep track of the maximum backlog.
    backlog = cur_frame_number - self.last_frame_number
    if backlog > self.number_image_buffers:
        warn(
            "Camera buffer overrun detected. Some frames might have been lost",
            CameraWarning,
        )
    if backlog > self.max_backlog:
        self.max_backlog = backlog
    self.last_frame_number = cur_frame_number

    # Indices of the buffers filled since the last call.
    if cur_buffer_index < self.buffer_index:
        # The newest index wrapped around past the last buffer: take the tail
        # of the buffer list, then the start up to the newest index.
        new_frames = list(range(self.buffer_index + 1, self.number_image_buffers))
        new_frames.extend(range(cur_buffer_index + 1))
    else:
        new_frames = list(range(self.buffer_index + 1, cur_buffer_index + 1))
    self.buffer_index = cur_buffer_index

    if not new_frames:
        return []

    # frame_shape queries the camera on every access, so read it once for the
    # whole batch instead of once per frame.
    shape = self.frame_shape
    return [
        np.reshape(self.hcam_data[i_frame].get_data(), shape).copy()
        for i_frame in new_frames
    ]
def get_property_value(self, property_name):
    """Read the current value of a camera property.

    Parameters
    ----------
    property_name : str
        Key of ``self.properties`` (names as produced by
        ``convert_property_name``).

    Returns
    -------
    int, float or bool
        ``int`` for MODE and LONG properties, ``float`` for REAL properties,
        and ``False`` when the property type is none of these.

    Raises
    ------
    CameraException
        If ``property_name`` is not in ``self.properties``.
    """
    if property_name not in self.properties:
        raise CameraException(f"Unknown property name {property_name}")
    prop_id = self.properties[property_name]

    # DCAM hands every value back as a double; it is converted below
    # according to the property's type.
    c_value = ctypes.c_double(0)
    self.check_status(
        self.dcam.dcamprop_getvalue(
            self.camera_handle,
            ctypes.c_int32(prop_id),
            ctypes.byref(c_value),
        ),
        "dcamprop_getvalue",
    )

    prop_attr = self.get_property_attribute(property_name)
    prop_type = prop_attr.attribute & DCAMPROP_TYPE_MASK
    if prop_type in (DCAMPROP_TYPE_MODE, DCAMPROP_TYPE_LONG):
        return int(c_value.value)
    if prop_type == DCAMPROP_TYPE_REAL:
        return c_value.value
    return False
def set_property_value(self, property_name, property_value, *args, **kwargs):
    """Set a camera property and return the value the camera reports back.

    A string ``property_value`` is first translated to its numeric value via
    ``get_property_text``. Numeric values outside the range given by
    ``get_property_range`` are clamped to the nearest bound with a
    ``CameraWarning``. Extra positional and keyword arguments are accepted
    but not used.

    Raises
    ------
    CameraException
        If ``property_name`` is unknown, or if a string value is not one of
        the property's text options.
    """
    if property_name not in self.properties:
        raise CameraException(f"Unknown property name {property_name}")

    # Translate a text option into the number DCAM expects.
    if isinstance(property_value, str):
        known_texts = self.get_property_text(property_name)
        if property_value not in known_texts:
            raise CameraException(
                f"Invalid property value {property_value} for {property_name}"
            )
        property_value = float(known_texts[property_value])

    # Clamp to the allowed range.
    lower, upper = self.get_property_range(property_name)
    if property_value < lower:
        warn(
            f"Value {property_value} for {property_name} is less than minimum {lower}. Setting to minimum.",
            CameraWarning,
        )
        property_value = lower
    if property_value > upper:
        warn(
            f"Value {property_value} for {property_name} is greater than maximum {upper}. Setting to maximum.",
            CameraWarning,
        )
        property_value = upper

    # Set the value; the call writes the value actually applied back into
    # the same double.
    applied = ctypes.c_double(property_value)
    status = self.dcam.dcamprop_setgetvalue(
        self.camera_handle,
        ctypes.c_int32(self.properties[property_name]),
        ctypes.byref(applied),
        ctypes.c_int32(DCAM_DEFAULT_ARG),
    )
    self.check_status(status, "dcamprop_setgetvalue")
    return applied.value
def start_acquisition(self):
    """Prepare the frame buffers and start a sequence capture.

    This sets the sub-array mode as appropriate based on the current ROI:
    OFF when the subarray size equals ``self.max_sensor_resolution``, ON
    otherwise. Image buffers are reallocated only when the frame size in
    bytes differs from the previous acquisition. They are attached on every
    call.
    """
    self.buffer_index = -1
    self.last_frame_number = 0

    # Check ROI properties.
    roi_w = self.get_property_value("subarray_hsize")
    roi_h = self.get_property_value("subarray_vsize")

    # If the ROI is smaller than the entire frame turn on subarray mode:
    if (roi_h == self.max_sensor_resolution[0]) and (
        roi_w == self.max_sensor_resolution[1]
    ):
        self.set_property_value("subarray_mode", "OFF")
    else:
        self.set_property_value("subarray_mode", "ON")

    # Get size of frame
    self._frame_bytes = self.get_property_value("image_framebytes")

    if self.old_frame_bytes != self._frame_bytes:
        # As many buffers as fit in a 2 GiB budget, but never more than 2000.
        self.number_image_buffers = min(
            int((2.0 * 1024 * 1024 * 1024) / self._frame_bytes), 2000
        )

        # Allocate new image buffers. hcam_data keeps the buffer objects
        # referenced while hcam_ptr holds the pointers handed to DCAM.
        ptr_array = ctypes.c_void_p * self.number_image_buffers
        self.hcam_ptr = ptr_array()
        self.hcam_data = []
        for i in range(self.number_image_buffers):
            hc_data = SpeedyArrayBuffer(self._frame_bytes)
            self.hcam_ptr[i] = hc_data.get_data_pr()
            self.hcam_data.append(hc_data)
        self.old_frame_bytes = self._frame_bytes

    # Attach image buffers and start acquisition.
    # We need to attach & release for each acquisition otherwise
    # we will get an error if we try to change the ROI in any way
    # between acquisitions.
    paramattach = DCAMBUF_ATTACH(
        0,
        DCAMBUF_ATTACHKIND_FRAME,
        self.hcam_ptr,
        self.number_image_buffers,
    )
    paramattach.size = ctypes.sizeof(paramattach)
    # NOTE(review): unlike the other DCAM calls in this class, the struct is
    # passed here without ctypes.byref -- confirm this is intended.
    self.check_status(
        self.dcam.dcambuf_attach(self.camera_handle, paramattach),
        "dcam_attachbuffer",
    )
    self.check_status(
        self.dcam.dcamcap_start(self.camera_handle, DCAMCAP_START_SEQUENCE),
        "dcamcap_start",
    )
def stop_acquisition(self):
    """Stop the capture, reset the backlog counter and release attached buffers."""
    stop_status = self.dcam.dcamcap_stop(self.camera_handle)
    self.check_status(stop_status, "dcamcap_stop")

    self.max_backlog = 0

    # Nothing to release if no buffers were allocated (hcam_ptr is falsy).
    if not self.hcam_ptr:
        return

    # Free image buffers.
    release_status = self.dcam.dcambuf_release(
        self.camera_handle, DCAMBUF_ATTACHKIND_FRAME
    )
    self.check_status(release_status, "dcambuf_release")
def shutdown(self):
    """Run the base-class shutdown, then close the wait handle and the camera.

    The camera device is closed even if closing the wait handle raises, so a
    failure in the first step does not leave the device handle open.
    """
    super().shutdown()
    try:
        self.check_status(
            self.dcam.dcamwait_close(self.wait_handle), "dcamwait_close"
        )
    finally:
        self.check_status(
            self.dcam.dcamdev_close(self.camera_handle), "dcamdev_close"
        )
def get_property_attribute(self, property_name):
    """
    Return the attribute structure of a particular property.

    The result is a filled ``DCAMPROP_ATTR``, or ``False`` when the status
    returned through ``check_status`` equals 0.
    """
    attr = DCAMPROP_ATTR()
    attr.cbSize = ctypes.sizeof(attr)
    attr.iProp = self.properties[property_name]

    raw_status = self.dcam.dcamprop_getattr(self.camera_handle, ctypes.byref(attr))
    status = self.check_status(raw_status, "dcamprop_getattr")
    return False if status == 0 else attr
def get_property_range(self, property_name):
    """
    Return ``[minimum, maximum]`` for a property.

    Both bounds are floats for REAL properties and ints for every other type.
    """
    attr = self.get_property_attribute(property_name)
    is_real = (attr.attribute & DCAMPROP_TYPE_MASK) == DCAMPROP_TYPE_REAL
    cast = float if is_real else int
    return [cast(attr.valuemin), cast(attr.valuemax)]
def get_property_text(self, property_name):
    """
    Return the text options of a property (if any).

    The result maps each option's text to its integer value. It is an empty
    dict when the property's attribute lacks ``DCAMPROP_ATTR_HASVALUETEXT``.
    """
    attr = self.get_property_attribute(property_name)
    if not (attr.attribute & DCAMPROP_ATTR_HASVALUETEXT):
        return {}

    prop_id = self.properties[property_name]
    # Enumeration starts at the property's minimum value.
    current = ctypes.c_double(attr.valuemin)

    # Buffer that receives each option's text.
    text_len = 64
    text_buf = ctypes.create_string_buffer(text_len)

    # Create property text structure.
    query = DCAMPROP_VALUETEXT()
    query.cbSize = ctypes.c_int32(ctypes.sizeof(query))
    query.iProp = ctypes.c_int32(prop_id)
    query.value = current
    query.text = ctypes.addressof(text_buf)
    query.textbytes = text_len

    # Collect text options.
    options = {}
    while True:
        # Get text of current value.
        self.check_status(
            self.dcam.dcamprop_getvaluetext(
                self.camera_handle, ctypes.byref(query)
            ),
            "dcamprop_getvaluetext",
        )
        options[query.text.decode("utf-8")] = int(current.value)

        # Get next value; any status other than 1 ends the enumeration.
        status = self.dcam.dcamprop_queryvalue(
            self.camera_handle,
            ctypes.c_int32(prop_id),
            ctypes.byref(current),
            ctypes.c_int32(DCAMPROP_OPTION_NEXT),
        )
        query.value = current
        if status != 1:
            break

    return options
def check_status(self, fn_return, fn_name="unknown"):
    """Raise if a DCAM call returned ``DCAMERR_ERROR``; otherwise pass the status through.

    Parameters
    ----------
    fn_return
        Status code returned by the DCAM function.
    fn_name : str
        Name of the DCAM function, used only in the error message.

    Returns
    -------
    The unchanged ``fn_return``.

    Raises
    ------
    CameraException
        If ``fn_return`` equals ``DCAMERR_ERROR``.
    """
    if fn_return == DCAMERR_ERROR:
        # Report the status code itself: no error text is fetched from the
        # driver here.
        raise CameraException(f"dcam error {fn_name} (status {fn_return})")
    return fn_return
@staticmethod
def convert_property_name(p_name):
    """
    Normalise a property name: lowercase it and turn every space into an
    underscore.
    """
    lowered = p_name.lower()
    return "_".join(lowered.split(" "))
def get_camera_properties(self):
    """
    Return the ids & names of all the properties that the camera supports. This
    is used at initialization to populate the self.properties attribute.

    The result maps each normalised property name (see
    ``convert_property_name``) to its integer property id.

    Raises ``ConnectionError`` when the only entry found is an empty name
    with id 0.
    """
    name_len = 64
    name_buf = ctypes.create_string_buffer(name_len)
    current_id = ctypes.c_int32(0)

    def advance(option):
        # Move current_id according to `option`; a status other than 0 or
        # DCAMERR_NOERROR is handed to check_status.
        status = self.dcam.dcamprop_getnextid(
            self.camera_handle,
            ctypes.byref(current_id),
            option,
        )
        if (status != 0) and (status != DCAMERR_NOERROR):
            self.check_status(status, "dcamprop_getnextid")

    def read_name():
        # Fetch the name of the property current_id points at into name_buf.
        self.check_status(
            self.dcam.dcamprop_getname(
                self.camera_handle,
                current_id,
                name_buf,
                ctypes.c_int32(name_len),
            ),
            "dcamprop_getname",
        )

    # Reset to the start.
    advance(ctypes.c_uint32(DCAMPROP_OPTION_NEAREST))

    # Get the first property.
    advance(ctypes.c_int32(DCAMPROP_OPTION_NEXT))
    read_name()

    # Get the rest of the properties; stop once the id no longer changes.
    found = {}
    previous_id = -1
    while current_id.value != previous_id:
        previous_id = current_id.value
        key = self.convert_property_name(name_buf.value.decode("utf-8"))
        found[key] = previous_id
        advance(ctypes.c_int32(DCAMPROP_OPTION_NEXT))
        read_name()

    if found == {"": 0}:
        raise ConnectionError("The Hamamatsu camera seems to be off!")
    return found