You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
324 lines
11 KiB
324 lines
11 KiB
2 weeks ago
|
# version:1.1.2312.9221
|
||
|
from . import gxipy as gx
|
||
|
from PIL import Image
|
||
|
from ctypes import *
|
||
|
from .gxipy.gxidef import *
|
||
|
import numpy
|
||
|
from .gxipy.ImageFormatConvert import *
|
||
|
from datetime import datetime
|
||
|
from helper.logger import logger
|
||
|
import cv2
|
||
|
import cv2 as cv
|
||
|
import numpy as np
|
||
|
import math
|
||
|
import json
|
||
|
|
||
|
import pyzbar
|
||
|
|
||
|
from PIL import Image
|
||
|
from pyzbar.pyzbar import decode, ZBarSymbol
|
||
|
|
||
|
|
||
|
DX_RANGE = 5
|
||
|
DY_RANGE = 5
|
||
|
RADIUS_RANGE = 10
|
||
|
|
||
|
# MASK_RADIUS = 400
|
||
|
MASK_RADIUS = 500
|
||
|
|
||
|
QRCODE = None
|
||
|
|
||
|
def nothing(x):
|
||
|
# any operation
|
||
|
pass
|
||
|
|
||
|
def get_best_valid_bits(pixel_format):
|
||
|
valid_bits = DxValidBit.BIT0_7
|
||
|
if pixel_format in (GxPixelFormatEntry.MONO8, GxPixelFormatEntry.BAYER_GR8, GxPixelFormatEntry.BAYER_RG8, GxPixelFormatEntry.BAYER_GB8, GxPixelFormatEntry.BAYER_BG8
|
||
|
, GxPixelFormatEntry.RGB8, GxPixelFormatEntry.BGR8, GxPixelFormatEntry.R8, GxPixelFormatEntry.B8, GxPixelFormatEntry.G8):
|
||
|
valid_bits = DxValidBit.BIT0_7
|
||
|
elif pixel_format in (GxPixelFormatEntry.MONO10, GxPixelFormatEntry.MONO10_PACKED, GxPixelFormatEntry.BAYER_GR10,
|
||
|
GxPixelFormatEntry.BAYER_RG10, GxPixelFormatEntry.BAYER_GB10, GxPixelFormatEntry.BAYER_BG10):
|
||
|
valid_bits = DxValidBit.BIT2_9
|
||
|
elif pixel_format in (GxPixelFormatEntry.MONO12, GxPixelFormatEntry.MONO12_PACKED, GxPixelFormatEntry.BAYER_GR12,
|
||
|
GxPixelFormatEntry.BAYER_RG12, GxPixelFormatEntry.BAYER_GB12, GxPixelFormatEntry.BAYER_BG12):
|
||
|
valid_bits = DxValidBit.BIT4_11
|
||
|
elif pixel_format in (GxPixelFormatEntry.MONO14):
|
||
|
valid_bits = DxValidBit.BIT6_13
|
||
|
elif pixel_format in (GxPixelFormatEntry.MONO16):
|
||
|
valid_bits = DxValidBit.BIT8_15
|
||
|
return valid_bits
|
||
|
|
||
|
def convert_to_RGB(raw_image):
|
||
|
image_convert.set_dest_format(GxPixelFormatEntry.RGB8)
|
||
|
valid_bits = get_best_valid_bits(raw_image.get_pixel_format())
|
||
|
image_convert.set_valid_bits(valid_bits)
|
||
|
|
||
|
# create out put image buffer
|
||
|
buffer_out_size = image_convert.get_buffer_size_for_conversion(raw_image)
|
||
|
output_image_array = (c_ubyte * buffer_out_size)()
|
||
|
output_image = addressof(output_image_array)
|
||
|
|
||
|
#convert to rgb
|
||
|
image_convert.convert(raw_image, output_image, buffer_out_size, False)
|
||
|
if output_image is None:
|
||
|
print('Failed to convert RawImage to RGBImage')
|
||
|
return
|
||
|
|
||
|
return output_image_array, buffer_out_size
|
||
|
|
||
|
|
||
|
# def main():
|
||
|
|
||
|
def getParam():
|
||
|
|
||
|
global QRCODE
|
||
|
t1 = datetime.now()
|
||
|
|
||
|
# print the demo information
|
||
|
print("")
|
||
|
print("-------------------------------------------------------------")
|
||
|
print("Sample to show how to acquire color image continuously and show acquired image.")
|
||
|
print("-------------------------------------------------------------")
|
||
|
print("")
|
||
|
print("Initializing......")
|
||
|
print("")
|
||
|
|
||
|
# create a device manager
|
||
|
device_manager = gx.DeviceManager()
|
||
|
dev_num, dev_info_list = device_manager.update_all_device_list()
|
||
|
if dev_num == 0:
|
||
|
print("Number of enumerated devices is 0")
|
||
|
return
|
||
|
|
||
|
try:
|
||
|
# open the first device
|
||
|
cam = device_manager.open_device_by_index(1)
|
||
|
except Exception as e:
|
||
|
print(F"getParam() Exception: {e}")
|
||
|
if cam:
|
||
|
# stop data acquisition
|
||
|
cam.stream_off()
|
||
|
# close device
|
||
|
cam.close_device()
|
||
|
|
||
|
return None, None
|
||
|
|
||
|
try:
|
||
|
|
||
|
remote_device_feature = cam.get_remote_device_feature_control()
|
||
|
|
||
|
# get image convert obj
|
||
|
global image_convert
|
||
|
image_convert = device_manager.create_image_format_convert()
|
||
|
|
||
|
# get image improvement obj
|
||
|
global image_process, image_process_config
|
||
|
image_process = device_manager.create_image_process()
|
||
|
image_process_config = cam.create_image_process_config()
|
||
|
image_process_config.enable_color_correction(False)
|
||
|
|
||
|
# exit when the camera is a mono camera
|
||
|
pixel_format_value, pixel_format_str = remote_device_feature.get_enum_feature("PixelFormat").get()
|
||
|
if Utility.is_gray(pixel_format_value):
|
||
|
print("This sample does not support mono camera.")
|
||
|
cam.close_device()
|
||
|
return
|
||
|
|
||
|
# set continuous acquisition
|
||
|
trigger_mode_feature = remote_device_feature.get_enum_feature("TriggerMode")
|
||
|
trigger_mode_feature.set("Off")
|
||
|
|
||
|
# get param of improving image quality
|
||
|
if remote_device_feature.is_readable("GammaParam"):
|
||
|
gamma_value = remote_device_feature.get_float_feature("GammaParam").get()
|
||
|
image_process_config.set_gamma_param(gamma_value)
|
||
|
else:
|
||
|
image_process_config.set_gamma_param(1)
|
||
|
if remote_device_feature.is_readable("ContrastParam"):
|
||
|
contrast_value = remote_device_feature.get_int_feature("ContrastParam").get()
|
||
|
image_process_config.set_contrast_param(contrast_value)
|
||
|
else:
|
||
|
image_process_config.set_contrast_param(0)
|
||
|
|
||
|
# start data acquisition
|
||
|
cam.stream_on()
|
||
|
|
||
|
t2 = datetime.now()
|
||
|
|
||
|
itemList = []
|
||
|
fileBase = "camdata/{}".format(datetime.now().strftime("%Y_%m_%d_%H_%M_%S"))
|
||
|
# acquisition image: num is the image number
|
||
|
num = 25
|
||
|
for i in range(num):
|
||
|
t3 = datetime.now()
|
||
|
# while True:
|
||
|
|
||
|
realRadius = 10
|
||
|
blur = 5
|
||
|
|
||
|
param_1 = 120
|
||
|
param_2 = 28
|
||
|
|
||
|
min_radius = 50
|
||
|
max_radius = 0
|
||
|
|
||
|
# get raw image
|
||
|
raw_image = cam.data_stream[0].get_image()
|
||
|
if raw_image is None:
|
||
|
print("Getting image failed.")
|
||
|
continue
|
||
|
|
||
|
# get RGB image from raw image
|
||
|
image_buf = None
|
||
|
if raw_image.get_pixel_format() != GxPixelFormatEntry.RGB8:
|
||
|
rgb_image_array, rgb_image_buffer_length = convert_to_RGB(raw_image)
|
||
|
if rgb_image_array is None:
|
||
|
return
|
||
|
# create numpy array with data from rgb image
|
||
|
numpy_image = numpy.frombuffer(rgb_image_array, dtype=numpy.ubyte, count=rgb_image_buffer_length). \
|
||
|
reshape(raw_image.frame_data.height, raw_image.frame_data.width, 3)
|
||
|
image_buf = addressof(rgb_image_array)
|
||
|
else:
|
||
|
numpy_image = raw_image.get_numpy_array()
|
||
|
image_buf = raw_image.frame_data.image_buf
|
||
|
|
||
|
# 图像质量提升
|
||
|
rgb_image = GxImageInfo()
|
||
|
rgb_image.image_width = raw_image.frame_data.width
|
||
|
rgb_image.image_height = raw_image.frame_data.height
|
||
|
rgb_image.image_buf = image_buf
|
||
|
rgb_image.image_pixel_format = GxPixelFormatEntry.RGB8
|
||
|
|
||
|
# improve image quality
|
||
|
image_process.image_improvement(rgb_image, image_buf, image_process_config)
|
||
|
|
||
|
if numpy_image is None:
|
||
|
continue
|
||
|
|
||
|
src = cv.resize(numpy_image, (0,0), fx=0.25, fy=0.25)
|
||
|
|
||
|
fileNameImage = "qrcode_{}.jpg".format(fileBase)
|
||
|
cv2.imwrite(fileNameImage, src)
|
||
|
|
||
|
decoded_list = decode(src)
|
||
|
|
||
|
if len(decoded_list) > 0:
|
||
|
print("decoded_list: ", decoded_list)
|
||
|
urlB = decoded_list[0].data
|
||
|
QRCODE = urlB.decode()
|
||
|
|
||
|
h, w, _ = src.shape
|
||
|
|
||
|
# fileNameImage = "{}.original.jpg".format(fileBase)
|
||
|
# cv2.imwrite(fileNameImage, src)
|
||
|
|
||
|
# out.write(src)
|
||
|
|
||
|
gray = cv.cvtColor(src, cv.COLOR_BGR2GRAY)
|
||
|
|
||
|
gray = cv2.medianBlur(gray, blur)
|
||
|
# cv2.imshow("gray", gray)
|
||
|
|
||
|
rows = gray.shape[0]
|
||
|
|
||
|
circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, 1, rows / 8,
|
||
|
param1=param_1, param2=param_2,
|
||
|
minRadius=min_radius, maxRadius=max_radius)
|
||
|
|
||
|
if circles is not None:
|
||
|
circles = np.uint16(np.around(circles))
|
||
|
for i in circles[0, :]:
|
||
|
center = (i[0], i[1])
|
||
|
# circle center
|
||
|
# # cv2.circle(src, center, 1, (0, 100, 100), 3)
|
||
|
# cv2.circle(src, center, 1, (60, 100, 100), 3)
|
||
|
# circle outline
|
||
|
radius = i[2]
|
||
|
# #cv2.circle(src, center, radius, (255, 0, 255), 3)
|
||
|
# cv2.circle(src, center, radius, (190, 90, 190), 3)
|
||
|
|
||
|
dx = center[0] - math.floor(w/2)
|
||
|
dy = center[1] - math.floor(h/2)
|
||
|
|
||
|
if len(itemList) > 0:
|
||
|
|
||
|
addNew = True
|
||
|
for item in itemList:
|
||
|
|
||
|
if abs(int(center[0]) - item[0]) < DX_RANGE \
|
||
|
and abs(int(center[1]) - item[1]) < DY_RANGE \
|
||
|
and abs(int(radius) - item[2]) < RADIUS_RANGE:
|
||
|
# print("Appen Existing, dx: {}, dy: {}, radius: {}".format(dx, dy, radius ))
|
||
|
item[5] = item[5] + 1
|
||
|
addNew = False
|
||
|
|
||
|
if addNew:
|
||
|
# print("Add New, dx: {}, dy: {}, radius: {}".format(dx, dy, radius ))
|
||
|
itemList.append([int(center[0]), int(center[1]), int(radius), int(dx), int(dy), 1])
|
||
|
else:
|
||
|
# print("Add New, dx: {}, dy: {}, radius: {}".format(dx, dy, radius ))
|
||
|
itemList.append([int(center[0]), int(center[1]), int(radius), int(dx), int(dy), 1])
|
||
|
|
||
|
ratio = realRadius / radius
|
||
|
ddx = math.floor(dx * ratio)
|
||
|
ddy = math.floor(dy * ratio)
|
||
|
dratio = math.floor(radius * ratio)
|
||
|
# print("Real: ddx: {}, ddy: {}, radius: {}".format(ddx, ddy, dratio))
|
||
|
|
||
|
|
||
|
cv2.line(src, (math.floor(w/2), 0), (math.floor(w/2), h), (255, 0,0), 1, cv2.LINE_AA)
|
||
|
cv2.line(src, (0, math.floor(h/2)), (w, math.floor(h/2)), (255, 0,0), 1, cv2.LINE_AA)
|
||
|
|
||
|
cv2.circle(src, (math.floor(w/2), math.floor(h/2)), 5, (255, 100, 100), 3)
|
||
|
|
||
|
# cv2.imshow("Source", src)
|
||
|
# # cv2.imshow("Detected Lines (in red) - Probabilistic Line Transform", cdstP)
|
||
|
|
||
|
t4 = datetime.now()
|
||
|
|
||
|
# key = cv2.waitKey(1)
|
||
|
# if key == 27:
|
||
|
# break
|
||
|
|
||
|
except Exception as e:
|
||
|
logger.info(F"getParam Exception: {e}")
|
||
|
|
||
|
# out.release()
|
||
|
# stop data acquisition
|
||
|
cam.stream_off()
|
||
|
# close device
|
||
|
cam.close_device()
|
||
|
|
||
|
t5 = datetime.now()
|
||
|
|
||
|
item = None
|
||
|
if len(itemList) > 0:
|
||
|
# print("itemList: ", itemList)
|
||
|
|
||
|
itemList.sort(key = lambda item: item[5], reverse=True )
|
||
|
item = itemList[0]
|
||
|
cv2.circle(src, (item[0], item[1]), 1, (60, 100, 100), 3)
|
||
|
cv2.circle(src, (item[0], item[1]), item[2], (190, 90, 190), 3)
|
||
|
|
||
|
print("Optimal dx: {}, dy: {}, radius: {}".format(item[3], item[4], item[2] ))
|
||
|
|
||
|
fileNameImage = "{}.jpg".format(fileBase)
|
||
|
cv2.imwrite(fileNameImage, src)
|
||
|
|
||
|
fileNameJson = "{}.json".format(fileBase)
|
||
|
|
||
|
with open(fileNameJson, "w") as fp:
|
||
|
json.dump(itemList, fp)
|
||
|
|
||
|
t6 = datetime.now()
|
||
|
|
||
|
# item = [x, y, radius, dx, dy, frequency]
|
||
|
if item and item[3] < 300 and item[4] < 300 and item[5] > 3:
|
||
|
return item, QRCODE
|
||
|
elif item:
|
||
|
print("Abnorm Value, {}, {}".format(item[3], item[4]))
|
||
|
return None, QRCODE
|
||
|
else:
|
||
|
return None, QRCODE
|
||
|
|