926 lines
40 KiB
Python
926 lines
40 KiB
Python
|
|
||
|
import cv2 as cv
|
||
|
import numpy as np
|
||
|
|
||
|
import picamera
|
||
|
from picamera.array import PiRGBArray
|
||
|
from fractions import Fraction
|
||
|
|
||
|
import time
|
||
|
from datetime import datetime
|
||
|
import os
|
||
|
import sys
|
||
|
|
||
|
import numpy as np
|
||
|
import math as M
|
||
|
|
||
|
from multiprocessing import Process, shared_memory
|
||
|
|
||
|
|
||
|
#----------------------------------------------------------------------
|
||
|
# Define camera settings
|
||
|
SENSOR_MODE = 4 # corresponding sensor mode to resolution
|
||
|
OUTPUT_RESOLUTION = (192, 144) # (width, heigth)
|
||
|
image_width = OUTPUT_RESOLUTION[0]
|
||
|
image_heigth = OUTPUT_RESOLUTION[1]
|
||
|
number_of_colorchannels = 3 # r, g, b
|
||
|
size_of_frame=int(image_heigth*image_heigth*number_of_colorchannels)
|
||
|
frame_dimension = int(number_of_colorchannels)
|
||
|
|
||
|
AWB_MODE = 'off' # Auto white balance mode
|
||
|
AWB_GAINS = (1.395, 1.15) # White Balance Gains to have colours read correctly: (red, blue). Int, floar or fraction are valid.
|
||
|
BRIGHTNESS = 25 # sets the brightness setting of the camera. default is 50. [0-100]
|
||
|
#the brighter, the brighter the LEDs and the higher the RGB values and vice versa!
|
||
|
CONTRAST = 100 # sets the contrast setting of the camera. The default value is 0. [-100 ... 100]
|
||
|
|
||
|
SHUTTER_SPEED = 50 # [µs]
|
||
|
|
||
|
ISO = 320 # ISO value
|
||
|
EXPOSURE_MODE = 'off'
|
||
|
FRAMERATE = 25 # frames per second. 40 fps is max for sensor mode 4
|
||
|
|
||
|
SLEEP_TIME = 2 # Time for sleep-mode for the camera in seconds. My default: 2 s
|
||
|
|
||
|
# miscellaneous parameters
|
||
|
max_value_of_uint64 = int((2**64) - 1) # @30 fps: konservative calculated driving time: 1.95*1e10 years --> Integer overflow not relevant.
|
||
|
|
||
|
#----------------------------------------------------------------------
|
||
|
# Parameters
|
||
|
pixels_per_mm = 32/24.25 #[px/mm] for 120 mm camera height for resolution: 192x144
|
||
|
# pixels_per_mm = 71/24.25 #[px/mm] for 120 mm camera height for resolution: 416x320
|
||
|
# pixels_per_mm = 107/24.25 #[px/mm] for 120 mm camera height for resolution: 640x480
|
||
|
|
||
|
# Offset Camera Sensor in Scooty according to Scooty-KS
|
||
|
x_offset_camera_mm = 100 # [mm]
|
||
|
y_offset_camera_mm = 50 # [mm]
|
||
|
x_offset_camera_px = x_offset_camera_mm*pixels_per_mm # [px]
|
||
|
y_offset_camera_px = y_offset_camera_mm*pixels_per_mm # [px]
|
||
|
|
||
|
# image parameters
|
||
|
image_heigth = OUTPUT_RESOLUTION[1] # shape [0]
|
||
|
image_width = OUTPUT_RESOLUTION[0]# shape[1]
|
||
|
# calculate center of image
|
||
|
[x_0, y_0] = np.array([image_width/2, image_heigth/2], dtype=np.uint16)
|
||
|
|
||
|
|
||
|
threshold_color_detection = 60 # values under this will not be considered as active leds in each color channel
|
||
|
|
||
|
# Parameters for Blob/LED Detection
|
||
|
minDiameter_mm = 1.5 # [mm] minimum diameter of detected blob/LED
|
||
|
maxDiameter_mm = 8 # [mm] maximum diameter of detected blob/LED
|
||
|
|
||
|
# Define color numbers to identify the color channels in the matrix with all detected LEDs. No string, because numpy array should stay uint16
|
||
|
color_number_off = 0
|
||
|
color_number_red = 1
|
||
|
color_number_green = 2
|
||
|
color_number_blue = 3
|
||
|
color_number_yellow = 4
|
||
|
color_number_magenta = 5
|
||
|
color_number_cyan = 6
|
||
|
color_number_white = 7
|
||
|
|
||
|
show_opencv_window = True # show opencv window
|
||
|
draw_opencv = True # draw lane and so on
|
||
|
|
||
|
print_additional_info = False
|
||
|
|
||
|
# create shared Memorys for main-process
|
||
|
# shared memory for bools
|
||
|
shm_bools_pre=np.array([False, False, False, False, False, False, False, False, False, False], dtype=np.bool8) # create numpy array with bools stored in it
|
||
|
# [0]: newframe [1]: p_red_finished [2]: p_green_finished [3]: p_blue_finished
|
||
|
# [4]: p_red_started [5]: p_green_started [6]: p_blue_started
|
||
|
# [7]: p_red_start_trigger [8]: p_green_start_trigger [9]: p_blue_start_trigger
|
||
|
size_of_buffer = shm_bools_pre.nbytes
|
||
|
print(f"size of buffer: {size_of_buffer}") # size of buffer: 10
|
||
|
print(f"shm_bools dtype: {shm_bools_pre.dtype}") # dtype: bool
|
||
|
shm_bools_create = shared_memory.SharedMemory(name="shm_bools", create=True, size=shm_bools_pre.nbytes) # create a new shared memory block
|
||
|
shm_bools = np.ndarray(shm_bools_pre.shape, dtype=shm_bools_pre.dtype, buffer=shm_bools_create.buf) # create a NumPy array backed by shared memory
|
||
|
shm_bools[:] = shm_bools_pre[:] # Copy the original data into shared memory
|
||
|
|
||
|
# print(shm_bool)
|
||
|
# print(shm_bools.name)
|
||
|
|
||
|
# shared memory for framenumber
|
||
|
shm_framenumber_pre=np.array([0], dtype=np.uint64)
|
||
|
size_of_buffer = shm_framenumber_pre.nbytes
|
||
|
print(f"size of framenumber-buffer: {size_of_buffer}") #8
|
||
|
print(f"shm_framenumber dtype: {shm_framenumber_pre.dtype}") #uint64
|
||
|
shm_framenumber_create = shared_memory.SharedMemory(name="shm_framenumber", create=True, size=shm_framenumber_pre.nbytes) # create a new shared memory block
|
||
|
shm_framenumber = np.ndarray(shm_framenumber_pre.shape, dtype=shm_framenumber_pre.dtype, buffer=shm_framenumber_create.buf) # create a NumPy array backed by shared memory
|
||
|
shm_framenumber[:] = shm_framenumber_pre[:] # Copy the original data into shared memory
|
||
|
# print(shm_framenumber) # [0]
|
||
|
# print(shm_framenumber_create.name) # shm_framenumber
|
||
|
|
||
|
# shared memory for red, green, blue frame
|
||
|
int_black = 0 # integer for black color/ no color
|
||
|
shm_colorframes_pre = np.full(\
|
||
|
(image_heigth,image_width), \
|
||
|
int_black, dtype=np.uint8)
|
||
|
size_of_buffer = shm_colorframes_pre.nbytes
|
||
|
print(f"size of colorframe-buffer: {size_of_buffer}") #133 120
|
||
|
print(f"shm_colorframes_pre dtype: {shm_colorframes_pre.dtype}") #uint8
|
||
|
shm_redframe_create = shared_memory.SharedMemory(name="shm_redframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block
|
||
|
shm_greenframe_create = shared_memory.SharedMemory(name="shm_greenframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block
|
||
|
shm_blueframe_create = shared_memory.SharedMemory(name="shm_blueframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block
|
||
|
shm_redframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_redframe_create.buf) # create a NumPy array backed by shared memory
|
||
|
shm_greenframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_greenframe_create.buf) # create a NumPy array backed by shared memory
|
||
|
shm_blueframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_blueframe_create.buf) # create a NumPy array backed by shared memory
|
||
|
shm_redframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory
|
||
|
shm_greenframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory
|
||
|
shm_blueframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory
|
||
|
# shared memory bgr frame
|
||
|
int_black = 0 # integer for black color/ no color
|
||
|
shm_frame_pre = np.full(\
|
||
|
(image_heigth,image_width, number_of_colorchannels), \
|
||
|
int_black, dtype=np.uint8)
|
||
|
size_of_buffer = shm_frame_pre.nbytes
|
||
|
print(f"size of colorframe-buffer: {size_of_buffer}") #399 360 = 416 x 320 x 3
|
||
|
print(f"shm_colorframes_pre dtype: {shm_frame_pre.dtype}") #uint8
|
||
|
shm_frame_create = shared_memory.SharedMemory(name="shm_frame", create=True, size=shm_frame_pre.nbytes) # create a new shared memory block
|
||
|
shm_frame = np.ndarray(shm_frame_pre.shape, dtype=shm_frame_pre.dtype, buffer=shm_frame_create.buf) # create a NumPy array backed by shared memory
|
||
|
shm_frame[:] = shm_frame_pre[:] # Copy the original data into shared memory
|
||
|
|
||
|
|
||
|
# ----------------------------------------------------------------------------
|
||
|
# Define Functions
|
||
|
def points_trafo(detected_LEDs, alpha_rad, dx, dy):
|
||
|
"""Tranfsform points of LED to lane in KS-LED"""
|
||
|
detected_LEDs_trafo = detected_LEDs.copy() # copy, becuase else it is only a pointer
|
||
|
detected_LEDs_trafo = detected_LEDs_trafo.astype(np.int16) # avoid integer overflow
|
||
|
x_pnts = detected_LEDs_trafo[:,0]
|
||
|
y_pnts = detected_LEDs_trafo[:,1]
|
||
|
|
||
|
# Translation
|
||
|
x1 = x_pnts-dx-x_0
|
||
|
x_trafo=x1
|
||
|
y1 = y_pnts-dy-y_0
|
||
|
y_trafo = y1
|
||
|
|
||
|
# Rotation. Winkel Sensor im UZS, also negativ zu mathematischer definiton
|
||
|
x_trafo = np.cos(-alpha_rad)*x1-np.sin(-alpha_rad)*y1
|
||
|
detected_LEDs_trafo[:,0] = x_trafo
|
||
|
y_trafo = np.sin(-alpha_rad)*x1+np.cos(-alpha_rad)*y1
|
||
|
detected_LEDs_trafo[:,1] = y_trafo
|
||
|
|
||
|
#sort points along lane: x_2, y_2 -axis (KS_LED)
|
||
|
detected_LEDs_trafo = detected_LEDs_trafo[detected_LEDs_trafo[:, 0].argsort(kind='quicksort')]
|
||
|
return detected_LEDs_trafo
|
||
|
|
||
|
def construct_lane(detected_LEDs, img_bgr):
|
||
|
"""construct the lane"""
|
||
|
# This function is partially commented in german, because higher math is used
|
||
|
# clearer what is trying to be achieved
|
||
|
|
||
|
# get points
|
||
|
# xy_pnts = detected_LEDs[:,0:2]
|
||
|
# x_pnts = detected_LEDs[:,0]
|
||
|
# y_pnts = detected_LEDs[:,1]
|
||
|
|
||
|
# approach 2:
|
||
|
# fit line through centers of LEDs in KS_0
|
||
|
# DIST_L": the simplest and the fastest least-squares method: the simple euclidean distance
|
||
|
param = 0 # not used for DIST_L2
|
||
|
reps = 0.001 # Sufficient accuracy for the radius (distance between the coordinate origin and the line).
|
||
|
aeps = 0.001 # Sufficient accuracy for the angle.
|
||
|
[dx, dy, x_2, y_2] = cv.fitLine(detected_LEDs[:,0:2], cv.DIST_L2, param, reps, aeps)
|
||
|
# x2, y2: same as: mean_of_leds = np.mean([x_pnts, y_pnts], 1)
|
||
|
|
||
|
alpha_rad = np.arctan2(dy, dx) # calculate angle of line
|
||
|
alpha = np.arctan2(dy, dx)*180/np.pi # calculate angle of line
|
||
|
# print(f"Lane: dx: {dx}, dy:{dy}, x2:{x_2}, y2:{y_2}, alpha:{alpha}°")
|
||
|
if print_additional_info:
|
||
|
print(f"Lane: alpha:{alpha[0]}°")
|
||
|
|
||
|
# get smallest distance to point an line
|
||
|
# Berechnung nach: Repetitorium Höhere Mathematik, Wirth
|
||
|
# Gerade: x = a+ t*b
|
||
|
# Punkt : OP = p
|
||
|
# d = abs(b x (p-a))/(abs(b))
|
||
|
# info: np.array()[:,0] --> gets only array with 1 dimensions with desired values
|
||
|
p = np.array([x_0, y_0])
|
||
|
a = np.array([x_2, y_2])[:,0]
|
||
|
b = np.array([np.cos(alpha_rad), np.sin(alpha_rad)])[:,0] # Richtungsvektor
|
||
|
c = p-a
|
||
|
|
||
|
# Betrag von Vektor: np.linalg.norm(vec)
|
||
|
cross= np.cross(b, c)
|
||
|
d = np.linalg.norm(cross)/np.linalg.norm(b) # distance [px]
|
||
|
#print(f"d: {round(d,2)}")
|
||
|
|
||
|
# Fußpunkt (X_LED, Y_LED)
|
||
|
t_0_dot = np.dot(c, b)
|
||
|
t_0_norm = (np.linalg.norm(b)**2)
|
||
|
t_0 = t_0_dot/t_0_norm
|
||
|
[x_LED, y_LED] = (a+t_0*b)
|
||
|
if print_additional_info:
|
||
|
print(f"x_LED: {x_LED}, y_LED: {y_LED}")
|
||
|
|
||
|
# Abstand (dx, dy) Fußpunkt zu KS_0
|
||
|
dx_LED = x_LED - x_0
|
||
|
dx_LED_mm = dx_LED*(1/pixels_per_mm)
|
||
|
dy_LED = y_LED - y_0
|
||
|
dy_LED_mm = dy_LED*(1/pixels_per_mm)
|
||
|
if print_additional_info:
|
||
|
print(f"dx_LED:{dx_LED} [px] , dy_LED:{dy_LED} [px]")
|
||
|
print(f"dx_LED:{dx_LED_mm} [mm] , dy_LED:{dy_LED_mm} [mm]")
|
||
|
|
||
|
# Abstand (dx, dy) Fußpunkt von Bildmitte zu KS_Scooty
|
||
|
# Diese Werte zurückgeben
|
||
|
dx_LED_scooty = x_LED - x_0 + x_offset_camera_px
|
||
|
dx_LED_scooty_mm = dx_LED_scooty*(1/pixels_per_mm)
|
||
|
dy_LED_scooty = y_LED - y_0 + y_offset_camera_px
|
||
|
dy_LED_scooty_mm = dy_LED_scooty*(1/pixels_per_mm)
|
||
|
if print_additional_info:
|
||
|
print(f"dx_LED_scooty:{dx_LED_scooty} [px] , dy_LED_scooty:{dy_LED_scooty} [px]")
|
||
|
print(f"dx_LED_scooty:{dx_LED_scooty_mm} [mm] , dy_LED_scooty:{dy_LED_scooty_mm} [mm]")
|
||
|
|
||
|
# Punkte Trafo, um sortierte position der LEDs entlang Spur zu erhalten
|
||
|
# Bei normal detected kann bei vertikaler LED zb Fehler entstehen und dann muster: 211323233 -> daher mit dieser sortierten weitermachen
|
||
|
detected_LEDs_KS_LED = points_trafo(detected_LEDs, alpha_rad, dx_LED, dy_LED)
|
||
|
if print_additional_info:
|
||
|
print(f"Detected LEDs in KS_LED:(x2, y2):\n {detected_LEDs_KS_LED}")
|
||
|
|
||
|
#-----------------------------------
|
||
|
# draw useful lines and points
|
||
|
|
||
|
# draw lane line
|
||
|
if draw_opencv:
|
||
|
pt_0 = (a+b*np.array([-300, -300])).astype(np.int32)
|
||
|
pt_1 = (a+b*np.array([300, 300])).astype(np.int32)
|
||
|
#print(f"pt_0: {pt_0}, pt_1: {pt_1}")
|
||
|
cv.line(img_bgr, pt_0, pt_1, (255,255,255),1) # draw lane
|
||
|
|
||
|
# draw dx dy
|
||
|
cv.line(img_bgr, (int(x_0), int(y_0)), (int(x_LED), int(y_LED)), (0,0,255), 2) # shortest distance from KS_0 to KS_LED --> Lot
|
||
|
# cv.line(img_bgr, (int(x_0), int(y_0)), (int(x_LED), int(y_0)), (0,0,255), 2) # only dx
|
||
|
# cv.line(img_bgr, (int(x_LED), int(y_0)), (int(x_LED), int(y_LED)), (0,0,255), 2) # only dy
|
||
|
|
||
|
#draw additional points
|
||
|
cv.circle(img_bgr, (int(x_2), int(y_2)), 5,(255,128,255),-1) #pink. Center of points
|
||
|
#cv.putText(img_bgr, '(x2, y2)',(int(x_2)+5, int(y_2)-5), cv.FONT_HERSHEY_SIMPLEX, 2, (255,255,255), cv.LINE_AA)
|
||
|
cv.circle(img_bgr, (int(x_LED), int(y_LED)), 5,(170,255,0),-1) # lime green. Fußpunkt
|
||
|
|
||
|
if show_opencv_window:
|
||
|
cv.imshow("Lane", img_bgr)
|
||
|
|
||
|
return dx_LED_scooty_mm, dy_LED_scooty_mm, detected_LEDs_KS_LED
|
||
|
|
||
|
def convert_rgb_to_grayscale_average(image_bgr):
|
||
|
"""This function converts the RGB image into an grayscale image.
|
||
|
Algorithm: Average: Y = (R+G+B)/3"""
|
||
|
|
||
|
# convert dtype to prevent integer overflow while addition
|
||
|
image_bgr = image_bgr.astype(np.uint16, copy=False)
|
||
|
image_gray = (image_bgr[:,:,0]+image_bgr[:,:,1]+image_bgr[:,:,2])/3 # add values / do conversion
|
||
|
image_gray = image_gray.astype(np.uint8, copy=False) # convert back to uint8
|
||
|
|
||
|
return image_gray
|
||
|
|
||
|
def create_detector(params_for_blob_detection):
|
||
|
detector = cv.SimpleBlobDetector_create(params_for_blob_detection) # Set up the detector with specified parameters.
|
||
|
return detector
|
||
|
|
||
|
def define_parameters_for_blob_detection():
|
||
|
"""set parameters for simple blob detector"""
|
||
|
params = cv.SimpleBlobDetector_Params()
|
||
|
|
||
|
# Threshold for Convert the source image to binary images by applying thresholding
|
||
|
# with several thresholds from minThreshold (inclusive) to maxThreshold (exclusive)
|
||
|
# with distance thresholdStep between neighboring thresholds.
|
||
|
# Since the Grayscale image is dark if only one color channel is active,
|
||
|
# the Threshold values have to be set like this.
|
||
|
# particularly the thresholdStep-Value has to be low
|
||
|
params.minThreshold=20 # reminder: this value is set for grayscale image
|
||
|
params.maxThreshold=255
|
||
|
params.thresholdStep=1
|
||
|
|
||
|
params.filterByColor=False # do not filter blobs by color
|
||
|
|
||
|
# Filter blobs by Area
|
||
|
params.filterByArea=True
|
||
|
minDiameter_px = minDiameter_mm*pixels_per_mm # [px] minimum diameter of detected blob/LED
|
||
|
maxDiameter_px = maxDiameter_mm*pixels_per_mm # [px] maximum diameter of detected blob/LED
|
||
|
minArea_px2 = np.pi/4*minDiameter_px**2
|
||
|
maxArea_px2 = np.pi/4*maxDiameter_px**2
|
||
|
|
||
|
params.minArea = minArea_px2 # min Area of a blob in px^2
|
||
|
params.maxArea = maxArea_px2 # max Area of a blob in px^2.
|
||
|
# reasons for not filtering maxArea: motion blur + rolling shutter --> larger Area
|
||
|
|
||
|
# Filter by Inertia
|
||
|
params.filterByInertia=False
|
||
|
params.minInertiaRatio = 0.2 # [0-1]
|
||
|
|
||
|
# Filter by Convexity
|
||
|
params.filterByConvexity=False
|
||
|
params.minConvexity = 0.2 # [0-1]
|
||
|
|
||
|
# Filter by Circularity
|
||
|
params.filterByCircularity=False
|
||
|
params.minCircularity = 0.4 # [0-1]
|
||
|
|
||
|
# params.minDistBetweenBlobs = minDist_px # this has no effect
|
||
|
|
||
|
return params
|
||
|
|
||
|
|
||
|
def detect_LED_positions_in_grayscale(image_gray, image_bgr, detector):
|
||
|
start_processing = time.perf_counter()
|
||
|
keypoints = detector.detect(image_gray) # Detect blobs --> LEDs
|
||
|
end_processing = time.perf_counter()
|
||
|
time_processing = end_processing-start_processing
|
||
|
time_processing = time_processing*1000
|
||
|
time_processing = round(time_processing, 2)
|
||
|
print(f'processing time detector: {time_processing} ms')
|
||
|
|
||
|
number_of_detected_leds = len(keypoints)
|
||
|
|
||
|
if number_of_detected_leds != 0:
|
||
|
# print information of keypoints
|
||
|
print(f"detected LEDs: {number_of_detected_leds}")
|
||
|
|
||
|
#Pre-allocate matrix for numpy
|
||
|
number_of_rows = number_of_detected_leds
|
||
|
number_of_columns = 3
|
||
|
position_of_leds = np.zeros((number_of_rows, number_of_columns), dtype=np.uint16)
|
||
|
for i, k in enumerate(keypoints):
|
||
|
# x_pos = round(k.pt[0],0) # x position
|
||
|
# y_pos = round(k.pt[1],0) # y position
|
||
|
# print(f"x: {x_pos} y: {y_pos}")
|
||
|
diameter_px = round(k.size,2)
|
||
|
diameter_mm = round(diameter_px*1/pixels_per_mm,2)
|
||
|
#print(f"diameter [px]: {diameter_px} diameter [mm]: {diameter_mm}") # diameter
|
||
|
# area_px2 = round(np.pi/4*k.size**2,0) # area in px^2
|
||
|
# area_mm2 = round(area_px2*(1/pixels_per_mm)**2,0)
|
||
|
# print(f"area [px^2]: {area_px2} area [mm^2]: {area_mm2}")
|
||
|
# print('')
|
||
|
|
||
|
# calculate parameters to transfer to matrix
|
||
|
# x_pos = int(np.ceil(x_pos))
|
||
|
# y_pos = int(np.ceil(y_pos))
|
||
|
# Fill matrix
|
||
|
# position_of_leds[i,:] = [x_pos,y_pos, 0]
|
||
|
position_of_leds[i,0] = int(np.ceil(k.pt[0])) # x positon
|
||
|
position_of_leds[i,1] = int(np.ceil(k.pt[1])) # y position
|
||
|
|
||
|
|
||
|
if draw_opencv:
|
||
|
# draw the keypoints on the original image
|
||
|
# cv.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS ensures the size of the circle corresponds to the size of blob
|
||
|
blobs = cv.drawKeypoints(image=image_bgr, keypoints=keypoints, color=(255, 255, 255), \
|
||
|
outImage=np.array([]), flags= cv.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
|
||
|
|
||
|
if show_opencv_window:
|
||
|
# cv.imshow("grayscale", image_gray)
|
||
|
cv.imshow("Detected", blobs)
|
||
|
return position_of_leds
|
||
|
|
||
|
else:
|
||
|
print(f"No LEDs were detected")
|
||
|
return None
|
||
|
|
||
|
def detect_position_of_all_LEDs_grayscale(image_gray, image_bgr, detector):
|
||
|
position_of_LEDs = detect_LED_positions_in_grayscale(image_gray, image_bgr, detector)
|
||
|
|
||
|
if position_of_LEDs is not None:
|
||
|
return position_of_LEDs
|
||
|
else:
|
||
|
return None
|
||
|
|
||
|
def get_color_of_leds(matrix_of_LEDs, image_bgr):
|
||
|
# is image_r[y_pos, x_pos] = image_bgr[y_pos,x_pos, 2] ? --> yes. No need to split the color channels.
|
||
|
|
||
|
offset = 0 # half of length from rectangle which is going to be used to determine the color around the middle point of the blob/led
|
||
|
# offset = 0 --> only the value from the middle point of the blob/led
|
||
|
# offset=1 --> 9 values, offset=2-->25 values
|
||
|
|
||
|
for led in matrix_of_LEDs:
|
||
|
x_pos = led[0] # uint16
|
||
|
y_pos = led[1] # uint16
|
||
|
|
||
|
# get values of color channels in region around middle point of blob/led:
|
||
|
# +1 at stop index, because it is not inclusive
|
||
|
region_around_blue_led = image_bgr[y_pos-offset:y_pos+offset+1, x_pos-offset:x_pos+offset+1, 0] # uint8
|
||
|
region_around_green_led = image_bgr[y_pos-offset:y_pos+offset+1, x_pos-offset:x_pos+offset+1, 1] # uint8
|
||
|
region_around_red_led = image_bgr[y_pos-offset:y_pos+offset+1, x_pos-offset:x_pos+offset+1, 2] # uint8
|
||
|
|
||
|
# average of the values
|
||
|
# convert dtype to prevent integer overflow while addition
|
||
|
region_around_red_led = region_around_red_led.astype(np.uint16, copy=False)
|
||
|
region_around_green_led = region_around_green_led.astype(np.uint16, copy=False)
|
||
|
region_around_blue_led = region_around_blue_led.astype(np.uint16, copy=False)
|
||
|
# sum all elements in matrix and divide with number of elements
|
||
|
number_of_elements= region_around_blue_led.size
|
||
|
value_of_red_led = region_around_red_led.sum()/number_of_elements # float64, if not integer result
|
||
|
value_of_green_led = region_around_green_led.sum()/number_of_elements # float64, if not integer result
|
||
|
value_of_blue_led = region_around_blue_led.sum()/number_of_elements # float64, if not integer result
|
||
|
|
||
|
# determine which leds are active:
|
||
|
# if value > threshold --> led is active
|
||
|
status_blue_led = False; status_green_led = False; status_red_led = False
|
||
|
if value_of_blue_led > threshold_color_detection:
|
||
|
status_blue_led = True
|
||
|
if value_of_green_led > threshold_color_detection:
|
||
|
status_green_led = True
|
||
|
if value_of_red_led > threshold_color_detection:
|
||
|
status_red_led = True
|
||
|
|
||
|
# determine color by checking the cases:
|
||
|
# case 1: red
|
||
|
if status_blue_led==False and status_green_led==False and status_red_led==True:
|
||
|
color = color_number_red
|
||
|
# case 2: green
|
||
|
elif status_blue_led==False and status_green_led==True and status_red_led==False:
|
||
|
color = color_number_green
|
||
|
# case 3: blue
|
||
|
elif status_blue_led==True and status_green_led==False and status_red_led==False:
|
||
|
color = color_number_blue
|
||
|
# case 4: yellow = red + green
|
||
|
elif status_blue_led==False and status_green_led==True and status_red_led==True:
|
||
|
color = color_number_yellow
|
||
|
# case 5: magenta = red + blue
|
||
|
elif status_blue_led==True and status_green_led==False and status_red_led==True:
|
||
|
color = color_number_magenta
|
||
|
# case 6: cyan = green + blue
|
||
|
elif status_blue_led==True and status_green_led==True and status_red_led==False:
|
||
|
color = color_number_cyan
|
||
|
# case 7: white = red + green + blue
|
||
|
elif status_blue_led==True and status_green_led==True and status_red_led==True:
|
||
|
color = color_number_white
|
||
|
# case 8: led not active
|
||
|
# this case can not occur, because no inactive led can be detected from the implemented blob-algorithm in detect_LED_positions_in_grayscale
|
||
|
else:
|
||
|
color = color_number_off
|
||
|
|
||
|
# fill matrix with color
|
||
|
led[2] = color # uint16
|
||
|
|
||
|
return matrix_of_LEDs
|
||
|
|
||
|
|
||
|
def detect_LEDs_with_grayscale(image_bgr, detector):
|
||
|
# convert rgb to grayscale image
|
||
|
# start_m1 = time.perf_counter()
|
||
|
image_gray = convert_rgb_to_grayscale_average(image_bgr)
|
||
|
# end_m1 = time.perf_counter()
|
||
|
# time_processing = end_m1-start_m1
|
||
|
# time_processing = time_processing*1000
|
||
|
# time_processing = round(time_processing, 2)
|
||
|
# print(f'processing time conversion: {time_processing} ms')
|
||
|
|
||
|
# get position of leds
|
||
|
position_of_LEDs = detect_position_of_all_LEDs_grayscale(image_gray=image_gray, image_bgr=image_bgr, detector=detector)
|
||
|
|
||
|
#position_of_LEDs = None
|
||
|
|
||
|
if position_of_LEDs is not None:
|
||
|
# determine color of leds and add to matrix
|
||
|
detected_LEDs = get_color_of_leds(position_of_LEDs, image_bgr)
|
||
|
return detected_LEDs
|
||
|
else:
|
||
|
return None
|
||
|
|
||
|
|
||
|
def lane_detection(image_bgr, detector):
|
||
|
# Detect LEDs
|
||
|
print(f"Detect LEDs and color:")
|
||
|
detected_LEDs = detect_LEDs_with_grayscale(image_bgr, detector)
|
||
|
|
||
|
if detected_LEDs is not None:
|
||
|
# Contruct lane
|
||
|
#print(f"_____________________________________")
|
||
|
# print("Contruct lane")
|
||
|
dx_LED_scooty_mm, dy_LED_scooty_mm, detected_LEDs_KS_LED = \
|
||
|
construct_lane(detected_LEDs, image_bgr)
|
||
|
|
||
|
# print result
|
||
|
if print_additional_info:
|
||
|
print(f"Detected LEDs relative to image-center(x0,y0):\n{detected_LEDs}")
|
||
|
return detected_LEDs
|
||
|
else:
|
||
|
return None
|
||
|
|
||
|
|
||
|
# -------------------
|
||
|
# Define Funcions
|
||
|
def get_frames_from_picamera():
|
||
|
# newframe= shm_bools[0] # do not use this! no updted values in "newframe"
|
||
|
# framenumber = shm_framenumber[0]
|
||
|
|
||
|
# Initialise Camera
|
||
|
with picamera.PiCamera() as camera:
|
||
|
with PiRGBArray(camera) as output:
|
||
|
# Set camera settings
|
||
|
camera.sensor_mode = SENSOR_MODE # force camera into desired sensor mode
|
||
|
camera.resolution = OUTPUT_RESOLUTION # frame will be resized from GPU to this resolution. No CPU usage!
|
||
|
camera.framerate = FRAMERATE
|
||
|
|
||
|
camera.awb_mode = AWB_MODE
|
||
|
camera.awb_gains = AWB_GAINS
|
||
|
|
||
|
camera.iso = ISO
|
||
|
camera.shutter_speed = SHUTTER_SPEED
|
||
|
|
||
|
time.sleep(SLEEP_TIME) # wait for iso gains and digital_gain and analog_gain to settle before fixing the gains with exposure_mode = off
|
||
|
camera.exposure_mode = EXPOSURE_MODE
|
||
|
|
||
|
time.sleep(1) # wait before applying brightness and contrast
|
||
|
camera.brightness = BRIGHTNESS
|
||
|
camera.contrast = CONTRAST
|
||
|
time.sleep(SLEEP_TIME) # Camera warm-up time to apply settings
|
||
|
|
||
|
t_start= time.perf_counter() # save time for fps calculation
|
||
|
|
||
|
for frameidx, frame in enumerate(camera.capture_continuous(output, format='bgr', use_video_port=True)):
|
||
|
# General information:
|
||
|
# - always the newest frame is recieved: processing must be faster than fps if every frame should be processed
|
||
|
|
||
|
shm_bools[0] = True
|
||
|
framenumber = frameidx+1 # frameidx starts with 0, framenumber with 1
|
||
|
|
||
|
|
||
|
shm_framenumber[0] = framenumber
|
||
|
#print('')
|
||
|
#print(f"new frame: {framenumber}")
|
||
|
|
||
|
#image = frame.array # raw NumPy array without JPEG encoding
|
||
|
|
||
|
#b,g,r = cv.split(frame.array) # split colour channels of raw NumPy array without JPEG encoding
|
||
|
shm_frame[:] = frame.array
|
||
|
#shm_redframe[:] = r
|
||
|
#shm_greenframe[:] = g
|
||
|
#shm_blueframe[:] = b
|
||
|
# for better performance one can assign directly in funtion line the values to the shm_memorys: shm_red, .. , ... = cv.split(..)
|
||
|
|
||
|
shm_bools[7:10]=[True] # trigger the start of the processing for each colorchannel
|
||
|
#print(shm_bools[7], shm_bools[8], shm_bools[9])
|
||
|
|
||
|
#display_image_with_text(image, shutter_speed, framenumber, camera_exposure_speed, trigger_record_OpenCV, out) # show the frame
|
||
|
|
||
|
output.truncate(0) # clear the stream for next frame
|
||
|
|
||
|
if framenumber == 500: # 5 sek @ 30 fps, only for performance measuring
|
||
|
t_stop=time.perf_counter()
|
||
|
print(f"calculated fps: {framenumber/(t_stop-t_start)}")
|
||
|
break
|
||
|
|
||
|
|
||
|
def display_image_with_text(img, shutter_speed, framenumber, camera_exposure_speed, trigger_record_OpenCV, out):
|
||
|
img = img.copy() # make copy of image and do not modify the original image
|
||
|
|
||
|
# please activate only one trigger once
|
||
|
trigger_show_brightness = 0 # trigger for (not) calculating andshowing the brightness of the image+
|
||
|
if trigger_show_brightness == 1:
|
||
|
arithmetic_mean_of_brightness_per_pixel_relative = calc_arithmetic_mean_of_brightness_per_pixel(img)
|
||
|
|
||
|
trigger_show_max_brightness_values_of_colour_channels = 0 # trigger for (not) calculating and showing max values of colour chanels
|
||
|
if trigger_show_max_brightness_values_of_colour_channels == 1:
|
||
|
r_max, g_max, b_max = get_max_rgb_values(img)
|
||
|
|
||
|
font = cv.FONT_HERSHEY_SIMPLEX # font
|
||
|
fontScale = 1 # fontScale
|
||
|
color = (255, 255, 255) # Font colour in BGR
|
||
|
thickness = 1 # Line thickness in px
|
||
|
|
||
|
# set text position
|
||
|
frame_width = int(img.shape[1])
|
||
|
frame_height = int(img.shape[0])
|
||
|
text_start_position_Y = int(round(frame_height*0.12)) # start position of text in pixels 12 % of frame height
|
||
|
text_linespacing = 50 # line spacing between two strings in pixels
|
||
|
# text_start_position_X = int(frame_width/4) # start text from 1/4 of image width
|
||
|
text_start_position_X = int(0) # start text from left edge of image
|
||
|
|
||
|
# set position in (x,y)-coordinated from top left corner. Bottom-left corner of the text string in the image.
|
||
|
pos_1 = (text_start_position_X, text_start_position_Y) # start text from 1/4 of image width
|
||
|
pos_2 = (text_start_position_X, text_start_position_Y+text_linespacing) # start text from 1/4 of image width
|
||
|
pos_3 = (text_start_position_X, text_start_position_Y+2*text_linespacing) # start text from 1/4 of image width
|
||
|
if trigger_show_brightness==1 or trigger_show_max_brightness_values_of_colour_channels==1:
|
||
|
pos_4 = (text_start_position_X, text_start_position_Y+3*text_linespacing) # start text from 1/4 of image width
|
||
|
|
||
|
|
||
|
# define text to display
|
||
|
text_line_1 = f"set ss: {shutter_speed} us"
|
||
|
text_line_3 = f"Frame: {framenumber}"
|
||
|
text_line_2 = f"ret exs: {camera_exposure_speed} us"
|
||
|
if trigger_show_brightness==1:
|
||
|
if arithmetic_mean_of_brightness_per_pixel_relative >= 0.01:
|
||
|
text_line_4 = f"brightness: {round(arithmetic_mean_of_brightness_per_pixel_relative*100,2)} %"
|
||
|
elif arithmetic_mean_of_brightness_per_pixel_relative < 0.01:
|
||
|
text_line_4 = f"brightness: {round(arithmetic_mean_of_brightness_per_pixel_relative*10e3,2)} pm"
|
||
|
if trigger_show_max_brightness_values_of_colour_channels==1:
|
||
|
text_line_4 = f"max: r:{r_max} g:{g_max} b:{b_max}"
|
||
|
|
||
|
|
||
|
# put the text into the image
|
||
|
#image_text_1 = cv.putText(img, text_line_1, pos_1, font,
|
||
|
# fontScale, color, thickness, cv.LINE_AA)
|
||
|
#image_text_2 = cv.putText(img, text_line_2, pos_2, font,
|
||
|
# fontScale, color, thickness, cv.LINE_AA)
|
||
|
#image_text_3 = cv.putText(img, text_line_3, pos_3, font,
|
||
|
# fontScale, color, thickness, cv.LINE_AA)
|
||
|
if trigger_show_brightness==1 or trigger_show_max_brightness_values_of_colour_channels==1:
|
||
|
image_text_4 = cv.putText(img, text_line_4, pos_4, font,
|
||
|
fontScale, color, thickness, cv.LINE_AA)
|
||
|
|
||
|
|
||
|
cv.imshow("Current Frame", img) # display the image
|
||
|
if trigger_record_OpenCV == 1:
|
||
|
out.write(img) # write frame to Video
|
||
|
|
||
|
|
||
|
def calc_arithmetic_mean_of_brightness_per_pixel(image):
|
||
|
"""Calculate overall brightness per pixel of the image. Mittelere Helligkeit pro pixel berechnen."""
|
||
|
#Comment: So rechenintensiv, dass man kein Blitzen sieht im Bild. (Oder sehr selten bzw. schwach). Daher anzeige von max-werten
|
||
|
|
||
|
b,g,r = cv.split(image) # OpenCV works with bgr. Image is also speciefied to be captured in bgr.
|
||
|
r=r.astype('uint16') # set dtype of one colour to uint16, because the sum of 255+255+255 >255 =765
|
||
|
#the rest will also be uint16 then
|
||
|
image_heigth = r.shape[0]
|
||
|
image_width = r.shape[1]
|
||
|
|
||
|
number_of_colour_channels = 3
|
||
|
arithmetic_mean_of_brightness_image = np.sum((r+g+b)/number_of_colour_channels)
|
||
|
arithmetic_mean_of_brightness_per_pixel = arithmetic_mean_of_brightness_image/(image_width*image_heigth)
|
||
|
|
||
|
max_possible_brightness = 255 # maximum possible brightness
|
||
|
arithmetic_mean_of_brightness_per_pixel_relative = arithmetic_mean_of_brightness_per_pixel/max_possible_brightness
|
||
|
|
||
|
return arithmetic_mean_of_brightness_per_pixel_relative
|
||
|
|
||
|
def get_max_rgb_values(image):
|
||
|
"""get max values of colour channels"""
|
||
|
b,g,r = cv.split(image) # OpenCV works with bgr. Image is also speciefied to be captured in bgr.
|
||
|
|
||
|
r_max=r.max()
|
||
|
g_max=g.max()
|
||
|
b_max=b.max()
|
||
|
|
||
|
return r_max, g_max, b_max
|
||
|
|
||
|
def create_folder_for_captures():
|
||
|
# Create folder for saving the captured pictures
|
||
|
now = datetime.now(); d1 = now.strftime("%Y-%m-%d %H-%M")
|
||
|
path_cwd = os.getcwd()
|
||
|
|
||
|
path_saveFolder = path_cwd+r"/Capture_"+d1
|
||
|
try:
|
||
|
os.mkdir(path_saveFolder)
|
||
|
folder_exists = True
|
||
|
except OSError:
|
||
|
print("Error! Ending script.")
|
||
|
quit()
|
||
|
|
||
|
return path_saveFolder, folder_exists
|
||
|
|
||
|
def do_processing():
|
||
|
time.sleep(0.001)
|
||
|
#print("ohh i am doing high complex image analysis with computer vision")
|
||
|
|
||
|
def do_processing_frame_r(frame):
|
||
|
print(f"max frame color red: {frame.max()}")
|
||
|
def do_processing_frame_g(frame):
|
||
|
print(f"max frame color green: {frame.max()}")
|
||
|
def do_processing_frame_b(frame):
|
||
|
print(f"max frame color blue: {frame.max()}")
|
||
|
|
||
|
|
||
|
|
||
|
def processing_red():
|
||
|
shm_bool_init = shared_memory.SharedMemory(name="shm_bools") # Attach to existing shared memory block
|
||
|
shm_bools = np.ndarray((10,), dtype=np.bool8, buffer= shm_bool_init.buf) # do not: newframe = np.array(...)[0] --> then one can not assign new value in main script
|
||
|
newframe = shm_bools[0]
|
||
|
p_red_finished = shm_bools[1] # not used, but for clarity
|
||
|
p_red_started = shm_bools[4]
|
||
|
|
||
|
shm_framenumber_init = shared_memory.SharedMemory\
|
||
|
(name="shm_framenumber") # Attach to existing shared memory block
|
||
|
shm_framenumber = np.ndarray((1,), dtype=np.uint64, \
|
||
|
buffer= shm_framenumber_init.buf)
|
||
|
# framenumer = shm_framenumber[0]
|
||
|
|
||
|
shm_redframe_init = shared_memory.SharedMemory\
|
||
|
(name="shm_redframe") # Attach to existing shared memory block
|
||
|
shm_redframe = np.ndarray((image_heigth,image_width), dtype=np.uint8, \
|
||
|
buffer= shm_redframe_init.buf)
|
||
|
|
||
|
|
||
|
i=0
|
||
|
while True:
|
||
|
try:
|
||
|
framenumber = shm_framenumber[0]
|
||
|
if i==0:
|
||
|
last_processed_frame = framenumber
|
||
|
|
||
|
conditions_for_first_start = (i==0) and\
|
||
|
(shm_bools[0] == True) and \
|
||
|
(shm_bools[1] == False) and (shm_bools[2] == False) and (shm_bools[3] == False) \
|
||
|
and (shm_bools[7] == True)
|
||
|
|
||
|
conditions_for_starting_processing = (framenumber>last_processed_frame) and (shm_bools[7] == True)
|
||
|
# newframe and all color-channel-processings have to be finished
|
||
|
|
||
|
if conditions_for_first_start == True:
|
||
|
shm_bools[4] = True # process started
|
||
|
shm_bools[7] = False # reset trigger
|
||
|
|
||
|
shm_bools[1] = False # set bool for p_red_finished to false
|
||
|
#t1 = time.perf_counter_ns()
|
||
|
do_processing()
|
||
|
i += 1
|
||
|
#print(f"first processing red finished. frame: {framenumber}")
|
||
|
shm_bools[1] = True # set bool for p_red_finished to true
|
||
|
|
||
|
shm_bools[4] = False # process ended
|
||
|
elif conditions_for_starting_processing == True:
|
||
|
shm_bools[4] = True # process started
|
||
|
shm_bools[7] = False # reset trigger
|
||
|
#print(f"red: framenumber: {framenumber}, last_processed_frame: {last_processed_frame}")
|
||
|
|
||
|
shm_bools[1] = False # set bool for p_red_finished to false
|
||
|
#t1 = time.perf_counter_ns()
|
||
|
do_processing()
|
||
|
#print(f"max frame color red: {shm_redframe.max()}")
|
||
|
if show_opencv_window:
|
||
|
cv.imshow("red", shm_redframe)
|
||
|
cv.waitKey(1)
|
||
|
#print(f"processing red finished. frame: {framenumber}")
|
||
|
shm_bools[1] = True # set bool for p_red_finished to true
|
||
|
#t2 = time.perf_counter_ns()
|
||
|
#print(f"processing time for red channel: {round((t2-t1)*1e-6,2)} ms")
|
||
|
|
||
|
last_processed_frame = framenumber
|
||
|
shm_bools[4] = False # process ended
|
||
|
# elif shm_bools[0] == False:
|
||
|
# pass
|
||
|
#print(f"no new red frame")
|
||
|
|
||
|
# image processing finished
|
||
|
|
||
|
|
||
|
|
||
|
except KeyboardInterrupt:
|
||
|
try:
|
||
|
shm_bool_init.close()
|
||
|
shm_framenumber_init.close()
|
||
|
shm_redframe_init.close()
|
||
|
|
||
|
except FileNotFoundError:
|
||
|
# Memory already destroyed
|
||
|
pass
|
||
|
|
||
|
def processing_bgr():
|
||
|
shm_bool_init = shared_memory.SharedMemory(name="shm_bools") # Attach to existing shared memory block
|
||
|
shm_bools = np.ndarray((10,), dtype=np.bool8, buffer= shm_bool_init.buf) # do not: newframe = np.array(...)[0] --> then one can not assign new value in main script
|
||
|
newframe = shm_bools[0]
|
||
|
p_red_finished = shm_bools[1] # not used, but for clarity
|
||
|
p_red_started = shm_bools[4]
|
||
|
|
||
|
shm_framenumber_init = shared_memory.SharedMemory\
|
||
|
(name="shm_framenumber") # Attach to existing shared memory block
|
||
|
shm_framenumber = np.ndarray((1,), dtype=np.uint64, \
|
||
|
buffer= shm_framenumber_init.buf)
|
||
|
# framenumer = shm_framenumber[0]
|
||
|
|
||
|
shm_frame_init = shared_memory.SharedMemory\
|
||
|
(name="shm_frame") # Attach to existing shared memory block
|
||
|
shm_redframe = np.ndarray((image_heigth,image_width, number_of_colorchannels), dtype=np.uint8, \
|
||
|
buffer= shm_frame_init.buf)
|
||
|
image_bgr = shm_redframe
|
||
|
|
||
|
|
||
|
params_for_blob_detection = define_parameters_for_blob_detection()
|
||
|
detector = create_detector(params_for_blob_detection)
|
||
|
|
||
|
i=0
|
||
|
while True:
|
||
|
try:
|
||
|
framenumber = shm_framenumber[0]
|
||
|
if i==0:
|
||
|
last_processed_frame = framenumber
|
||
|
|
||
|
conditions_for_first_start = (i==0) and\
|
||
|
(shm_bools[0] == True) and \
|
||
|
(shm_bools[1] == False) and (shm_bools[2] == False) and (shm_bools[3] == False) \
|
||
|
and (shm_bools[7] == True)
|
||
|
|
||
|
conditions_for_starting_processing = (framenumber>last_processed_frame) and (shm_bools[7] == True)
|
||
|
# newframe and all color-channel-processings have to be finished
|
||
|
|
||
|
if conditions_for_first_start == True:
|
||
|
print("first start")
|
||
|
shm_bools[4] = True # process started
|
||
|
shm_bools[7] = False # reset trigger
|
||
|
|
||
|
shm_bools[1] = False # set bool for p_red_finished to false
|
||
|
#t1 = time.perf_counter_ns()
|
||
|
lane_detection(image_bgr, detector) # hier immer noch altes bild -->> Processing ist langsamer als neue bilder kommen..
|
||
|
i += 1
|
||
|
#print(f"first processing red finished. frame: {framenumber}")
|
||
|
shm_bools[1] = True # set bool for p_red_finished to true
|
||
|
|
||
|
shm_bools[4] = False # process ended
|
||
|
elif conditions_for_starting_processing == True:
|
||
|
shm_bools[4] = True # process started
|
||
|
shm_bools[7] = False # reset trigger
|
||
|
#print(f"red: framenumber: {framenumber}, last_processed_frame: {last_processed_frame}")
|
||
|
|
||
|
shm_bools[1] = False # set bool for p_red_finished to false
|
||
|
#t1 = time.perf_counter_ns()
|
||
|
cv.imshow("AAA", shm_redframe)
|
||
|
lane_detection(image_bgr, detector)
|
||
|
#print(f"max frame color red: {shm_redframe.max()}")
|
||
|
if show_opencv_window:
|
||
|
cv.waitKey(1)
|
||
|
#print(f"processing red finished. frame: {framenumber}")
|
||
|
shm_bools[1] = True # set bool for p_red_finished to true
|
||
|
#t2 = time.perf_counter_ns()
|
||
|
#print(f"processing time for red channel: {round((t2-t1)*1e-6,2)} ms")
|
||
|
|
||
|
last_processed_frame = framenumber
|
||
|
shm_bools[4] = False # process ended
|
||
|
# elif shm_bools[0] == False:
|
||
|
# pass
|
||
|
#print(f"no new red frame")
|
||
|
|
||
|
# image processing finished
|
||
|
|
||
|
|
||
|
|
||
|
except KeyboardInterrupt:
|
||
|
try:
|
||
|
shm_bool_init.close()
|
||
|
shm_framenumber_init.close()
|
||
|
shm_redframe_init.close()
|
||
|
|
||
|
except FileNotFoundError:
|
||
|
# Memory already destroyed
|
||
|
pass
|
||
|
|
||
|
# ----------------------------------------------------------------------------
|
||
|
# main
|
||
|
def main():
|
||
|
start = time.perf_counter()
|
||
|
|
||
|
try:
|
||
|
# create processes
|
||
|
p_red = Process(target=processing_bgr)
|
||
|
processes = [p_red]
|
||
|
|
||
|
print(f"waiting 1 second to create processes")
|
||
|
time.sleep(1) # sind prozesse schon vorhanden
|
||
|
|
||
|
# start acitivity of processes
|
||
|
for process in processes:
|
||
|
process.start()
|
||
|
|
||
|
# start capturing
|
||
|
get_frames_from_picamera()
|
||
|
|
||
|
|
||
|
print('*******************************')
|
||
|
# this below code is only executed if the loop in take_image_picamera_opencv is breaked
|
||
|
# In real use case there will be no end of this program
|
||
|
|
||
|
for process in processes:
|
||
|
process.terminate()
|
||
|
|
||
|
# print time measuring
|
||
|
end = time.perf_counter()
|
||
|
print(f'Script finished in {round(end-start, 2)} s')
|
||
|
|
||
|
# close each SharedMemory instance and unlink to release the shared memory
|
||
|
shm_bools_create.close()
|
||
|
shm_bools_create.unlink()
|
||
|
shm_framenumber_create.close()
|
||
|
shm_framenumber_create.unlink()
|
||
|
shm_redframe_create.close()
|
||
|
shm_redframe_create.unlink()
|
||
|
shm_greenframe_create.close()
|
||
|
shm_greenframe_create.unlink()
|
||
|
shm_blueframe_create.close()
|
||
|
shm_blueframe_create.unlink()
|
||
|
|
||
|
except KeyboardInterrupt:
|
||
|
# Normally this prgoram never gets keyboard interrupted! But here this case is nevertheless handled
|
||
|
# End Script
|
||
|
try:
|
||
|
# close each SharedMemory instance and unlink to release the shared memory
|
||
|
shm_bools.close()
|
||
|
shm_bools.unlink()
|
||
|
shm_framenumber_create.close()
|
||
|
shm_framenumber_create.unlink()
|
||
|
shm_redframe_create.close()
|
||
|
shm_redframe_create.unlink()
|
||
|
shm_greenframe_create.close()
|
||
|
shm_greenframe_create.unlink()
|
||
|
shm_blueframe_create.close()
|
||
|
shm_blueframe_create.unlink()
|
||
|
|
||
|
except FileNotFoundError:
|
||
|
# Memory already destroyed
|
||
|
pass
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|