You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

753 lines
35 KiB

#!/usr/bin/python
# -*- coding:utf-8 -*-
# -*-mode:python ; tab-width:4 -*- ex:set tabstop=4 shiftwidth=4 expandtab: -*-
import numpy
from .gxwrapper import *
from .dxwrapper import *
from .gxidef import *
from .gxiapi import *
from .StatusProcessor import *
from .Interface import *
from .Device import *
from .ImageFormatConvert import *
from .ImageProcess import *
from .Exception import *
import types
# `sys` is imported explicitly here instead of relying on it leaking through
# one of the wildcard imports above.
import sys

# Type(s) accepted wherever an integer parameter is validated with isinstance().
if sys.version_info.major > 2:
    INT_TYPE = int
else:
    # Python 2 has two integer types; `long` only exists there.
    INT_TYPE = (int, long)  # noqa: F821
class DeviceManager(object):
    """
    :brief  Entry point of the wrapper: the library is initialised when an
            instance is created and closed when the last instance is destroyed.
            Provides interface/device enumeration and the open_device_by_*
            family of methods.
    """
    # Number of live instances; the library is closed when it drops to zero.
    __instance_num = 0

    def set_log_type(self, log_type):
        """
        :brief      Set whether logs of the specified type can be sent
        :param      log_type:   log type, See detail in GxLogTypeList
        :return:    None
        :raise      ParameterTypeError: log_type is not an integer
        """
        if not isinstance(log_type, INT_TYPE):
            raise ParameterTypeError("DeviceManager.set_log_type: "
                                     "Expected log type is int, not %s" % type(log_type))

        status = gx_set_log_type(log_type)
        StatusProcessor.process(status, 'DeviceManager', 'set_log_type')

    def get_log_type(self):
        """
        :brief      Gets whether logs of the specified type can be sent
        :return:    log_type:   log type, See detail in GxLogTypeList
        """
        status, log_type = gx_get_log_type()
        StatusProcessor.process(status, 'DeviceManager', 'get_log_type')

        # The last value read is also cached on the instance.
        self.__log_type = log_type
        return self.__log_type

    def __new__(cls, *args, **kw):
        """
        :brief      Initialise the library, then allocate the instance
        """
        status = gx_init_lib()
        StatusProcessor.process(status, 'DeviceManager', 'init_lib')

        # Count the instance only once initialisation has been processed: if
        # process() raises (presumably on an error status) no instance exists,
        # __del__ never runs, and an early increment would never be undone.
        cls.__instance_num += 1

        # object.__new__ takes no extra arguments; __init__ rejects unexpected ones.
        return object.__new__(cls)

    def __init__(self):
        """
        :brief      Start with empty device and interface caches
        """
        self.__device_num = 0
        self.__device_info_list = []
        self.__interface_info_list = []
        self.__interface_num = 0

    def __del__(self):
        """
        :brief      Close the library when the last instance goes away
        """
        self.__class__.__instance_num -= 1
        if self.__class__.__instance_num <= 0:
            status = gx_close_lib()
            StatusProcessor.process(status, 'DeviceManager', 'close_lib')

    # ------------------------------------------------------------------
    # private validation helpers
    # ------------------------------------------------------------------
    @staticmethod
    def __require_type(value, expected_type, type_name, var_name, func_name):
        """
        :brief      Raise ParameterTypeError unless value is an instance of expected_type
        :param      type_name:  type name shown in the error message
        :param      var_name:   parameter name shown in the error message
        :param      func_name:  public method name shown in the error message
        """
        if not isinstance(value, expected_type):
            raise ParameterTypeError("DeviceManager.%s: Expected %s type is %s, not %s"
                                     % (func_name, var_name, type_name, type(value)))

    @staticmethod
    def __is_valid_timeout(timeout, func_name):
        """
        :brief      Validate an enumeration timeout
        :return:    True when usable; False (after printing a message) when out of range
        :raise      ParameterTypeError: timeout is not an integer
        """
        if not isinstance(timeout, INT_TYPE):
            raise ParameterTypeError("DeviceManager.%s: Expected timeout type is int, not %s"
                                     % (func_name, type(timeout)))

        if (timeout < 0) or (timeout > UNSIGNED_INT_MAX):
            print("DeviceManager.%s: timeout out of bounds, timeout: minimum=0, maximum=%s"
                  % (func_name, hex(UNSIGNED_INT_MAX)))
            return False
        return True

    @staticmethod
    def __is_valid_index(index, func_name):
        """
        :brief      Range-check a 1-based index (the type is checked by the caller)
        :return:    True when in range; False (after printing a message) otherwise
        """
        if index < 1:
            print("DeviceManager.%s: index must start from 1" % func_name)
            return False
        if index > UNSIGNED_INT_MAX:
            print("DeviceManager.%s: index maximum: %s" % (func_name, hex(UNSIGNED_INT_MAX)))
            return False
        return True

    @staticmethod
    def __is_valid_access_mode(access_mode, func_name):
        """
        :brief      Check that access_mode is one of the values defined on GxAccessMode
        :return:    True when valid; False (after printing the allowed values) otherwise
        """
        access_mode_dict = dict(
            (name, getattr(GxAccessMode, name)) for name in dir(GxAccessMode) if not name.startswith('__'))
        if access_mode not in access_mode_dict.values():
            print("DeviceManager.%s: access_mode out of bounds, %s"
                  % (func_name, access_mode_dict.__str__()))
            return False
        return True

    # ------------------------------------------------------------------
    # private construction helpers
    # ------------------------------------------------------------------
    def __find_interface_info(self, interface_handle):
        """
        :brief      Look up the cached interface info whose handle matches
        :param      interface_handle:   handle of the interface to look for
        :return:    interface info dict
        :raise      NotFoundDevice: no interface info has been cached at all
        """
        for interface_item in self.__interface_info_list:
            if interface_item['handle'] == interface_handle:
                return interface_item

        if self.__interface_info_list:
            # No handle matched. Before the lookup was fixed the first entry was
            # always used, so keep that as the fallback instead of failing.
            return self.__interface_info_list[0]

        raise NotFoundDevice("DeviceManager.__create_device: "
                             "No interface info available, update the device list first")

    def __create_device(self, device_class, device_handle):
        """
        :brief      Wrap an opened device handle in the Device type matching its class
        :param      device_class:   device class, See detail in GxDeviceClassList
        :param      device_handle:  handle returned by gx_open_device
        :return:    U3VDevice / U2Device / GEVDevice / Device object
        :raise      NotFoundDevice: unsupported device class, or no interface info cached
        """
        status, interface_handle = gx_get_parent_interface_from_device(device_handle)
        StatusProcessor.process(status, 'DeviceManager', '__create_device')

        if device_class == GxDeviceClassList.U3V:
            device_type = U3VDevice
        elif device_class == GxDeviceClassList.USB2:
            device_type = U2Device
        elif device_class == GxDeviceClassList.GEV:
            device_type = GEVDevice
        elif device_class == GxDeviceClassList.CXP:
            device_type = Device
        else:
            raise NotFoundDevice("DeviceManager.__create_device: Does not support this device type.")

        interface_info = self.__find_interface_info(interface_handle)
        return device_type(device_handle, Interface(interface_handle, interface_info))

    def __get_device_info_list(self, base_info, ip_info, num):
        """
        :brief      Convert GxDeviceBaseInfo and GxDeviceIPInfo to device info list
        :param      base_info:  device base info list[GxDeviceBaseInfo]
        :param      ip_info:    device ip info list[GxDeviceIPInfo]
        :param      num:        device number
        :return:    device info list (one dict per device, 'index' starts from 1)
        """
        device_info_list = []
        for i in range(num):
            base = base_info[i]
            net = ip_info[i]
            device_info_list.append({
                'index': i + 1,
                'vendor_name': string_decoding(base.vendor_name),
                'model_name': string_decoding(base.model_name),
                'sn': string_decoding(base.serial_number),
                'display_name': string_decoding(base.display_name),
                'device_id': string_decoding(base.device_id),
                'user_id': string_decoding(base.user_id),
                'access_status': base.access_status,
                'device_class': base.device_class,
                'mac': string_decoding(net.mac),
                'ip': string_decoding(net.ip),
                'subnet_mask': string_decoding(net.subnet_mask),
                'gateway': string_decoding(net.gateway),
                'nic_mac': string_decoding(net.nic_mac),
                'nic_ip': string_decoding(net.nic_ip),
                'nic_subnet_mask': string_decoding(net.nic_subnet_mask),
                'nic_gateWay': string_decoding(net.nic_gateWay),
                'nic_description': string_decoding(net.nic_description)
            })

        return device_info_list

    @staticmethod
    def __build_interface_info(interface_handle, interface_info):
        """
        :brief      Convert one GXInterfaceInfo into an interface info dict
        :param      interface_handle:   handle of the interface
        :param      interface_info:     interface info returned by gx_get_interface_info
        :return:    interface info dict; unknown transport layers yield a placeholder
                    entry with handle 0 and empty fields
        """
        if GxTLClassList.TL_TYPE_CXP == interface_info.TLayer_type:
            raw = interface_info.IF_info.CXP_interface_info
            # CXP carries init_flag / PCIE_info but no description.
            return {
                'handle': interface_handle,
                'type': GxTLClassList.TL_TYPE_CXP,
                'display_name': string_decoding(raw.display_name),
                'interface_id': string_decoding(raw.interface_id),
                'serial_number': string_decoding(raw.serial_number),
                'description': '',
                'init_flag': raw.init_flag,
                'PCIE_info': raw.PCIE_info,
                'reserved': array_decoding(raw.reserved),
            }

        if GxTLClassList.TL_TYPE_GEV == interface_info.TLayer_type:
            tl_type = GxTLClassList.TL_TYPE_GEV
            raw = interface_info.IF_info.GEV_interface_info
        elif GxTLClassList.TL_TYPE_U3V == interface_info.TLayer_type:
            tl_type = GxTLClassList.TL_TYPE_U3V
            raw = interface_info.IF_info.U3V_interface_info
        elif GxTLClassList.TL_TYPE_USB == interface_info.TLayer_type:
            tl_type = GxTLClassList.TL_TYPE_USB
            raw = interface_info.IF_info.USB_interface_info
        else:
            return {
                'handle': 0,
                'type': GxTLClassList.TL_TYPE_UNKNOWN,
                'display_name': string_decoding(''),
                'interface_id': '',
                'serial_number': '',
                'description': '',
                'init_flag': 0,
                'PCIE_info': 0,
                'reserved': [],
            }

        # GEV, U3V and USB share the same layout.
        return {
            'handle': interface_handle,
            'type': tl_type,
            'display_name': string_decoding(raw.display_name),
            'interface_id': string_decoding(raw.interface_id),
            'serial_number': string_decoding(raw.serial_number),
            'description': string_decoding(raw.description),
            'init_flag': 0,
            'PCIE_info': 0,
            'reserved': array_decoding(raw.reserved),
        }

    def __get_interface_info_list(self):
        """
        :brief      Get GXInterfaceInfo and Convert GXInterfaceInfo to interface info list
        :return:    interface number, interface info list
        """
        status, interface_number = gx_get_interface_number()
        StatusProcessor.process(status, 'DeviceManager', '__get_interface_info_list')

        interface_info_list = []
        # The interface query functions are called with 1-based indices.
        for position in range(1, interface_number + 1):
            status, interface_info = gx_get_interface_info(position)
            StatusProcessor.process(status, 'DeviceManager', '__get_interface_info_list')

            status, interface_handle = gx_get_interface_handle(position)
            StatusProcessor.process(status, 'DeviceManager', '__get_interface_info_list')

            interface_info_list.append(self.__build_interface_info(interface_handle, interface_info))

        return interface_number, interface_info_list

    def __get_ip_info(self, base_info_list, dev_mum):
        """
        :brief      Get the network information
        :param      base_info_list: device base info list[GxDeviceBaseInfo]
        :param      dev_mum:        device number
        :return:    list[GxDeviceIPInfo]; devices that are not GEV get a default
                    constructed GxDeviceIPInfo so indices stay aligned
        """
        ip_info_list = []
        for i in range(dev_mum):
            if base_info_list[i].device_class == GxDeviceClassList.GEV:
                status, ip_info = gx_get_device_ip_info(i + 1)
                StatusProcessor.process(status, 'DeviceManager', '__get_ip_info')
                ip_info_list.append(ip_info)
            else:
                ip_info_list.append(GxDeviceIPInfo())

        return ip_info_list

    def __refresh_device_info(self, dev_num, func_name):
        """
        :brief      Rebuild the cached interface and device info after an enumeration
        :param      dev_num:    device number reported by the enumeration call
        :param      func_name:  public method name passed to StatusProcessor
        :return:    dev_num, device info list
        """
        self.__interface_num, self.__interface_info_list = self.__get_interface_info_list()

        status, base_info_list = gx_get_all_device_base_info(dev_num)
        StatusProcessor.process(status, 'DeviceManager', func_name)

        ip_info_list = self.__get_ip_info(base_info_list, dev_num)
        self.__device_num = dev_num
        self.__device_info_list = self.__get_device_info_list(base_info_list, ip_info_list, dev_num)

        return self.__device_num, self.__device_info_list

    def update_device_list(self, timeout=200):
        """
        :brief      enumerate the same network segment devices
        :param      timeout:    Enumeration timeout, range:[0, 0xFFFFFFFF]
        :return:    dev_num:    device number
                    device_info_list: all device info list
                    (0, None) is returned when timeout is out of range
        :raise      ParameterTypeError: timeout is not an integer
        """
        if not self.__is_valid_timeout(timeout, 'update_device_list'):
            return 0, None

        status, dev_num = gx_update_device_list(timeout)
        StatusProcessor.process(status, 'DeviceManager', 'update_device_list')

        return self.__refresh_device_info(dev_num, 'update_device_list')

    def update_device_list_ex(self, tl_type, timeout=2000):
        """
        :brief      Enumerate the device_type type devices
        :param      tl_type:    device type
        :param      timeout:    Enumeration timeout, range:[0, 0xFFFFFFFF]
        :return:    dev_num:    device number
                    device_info_list: all device info list
                    (0, None) is returned when timeout is out of range
        :raise      ParameterTypeError: timeout is not an integer
        """
        if not self.__is_valid_timeout(timeout, 'update_device_list_ex'):
            return 0, None

        status, dev_num = gx_update_device_list_ex(tl_type, timeout)
        StatusProcessor.process(status, 'DeviceManager', 'update_device_list_ex')

        return self.__refresh_device_info(dev_num, 'update_device_list_ex')

    def update_all_device_list(self, timeout=200):
        """
        :brief      Enumerate devices on different network segments
        :param      timeout:    Enumeration timeout, range:[0, 0xFFFFFFFF]
        :return:    dev_num:    device number
                    device_info_list:   all device info list
                    (0, None) is returned when timeout is out of range
        :raise      ParameterTypeError: timeout is not an integer
        """
        if not self.__is_valid_timeout(timeout, 'update_all_device_list'):
            return 0, None

        status, dev_num = gx_update_all_device_list(timeout)
        StatusProcessor.process(status, 'DeviceManager', 'update_all_device_list')

        return self.__refresh_device_info(dev_num, 'update_all_device_list')

    def get_interface_number(self):
        """
        :brief      Get interface number found by the last enumeration
        :return:    interface number
        """
        return self.__interface_num

    def get_interface_info(self):
        """
        :brief      Get all interface info found by the last enumeration
        :return:    interface info list
        """
        return self.__interface_info_list

    def get_interface(self, index):
        """
        :brief      Get the interface object by index
        :param      index:  interface index must start from 1
        :return:    Interface object, or None when index is out of range
        :raise      ParameterTypeError: index is not an integer
                    NotFoundDevice: no interface with this index after re-enumeration
        """
        self.__require_type(index, INT_TYPE, "int", "index", "get_interface")

        if not self.__is_valid_index(index, "get_interface"):
            return None

        if self.__interface_num < index:
            # Re-update the device
            self.update_device_list()
            if self.__interface_num < index:
                raise NotFoundDevice("DeviceManager.get_interface: invalid index")

        status, interface_handle = gx_get_interface_handle(index)
        StatusProcessor.process(status, 'DeviceManager', 'get_interface')

        return Interface(interface_handle, self.__interface_info_list[index - 1])

    def get_device_number(self):
        """
        :brief      Get device number
        :return:    device number
        """
        return self.__device_num

    def get_device_info(self):
        """
        :brief      Get all device info
        :return:    info_dict:      device info list
        """
        return self.__device_info_list

    @staticmethod
    def __open_device_handle(content, open_mode, access_mode, func_name):
        """
        :brief      Fill a GxOpenParam and open the device
        :param      content:        identifier matching open_mode (index / sn / user id / ip / mac), type: str
        :param      open_mode:      See detail in GxOpenMode
        :param      access_mode:    See detail in GxAccessMode
        :param      func_name:      public method name passed to StatusProcessor
        :return:    device handle
        """
        open_param = GxOpenParam()
        open_param.content = string_encoding(content)
        open_param.open_mode = open_mode
        open_param.access_mode = access_mode
        status, handle = gx_open_device(open_param)
        StatusProcessor.process(status, 'DeviceManager', func_name)
        return handle

    def open_device_by_index(self, index, access_mode=GxAccessMode.CONTROL):
        """
        :brief      open device by index
                    USB3 device return U3VDevice object
                    USB2 device return U2Device object
                    GEV  device return GEVDevice object
        :param      index:          device index must start from 1
        :param      access_mode:    the access of open device
        :return:    Device object, or None when index/access_mode is out of range
        :raise      ParameterTypeError: index or access_mode is not an integer
                    NotFoundDevice: no device with this index after re-enumeration
        """
        # Both type checks run before any range check.
        self.__require_type(index, INT_TYPE, "int", "index", "open_device_by_index")
        self.__require_type(access_mode, INT_TYPE, "int", "access_mode", "open_device_by_index")

        if not self.__is_valid_index(index, "open_device_by_index"):
            return None

        if not self.__is_valid_access_mode(access_mode, "open_device_by_index"):
            return None

        if self.__device_num < index:
            # Re-update the device
            self.update_device_list()
            if self.__device_num < index:
                raise NotFoundDevice("DeviceManager.open_device_by_index: invalid index")

        # open devices by index
        handle = self.__open_device_handle(str(index), GxOpenMode.INDEX, access_mode,
                                           'open_device_by_index')

        # get device class
        device_class = self.__device_info_list[index - 1]["device_class"]

        return self.__create_device(device_class, handle)

    def __get_device_class_by_field(self, field, value):
        """
        :brief      Find the first cached device whose info[field] equals value
        :param      field:  key of the device info dict to compare
        :param      value:  value to look for
        :return:    device class, or -1 when no cached device matches
        """
        for device_info in self.__device_info_list:
            if device_info[field] == value:
                return device_info["device_class"]

        # don't find this value in device base info list
        return -1

    def __resolve_device_class(self, field, value, func_name):
        """
        :brief      Look up the device class, re-enumerating once when it is not cached
        :param      func_name:  public method name used in the error message
        :return:    device class
        :raise      NotFoundDevice: still not found after re-enumeration
        """
        device_class = self.__get_device_class_by_field(field, value)
        if device_class == -1:
            # Re-update the device
            self.update_device_list()
            device_class = self.__get_device_class_by_field(field, value)
            if device_class == -1:
                raise NotFoundDevice("DeviceManager.%s: Not found device" % func_name)
        return device_class

    def __get_device_class_by_sn(self, sn):
        """
        :brief:     find device by sn in self.__device_info_list
        :param      sn:      device serial number
        :return:    device class, or -1 when not found
        """
        return self.__get_device_class_by_field("sn", sn)

    def open_device_by_sn(self, sn, access_mode=GxAccessMode.CONTROL):
        """
        :brief      open device by serial number(SN)
                    USB3 device return U3VDevice object
                    USB2 device return U2Device object
                    GEV device return GEVDevice object
        :param      sn:             device serial number, type: str
        :param      access_mode:    the mode of open device[GxAccessMode]
        :return:    Device object, or None when access_mode is out of range
        :raise      ParameterTypeError: sn is not str or access_mode is not an integer
                    NotFoundDevice: no device with this serial number
        """
        self.__require_type(sn, str, "str", "sn", "open_device_by_sn")
        self.__require_type(access_mode, INT_TYPE, "int", "access_mode", "open_device_by_sn")

        if not self.__is_valid_access_mode(access_mode, "open_device_by_sn"):
            return None

        # get device class from self.__device_info_list
        device_class = self.__resolve_device_class("sn", sn, "open_device_by_sn")

        # open devices by sn
        handle = self.__open_device_handle(sn, GxOpenMode.SN, access_mode, 'open_device_by_sn')

        return self.__create_device(device_class, handle)

    def __get_device_class_by_user_id(self, user_id):
        """
        :brief:     find device by user id in self.__device_info_list
        :param      user_id:        user ID
        :return:    device class, or -1 when not found
        """
        return self.__get_device_class_by_field("user_id", user_id)

    def open_device_by_user_id(self, user_id, access_mode=GxAccessMode.CONTROL):
        """
        :brief      open device by user defined name
                    USB3 device return U3VDevice object
                    GEV  device return GEVDevice object
        :param      user_id:        user defined name, type:str
        :param      access_mode:    the mode of open device[GxAccessMode]
        :return:    Device object, or None when access_mode is out of range
        :raise      ParameterTypeError: user_id is not str or access_mode is not an integer
                    InvalidParameter: user_id is empty
                    NotFoundDevice: no device with this user id
        """
        self.__require_type(user_id, str, "str", "user_id", "open_device_by_user_id")
        if user_id.__len__() == 0:
            raise InvalidParameter("DeviceManager.open_device_by_user_id: Don't support user_id's length is 0")

        self.__require_type(access_mode, INT_TYPE, "int", "access_mode", "open_device_by_user_id")

        if not self.__is_valid_access_mode(access_mode, "open_device_by_user_id"):
            return None

        # get device class from self.__device_info_list
        device_class = self.__resolve_device_class("user_id", user_id, "open_device_by_user_id")

        # open device by user_id
        handle = self.__open_device_handle(user_id, GxOpenMode.USER_ID, access_mode,
                                           'open_device_by_user_id')

        return self.__create_device(device_class, handle)

    def open_device_by_ip(self, ip, access_mode=GxAccessMode.CONTROL):
        """
        :brief      open device by device ip address
        :param      ip:             device ip address, type:str
        :param      access_mode:    the mode of open device[GxAccessMode]
        :return:    GEVDevice object, or None when access_mode is out of range
        :raise      ParameterTypeError: ip is not str or access_mode is not an integer
        """
        self.__require_type(ip, str, "str", "ip", "open_device_by_ip")
        self.__require_type(access_mode, INT_TYPE, "int", "access_mode", "open_device_by_ip")

        if not self.__is_valid_access_mode(access_mode, "open_device_by_ip"):
            return None

        # open device by ip
        handle = self.__open_device_handle(ip, GxOpenMode.IP, access_mode, 'open_device_by_ip')

        return self.__create_device(GxDeviceClassList.GEV, handle)

    def open_device_by_mac(self, mac, access_mode=GxAccessMode.CONTROL):
        """
        :brief      open device by device mac address
        :param      mac:            device mac address, type:str
        :param      access_mode:    the mode of open device[GxAccessMode]
        :return:    GEVDevice object, or None when access_mode is out of range
        :raise      ParameterTypeError: mac is not str or access_mode is not an integer
        """
        self.__require_type(mac, str, "str", "mac", "open_device_by_mac")
        self.__require_type(access_mode, INT_TYPE, "int", "access_mode", "open_device_by_mac")

        if not self.__is_valid_access_mode(access_mode, "open_device_by_mac"):
            return None

        # open device by mac
        handle = self.__open_device_handle(mac, GxOpenMode.MAC, access_mode, 'open_device_by_mac')

        return self.__create_device(GxDeviceClassList.GEV, handle)

    def gige_reset_device(self, mac_address, reset_device_mode):
        """
        :brief      Reconnection/Reset
        :param      mac_address:        The MAC address of the device(str)
        :param      reset_device_mode:  Reconnection mode, refer to GxResetDeviceModeEntry
        :return:    None
        :raise      ParameterTypeError: a parameter has the wrong type
        """
        _InterUtility.check_type(mac_address, str, "mac_address", "DeviceManager", "gige_reset_device")
        _InterUtility.check_type(reset_device_mode, int, "reset_device_mode", "DeviceManager", "gige_reset_device")
        status = gx_gige_reset_device(mac_address, reset_device_mode)
        StatusProcessor.process(status, 'DeviceManager', 'gige_reset_device')

    def gige_force_ip(self, mac_address, ip_address, subnet_mask, default_gate_way):
        """
        :brief      Execute the Force IP
        :param      mac_address:        The MAC address of the device(str)
        :param      ip_address:         The IP address passed to the Force IP call(str)
        :param      subnet_mask:        The subnet mask passed to the Force IP call(str)
        :param      default_gate_way:   The default gateway passed to the Force IP call(str)
        :return:    None
        :raise      ParameterTypeError: a parameter is not str
        """
        _InterUtility.check_type(mac_address, str, "mac_address", "DeviceManager", "gige_force_ip")
        _InterUtility.check_type(ip_address, str, "ip_address", "DeviceManager", "gige_force_ip")
        _InterUtility.check_type(subnet_mask, str, "subnet_mask", "DeviceManager", "gige_force_ip")
        _InterUtility.check_type(default_gate_way, str, "default_gate_way", "DeviceManager", "gige_force_ip")
        status = gx_gige_force_ip(mac_address, ip_address, subnet_mask, default_gate_way)
        StatusProcessor.process(status, 'DeviceManager', 'gige_force_ip')

    def gige_ip_configuration(self, mac_address, ipconfig_flag, ip_address, subnet_mask, default_gateway, user_id):
        """
        :brief      Execute the IP configuration
        :param      mac_address:        The MAC address of the device(str)
        :param      ipconfig_flag:      IP configuration flag(int)
        :param      ip_address:         The IP address to configure(str)
        :param      subnet_mask:        The subnet mask to configure(str)
        :param      default_gateway:    The default gateway to configure(str)
        :param      user_id:            The user defined name(str)
        :return:    None
        :raise      ParameterTypeError: a parameter has the wrong type
        """
        _InterUtility.check_type(mac_address, str, "mac_address", "DeviceManager", "gige_ip_configuration")
        _InterUtility.check_type(ipconfig_flag, int, "ipconfig_flag", "DeviceManager", "gige_ip_configuration")
        _InterUtility.check_type(ip_address, str, "ip_address", "DeviceManager", "gige_ip_configuration")
        _InterUtility.check_type(subnet_mask, str, "subnet_mask", "DeviceManager", "gige_ip_configuration")
        _InterUtility.check_type(default_gateway, str, "default_gateway", "DeviceManager", "gige_ip_configuration")
        _InterUtility.check_type(user_id, str, "user_id", "DeviceManager", "gige_ip_configuration")
        status = gx_gige_ip_configuration(mac_address, ipconfig_flag, ip_address, subnet_mask, default_gateway, user_id)
        StatusProcessor.process(status, 'DeviceManager', 'gige_ip_configuration')

    def create_image_format_convert(self):
        """
        :brief      create new convert pointer
        :return:    ImageFormatConvert object
        """
        image_format_convert = ImageFormatConvert()
        return image_format_convert

    def create_image_process(self):
        """
        :brief      create image process
        :return:    ImageProcess object
        """
        image_process = ImageProcess()
        return image_process
class _InterUtility:
    """
    :brief  Internal helpers: parameter type checking and pixel format queries
    """

    def __init__(self):
        pass

    @staticmethod
    def check_type(var, var_type, var_name="", class_name="", func_name=""):
        """
        :brief      Raise ParameterTypeError unless var is an instance of var_type
        :param      var:        value to check
        :param      var_type:   expected type, or tuple of accepted types
        :param      var_name:   parameter name used in the error message
        :param      class_name: class name used in the error message
        :param      func_name:  function name used in the error message
        :return:    None
        """
        if isinstance(var, var_type):
            return

        if isinstance(var_type, tuple):
            # Several accepted types are reported as "(a, b, c)".
            expected = "({})".format(", ".join(item.__name__ for item in var_type))
        else:
            expected = var_type.__name__

        raise ParameterTypeError("{} {}: Expected {} type is {}, not {}".format(class_name,
                                                                                func_name, var_name,
                                                                                expected,
                                                                                type(var).__name__))

    @staticmethod
    def get_pixel_color_filter(pixel_format):
        """
        :brief      Calculate pixel color filter based on pixel format
        :param      pixel_format:   See detail in GxPixelFormatEntry
        :return:    pixel color filter (DxPixelColorFilter), -1 for a format not listed here
        """
        gr_tup = (GxPixelFormatEntry.BAYER_GR8, GxPixelFormatEntry.BAYER_GR10,
                  GxPixelFormatEntry.BAYER_GR12, GxPixelFormatEntry.BAYER_GR16)
        rg_tup = (GxPixelFormatEntry.BAYER_RG8, GxPixelFormatEntry.BAYER_RG10,
                  GxPixelFormatEntry.BAYER_RG12, GxPixelFormatEntry.BAYER_RG16)
        gb_tup = (GxPixelFormatEntry.BAYER_GB8, GxPixelFormatEntry.BAYER_GB10,
                  GxPixelFormatEntry.BAYER_GB12, GxPixelFormatEntry.BAYER_GB16)
        bg_tup = (GxPixelFormatEntry.BAYER_BG8, GxPixelFormatEntry.BAYER_BG10,
                  GxPixelFormatEntry.BAYER_BG12, GxPixelFormatEntry.BAYER_BG16)
        mono_tup = (GxPixelFormatEntry.MONO8, GxPixelFormatEntry.MONO8_SIGNED,
                    GxPixelFormatEntry.MONO10, GxPixelFormatEntry.MONO12,
                    GxPixelFormatEntry.MONO14, GxPixelFormatEntry.MONO16)

        # Checked in order; the first group containing the format wins.
        filter_table = (
            (gr_tup, DxPixelColorFilter.GR),
            (rg_tup, DxPixelColorFilter.RG),
            (gb_tup, DxPixelColorFilter.GB),
            (bg_tup, DxPixelColorFilter.BG),
            (mono_tup, DxPixelColorFilter.NONE),
        )
        for formats, color_filter in filter_table:
            if pixel_format in formats:
                return color_filter
        return -1

    @staticmethod
    def get_bit_depth(pixel_format):
        """
        :brief      Calculate pixel depth based on pixel format
        :param      pixel_format:   See detail in GxPixelFormatEntry
        :return:    pixel depth (GxPixelSizeEntry), -1 for a format not handled here
        """
        bpp10_tup = (GxPixelFormatEntry.MONO10, GxPixelFormatEntry.BAYER_GR10, GxPixelFormatEntry.BAYER_RG10,
                     GxPixelFormatEntry.BAYER_GB10, GxPixelFormatEntry.BAYER_BG10)

        bpp12_tup = (GxPixelFormatEntry.MONO12, GxPixelFormatEntry.BAYER_GR12, GxPixelFormatEntry.BAYER_RG12,
                     GxPixelFormatEntry.BAYER_GB12, GxPixelFormatEntry.BAYER_BG12)

        bpp16_tup = (GxPixelFormatEntry.MONO16, GxPixelFormatEntry.BAYER_GR16, GxPixelFormatEntry.BAYER_RG16,
                     GxPixelFormatEntry.BAYER_GB16, GxPixelFormatEntry.BAYER_BG16)

        # 8/24/48-bit formats are recognised via the bit mask of the format
        # value; 10/12/14/16-bit formats via explicit membership.
        masked_bits = pixel_format & PIXEL_BIT_MASK

        if masked_bits == GX_PIXEL_8BIT:
            return GxPixelSizeEntry.BPP8
        elif pixel_format in bpp10_tup:
            return GxPixelSizeEntry.BPP10
        elif pixel_format in bpp12_tup:
            return GxPixelSizeEntry.BPP12
        elif pixel_format == GxPixelFormatEntry.MONO14:
            return GxPixelSizeEntry.BPP14
        elif pixel_format in bpp16_tup:
            return GxPixelSizeEntry.BPP16
        elif masked_bits == GX_PIXEL_24BIT:
            return GxPixelSizeEntry.BPP24
        elif masked_bits == GX_PIXEL_48BIT:
            return GxPixelSizeEntry.BPP48
        else:
            return -1