SA-Kenan/90_ZielSW/Multiprocessing/Lanedetection_Picamera_V01.py

913 lines
40 KiB
Python
Raw Permalink Normal View History

2022-05-09 20:30:26 +02:00
import cv2 as cv
import numpy as np
import picamera
from picamera.array import PiRGBArray
from fractions import Fraction
import time
from datetime import datetime
import os
import sys
import numpy as np
import math as M
from multiprocessing import Process, shared_memory
# Parameters
pixels_per_mm = 71/24.25 #[px/mm] for 120 mm camera height for resolution: 416x320
# pixels_per_mm = 107/24.25 #[px/mm] for 120 mm camera height for resolution: 640x480
# Offset Camera Sensor in Scooty according to Scooty-KS
x_offset_camera_mm = 100 # [mm]
y_offset_camera_mm = 50 # [mm]
x_offset_camera_px = x_offset_camera_mm*pixels_per_mm # [px]
y_offset_camera_px = y_offset_camera_mm*pixels_per_mm # [px]
# image parameters
image_heigth = 320 # shape [0]
image_width = 416 # shape[1]
# calculate center of image
[x_0, y_0] = np.array([image_width/2, image_heigth/2], dtype=np.uint16)
threshold_color_detection = 60 # values under this will not be considered as active leds in each color channel
# Parameters for Blob/LED Detection
minDiameter_mm = 5 # [mm] minimum diameter of detected blob/LED
maxDiameter_mm = 9 # [mm] maximum diameter of detected blob/LED
# Define color numbers to identify the color channels in the matrix with all detected LEDs. No string, because numpy array should stay uint16
color_number_off = 0
color_number_red = 1
color_number_green = 2
color_number_blue = 3
color_number_yellow = 4
color_number_magenta = 5
color_number_cyan = 6
color_number_white = 7
#----------------------------------------------------------------------
# Define camera settings
SENSOR_MODE = 4 # corresponding sensor mode to resolution
OUTPUT_RESOLUTION = (320, 416) # (width, heigth)
image_width = OUTPUT_RESOLUTION[0]
image_heigth = OUTPUT_RESOLUTION[1]
number_of_colorchannels = 3 # r, g, b
size_of_frame=int(image_heigth*image_heigth*number_of_colorchannels)
frame_dimension = int(number_of_colorchannels)
AWB_MODE = 'off' # Auto white balance mode
AWB_GAINS = (1.395, 1.15) # White Balance Gains to have colours read correctly: (red, blue). Int, floar or fraction are valid.
BRIGHTNESS = 25 # sets the brightness setting of the camera. default is 50. [0-100]
#the brighter, the brighter the LEDs and the higher the RGB values and vice versa!
CONTRAST = 100 # sets the contrast setting of the camera. The default value is 0. [-100 ... 100]
SHUTTER_SPEED = 50 # [µs]
ISO = 320 # ISO value
EXPOSURE_MODE = 'off'
FRAMERATE = 25 # frames per second. 40 fps is max for sensor mode 4
SLEEP_TIME = 2 # Time for sleep-mode for the camera in seconds. My default: 2 s
# miscellaneous parameters
max_value_of_uint64 = int((2**64) - 1) # @30 fps: konservative calculated driving time: 1.95*1e10 years --> Integer overflow not relevant.
# settings for development
show_opencv_window = False
# create shared Memorys for main-process
# shared memory for bools
shm_bools_pre=np.array([False, False, False, False, False, False, False, False, False, False], dtype=np.bool8) # create numpy array with bools stored in it
# [0]: newframe [1]: p_red_finished [2]: p_green_finished [3]: p_blue_finished
# [4]: p_red_started [5]: p_green_started [6]: p_blue_started
# [7]: p_red_start_trigger [8]: p_green_start_trigger [9]: p_blue_start_trigger
size_of_buffer = shm_bools_pre.nbytes
print(f"size of buffer: {size_of_buffer}") # size of buffer: 10
print(f"shm_bools dtype: {shm_bools_pre.dtype}") # dtype: bool
shm_bools_create = shared_memory.SharedMemory(name="shm_bools", create=True, size=shm_bools_pre.nbytes) # create a new shared memory block
shm_bools = np.ndarray(shm_bools_pre.shape, dtype=shm_bools_pre.dtype, buffer=shm_bools_create.buf) # create a NumPy array backed by shared memory
shm_bools[:] = shm_bools_pre[:] # Copy the original data into shared memory
# print(shm_bool)
# print(shm_bools.name)
# shared memory for framenumber
shm_framenumber_pre=np.array([0], dtype=np.uint64)
size_of_buffer = shm_framenumber_pre.nbytes
print(f"size of framenumber-buffer: {size_of_buffer}") #8
print(f"shm_framenumber dtype: {shm_framenumber_pre.dtype}") #uint64
shm_framenumber_create = shared_memory.SharedMemory(name="shm_framenumber", create=True, size=shm_framenumber_pre.nbytes) # create a new shared memory block
shm_framenumber = np.ndarray(shm_framenumber_pre.shape, dtype=shm_framenumber_pre.dtype, buffer=shm_framenumber_create.buf) # create a NumPy array backed by shared memory
shm_framenumber[:] = shm_framenumber_pre[:] # Copy the original data into shared memory
# print(shm_framenumber) # [0]
# print(shm_framenumber_create.name) # shm_framenumber
# shared memory for red, green, blue frame
int_black = 0 # integer for black color/ no color
shm_colorframes_pre = np.full(\
(image_heigth,image_width), \
int_black, dtype=np.uint8)
size_of_buffer = shm_colorframes_pre.nbytes
print(f"size of colorframe-buffer: {size_of_buffer}") #133 120
print(f"shm_colorframes_pre dtype: {shm_colorframes_pre.dtype}") #uint8
shm_redframe_create = shared_memory.SharedMemory(name="shm_redframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block
shm_greenframe_create = shared_memory.SharedMemory(name="shm_greenframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block
shm_blueframe_create = shared_memory.SharedMemory(name="shm_blueframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block
shm_redframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_redframe_create.buf) # create a NumPy array backed by shared memory
shm_greenframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_greenframe_create.buf) # create a NumPy array backed by shared memory
shm_blueframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_blueframe_create.buf) # create a NumPy array backed by shared memory
shm_redframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory
shm_greenframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory
shm_blueframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory
# shared memory bgr frame
int_black = 0 # integer for black color/ no color
shm_frame_pre = np.full(\
(image_heigth,image_width, number_of_colorchannels), \
int_black, dtype=np.uint8)
size_of_buffer = shm_frame_pre.nbytes
print(f"size of colorframe-buffer: {size_of_buffer}") #399 360 = 416 x 320 x 3
print(f"shm_colorframes_pre dtype: {shm_frame_pre.dtype}") #uint8
shm_frame_create = shared_memory.SharedMemory(name="shm_frame", create=True, size=shm_frame_pre.nbytes) # create a new shared memory block
shm_frame = np.ndarray(shm_frame_pre.shape, dtype=shm_frame_pre.dtype, buffer=shm_frame_create.buf) # create a NumPy array backed by shared memory
shm_frame[:] = shm_frame_pre[:] # Copy the original data into shared memory
# ----------------------------------------------------------------------------
# Define Functions
def points_trafo(detected_LEDs, alpha_rad, dx, dy):
"""Tranfsform points of LED to lane in KS-LED"""
detected_LEDs_trafo = detected_LEDs.copy() # copy, becuase else it is only a pointer
detected_LEDs_trafo = detected_LEDs_trafo.astype(np.int16) # avoid integer overflow
x_pnts = detected_LEDs_trafo[:,0]
y_pnts = detected_LEDs_trafo[:,1]
# Translation
x1 = x_pnts-dx-x_0
x_trafo=x1
y1 = y_pnts-dy-y_0
y_trafo = y1
# Rotation. Winkel Sensor im UZS, also negativ zu mathematischer definiton
x_trafo = np.cos(-alpha_rad)*x1-np.sin(-alpha_rad)*y1
detected_LEDs_trafo[:,0] = x_trafo
y_trafo = np.sin(-alpha_rad)*x1+np.cos(-alpha_rad)*y1
detected_LEDs_trafo[:,1] = y_trafo
#sort points along lane: x_2, y_2 -axis (KS_LED)
detected_LEDs_trafo = detected_LEDs_trafo[detected_LEDs_trafo[:, 0].argsort(kind='quicksort')]
return detected_LEDs_trafo
def construct_lane(detected_LEDs, img_bgr):
"""construct the lane"""
# This function is partially commented in german, because higher math is used
# clearer what is trying to be achieved
# get points
# xy_pnts = detected_LEDs[:,0:2]
# x_pnts = detected_LEDs[:,0]
# y_pnts = detected_LEDs[:,1]
# approach 2:
# fit line through centers of LEDs in KS_0
# DIST_L": the simplest and the fastest least-squares method: the simple euclidean distance
param = 0 # not used for DIST_L2
reps = 0.001 # Sufficient accuracy for the radius (distance between the coordinate origin and the line).
aeps = 0.001 # Sufficient accuracy for the angle.
[dx, dy, x_2, y_2] = cv.fitLine(detected_LEDs[:,0:2], cv.DIST_L2, param, reps, aeps)
# x2, y2: same as: mean_of_leds = np.mean([x_pnts, y_pnts], 1)
alpha_rad = np.arctan2(dy, dx) # calculate angle of line
alpha = np.arctan2(dy, dx)*180/np.pi # calculate angle of line
# print(f"Lane: dx: {dx}, dy:{dy}, x2:{x_2}, y2:{y_2}, alpha:{alpha}°")
print(f"Lane: alpha:{alpha[0]}°")
# get smallest distance to point an line
# Berechnung nach: Repetitorium Höhere Mathematik, Wirth
# Gerade: x = a+ t*b
# Punkt : OP = p
# d = abs(b x (p-a))/(abs(b))
# info: np.array()[:,0] --> gets only array with 1 dimensions with desired values
p = np.array([x_0, y_0])
a = np.array([x_2, y_2])[:,0]
b = np.array([np.cos(alpha_rad), np.sin(alpha_rad)])[:,0] # Richtungsvektor
c = p-a
# Betrag von Vektor: np.linalg.norm(vec)
cross= np.cross(b, c)
d = np.linalg.norm(cross)/np.linalg.norm(b) # distance [px]
#print(f"d: {round(d,2)}")
# Fußpunkt (X_LED, Y_LED)
t_0_dot = np.dot(c, b)
t_0_norm = (np.linalg.norm(b)**2)
t_0 = t_0_dot/t_0_norm
[x_LED, y_LED] = (a+t_0*b)
#print(f"x_LED: {x_LED}, y_LED: {y_LED}")
# Abstand (dx, dy) Fußpunkt zu KS_0
dx_LED = x_LED - x_0
dx_LED_mm = dx_LED*(1/pixels_per_mm)
dy_LED = y_LED - y_0
dy_LED_mm = dy_LED*(1/pixels_per_mm)
#print(f"dx_LED:{dx_LED} [px] , dy_LED:{dy_LED} [px]")
#print(f"dx_LED:{dx_LED_mm} [mm] , dy_LED:{dy_LED_mm} [mm]")
# Abstand (dx, dy) Fußpunkt von Bildmitte zu KS_Scooty
# Diese Werte zurückgeben
dx_LED_scooty = x_LED - x_0 + x_offset_camera_px
dx_LED_scooty_mm = dx_LED_scooty*(1/pixels_per_mm)
dy_LED_scooty = y_LED - y_0 + y_offset_camera_px
dy_LED_scooty_mm = dy_LED_scooty*(1/pixels_per_mm)
#print(f"dx_LED_scooty:{dx_LED_scooty} [px] , dy_LED_scooty:{dy_LED_scooty} [px]")
#print(f"dx_LED_scooty:{dx_LED_scooty_mm} [mm] , dy_LED_scooty:{dy_LED_scooty_mm} [mm]")
# Punkte Trafo, um sortierte position der LEDs entlang Spur zu erhalten
# Bei normal detected kann bei vertikaler LED zb Fehler entstehen und dann muster: 211323233 -> daher mit dieser sortierten weitermachen
detected_LEDs_KS_LED = points_trafo(detected_LEDs, alpha_rad, dx_LED, dy_LED)
#print(f"Detected LEDs in KS_LED:(x2, y2):\n {detected_LEDs_KS_LED}")
#-----------------------------------
# draw useful lines and points
# draw lane line
pt_0 = (a+b*np.array([-300, -300])).astype(np.int32)
pt_1 = (a+b*np.array([300, 300])).astype(np.int32)
#print(f"pt_0: {pt_0}, pt_1: {pt_1}")
cv.line(img_bgr, pt_0, pt_1, (255,255,255),1) # draw lane
# draw dx dy
cv.line(img_bgr, (int(x_0), int(y_0)), (int(x_LED), int(y_LED)), (0,0,255), 2) # shortest distance from KS_0 to KS_LED --> Lot
# cv.line(img_bgr, (int(x_0), int(y_0)), (int(x_LED), int(y_0)), (0,0,255), 2) # only dx
# cv.line(img_bgr, (int(x_LED), int(y_0)), (int(x_LED), int(y_LED)), (0,0,255), 2) # only dy
#draw additional points
cv.circle(img_bgr, (int(x_2), int(y_2)), 5,(255,128,255),-1) #pink. Center of points
#cv.putText(img_bgr, '(x2, y2)',(int(x_2)+5, int(y_2)-5), cv.FONT_HERSHEY_SIMPLEX, 2, (255,255,255), cv.LINE_AA)
cv.circle(img_bgr, (int(x_LED), int(y_LED)), 5,(170,255,0),-1) # lime green. Fußpunkt
#cv.imshow("Lane", img_bgr)
return dx_LED_scooty_mm, dy_LED_scooty_mm, detected_LEDs_KS_LED
def convert_rgb_to_grayscale_average(image_b, image_g, image_r):
"""This function converts the RGB image into an grayscale image.
Algorithm: Average: Y = (R+G+B)/3"""
# convert dtype to prevent integer overflow while addition
image_r = image_r.astype(np.uint16)
image_g = image_g.astype(np.uint16)
image_b = image_b.astype(np.uint16)
image_gray = (image_b+image_g+image_r)/3 # add values / do conversion
image_gray = image_gray.astype(np.uint8) # convert back to uint8
return image_gray
def define_parameters_for_blob_detection():
"""set parameters for simple blob detector"""
params = cv.SimpleBlobDetector_Params()
# Threshold for Convert the source image to binary images by applying thresholding
# with several thresholds from minThreshold (inclusive) to maxThreshold (exclusive)
# with distance thresholdStep between neighboring thresholds.
# Since the Grayscale image is dark if only one color channel is active,
# the Threshold values have to be set like this.
# particularly the thresholdStep-Value has to be low
params.minThreshold=20 # reminder: this value is set for grayscale image
params.maxThreshold=255
params.thresholdStep=1
params.filterByColor=False # do not filter blobs by color
# Filter blobs by Area
params.filterByArea=False
minDiameter_px = minDiameter_mm*pixels_per_mm # [px] minimum diameter of detected blob/LED
maxDiameter_px = maxDiameter_mm*pixels_per_mm # [px] maximum diameter of detected blob/LED
minArea_px2 = np.pi/4*minDiameter_px**2
maxArea_px2 = np.pi/4*maxDiameter_px**2
params.minArea = minArea_px2 # min Area of a blob in px^2
# params.maxArea = maxArea_px2 # max Area of a blob in px^2.
# reasons for not filtering maxArea: motion blur + rolling shutter --> larger Area
# Filter by Inertia
params.filterByInertia=False
params.minInertiaRatio = 0.2 # [0-1]
# Filter by Convexity
params.filterByConvexity=False
params.minConvexity = 0.2 # [0-1]
# Filter by Circularity
params.filterByCircularity=False
params.minCircularity = 0.4 # [0-1]
# params.minDistBetweenBlobs = minDist_px # this has no effect
return params
def detect_LED_positions_in_grayscale(image_gray, image_bgr, params_for_blob_detection):
detector = cv.SimpleBlobDetector_create(params_for_blob_detection) # Set up the detector with specified parameters.
keypoints = detector.detect(image_gray) # Detect blobs --> LEDs
number_of_detected_leds = len(keypoints)
if number_of_detected_leds != 0:
# print information of keypoints
print(f"detected LEDs: {number_of_detected_leds}")
#Pre-allocate matrix for numpy
number_of_rows = number_of_detected_leds
number_of_columns = 3
position_of_leds = np.zeros((number_of_rows, number_of_columns), dtype=np.uint16)
for i, k in enumerate(keypoints):
x_pos = round(k.pt[0],0) # x position
y_pos = round(k.pt[1],0) # y position
# print(f"x: {x_pos} y: {y_pos}")
diameter_px = round(k.size,2)
diameter_mm = round(diameter_px*1/pixels_per_mm,2)
# print(f"diameter [px]: {diameter_px} diameter [mm]: {diameter_mm}") # diameter
area_px2 = round(M.pi/4*k.size**2,0) # area in px^2
area_mm2 = round(area_px2*(1/pixels_per_mm)**2,0)
# print(f"area [px^2]: {area_px2} area [mm^2]: {area_mm2}")
# print('')
# calculate parameters to transfer to matrix
x_pos = int(np.ceil(x_pos))
y_pos = int(np.ceil(y_pos))
# Fill matrix
# position_of_leds[i,:] = [x_pos,y_pos, 0]
position_of_leds[i,0] = x_pos
position_of_leds[i,1] = y_pos
# draw the keypoints on the original image
# cv.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS ensures the size of the circle corresponds to the size of blob
blobs = cv.drawKeypoints(image=image_bgr, keypoints=keypoints, color=(255, 255, 255), \
outImage=np.array([]), flags= cv.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)
# cv.imshow("grayscale", image_gray)
#cv.imshow("Detected", blobs)
return position_of_leds
else:
print(f"No LEDs were detected")
return None
def detect_position_of_all_LEDs_grayscale(image_gray, image_bgr, params_for_blob_detection):
position_of_LEDs = detect_LED_positions_in_grayscale(image_gray, image_bgr, params_for_blob_detection)
if position_of_LEDs is not None:
return position_of_LEDs
else:
return None
def get_color_of_leds(matrix_of_LEDs, image_bgr):
# is image_r[y_pos, x_pos] = image_bgr[y_pos,x_pos, 2] ? --> yes. No need to split the color channels.
offset = 2 # half of length from rectangle which is going to be used to determine the color around the middle point of the blob/led
# offset = 0 --> only the value from the middle point of the blob/led
# offset=1 --> 9 values, offset=2-->25 values
for led in matrix_of_LEDs:
x_pos = led[0] # uint16
y_pos = led[1] # uint16
# get values of color channels in region around middle point of blob/led:
# +1 at stop index, because it is not inclusive
region_around_blue_led = image_bgr[y_pos-offset:y_pos+offset+1, x_pos-offset:x_pos+offset+1, 0] # uint8
region_around_green_led = image_bgr[y_pos-offset:y_pos+offset+1, x_pos-offset:x_pos+offset+1, 1] # uint8
region_around_red_led = image_bgr[y_pos-offset:y_pos+offset+1, x_pos-offset:x_pos+offset+1, 2] # uint8
# average of the values
# convert dtype to prevent integer overflow while addition
region_around_red_led = region_around_red_led.astype(np.uint16)
region_around_green_led = region_around_green_led.astype(np.uint16)
region_around_blue_led = region_around_blue_led .astype(np.uint16)
# sum all elements in matrix and divide with number of elements
number_of_elements= region_around_blue_led.size
value_of_red_led = region_around_red_led.sum()/number_of_elements # float64, if not integer result
value_of_green_led = region_around_green_led.sum()/number_of_elements # float64, if not integer result
value_of_blue_led = region_around_blue_led.sum()/number_of_elements # float64, if not integer result
# determine which leds are active:
# if value > threshold --> led is active
status_blue_led = False; status_green_led = False; status_red_led = False
if value_of_blue_led > threshold_color_detection:
status_blue_led = True
if value_of_green_led > threshold_color_detection:
status_green_led = True
if value_of_red_led > threshold_color_detection:
status_red_led = True
# determine color by checking the cases:
# case 1: red
if status_blue_led==False and status_green_led==False and status_red_led==True:
color = color_number_red
# case 2: green
elif status_blue_led==False and status_green_led==True and status_red_led==False:
color = color_number_green
# case 3: blue
elif status_blue_led==True and status_green_led==False and status_red_led==False:
color = color_number_blue
# case 4: yellow = red + green
elif status_blue_led==False and status_green_led==True and status_red_led==True:
color = color_number_yellow
# case 5: magenta = red + blue
elif status_blue_led==True and status_green_led==False and status_red_led==True:
color = color_number_magenta
# case 6: cyan = green + blue
elif status_blue_led==True and status_green_led==True and status_red_led==False:
color = color_number_cyan
# case 7: white = red + green + blue
elif status_blue_led==True and status_green_led==True and status_red_led==True:
color = color_number_white
# case 8: led not active
# this case can not occur, because no inactive led can be detected from the implemented blob-algorithm in detect_LED_positions_in_grayscale
else:
color = color_number_off
# fill matrix with color
led[2] = color # uint16
return matrix_of_LEDs
def detect_LEDs_with_grayscale(image_b, image_g, image_r, image_bgr, params_for_blob_detection):
# convert rgb to grayscale image
start_m1 = time.perf_counter_ns()
image_gray = convert_rgb_to_grayscale_average(image_b, image_g, image_r)
end_m1 = time.perf_counter_ns()
time_processing = end_m1-start_m1
time_processing = time_processing*1e-6
time_processing = round(time_processing, 2)
print(f'processing time convert image: {time_processing} ms')
# get position of leds
position_of_LEDs = detect_position_of_all_LEDs_grayscale(image_gray=image_gray, image_bgr=image_bgr, params_for_blob_detection=params_for_blob_detection)
if position_of_LEDs is not None:
# determine color of leds and add to matrix
detected_LEDs = get_color_of_leds(position_of_LEDs, image_bgr)
return detected_LEDs
else:
return None
def lane_detection(image_bgr, params_for_blob_detection):
start_m1 = time.perf_counter_ns()
image_b,image_g,image_r = cv.split(image_bgr) # Split colour channels and get grayscale images
end_m1 = time.perf_counter_ns()
time_processing = end_m1-start_m1
time_processing = time_processing*1e-6
time_processing = round(time_processing, 2)
print(f'processing time split image: {time_processing} ms')
# Detect LEDs
print(f"Detect LEDs and color:")
detected_LEDs = detect_LEDs_with_grayscale(image_b, image_g, image_r, image_bgr, params_for_blob_detection)
if detected_LEDs is not None:
# Contruct lane
#print(f"_____________________________________")
#print("Contruct lane")
dx_LED_scooty_mm, dy_LED_scooty_mm, detected_LEDs_KS_LED = \
construct_lane(detected_LEDs, image_bgr)
# print result
#print(f"Detected LEDs relative to image-center(x0,y0):\n{detected_LEDs}")
return detected_LEDs
else:
return None
#-------------------
# Define Funcions
def get_frames_from_picamera():
# newframe= shm_bools[0] # do not use this! no updted values in "newframe"
# framenumber = shm_framenumber[0]
# Initialise Camera
with picamera.PiCamera() as camera:
with PiRGBArray(camera) as output:
# Set camera settings
camera.sensor_mode = SENSOR_MODE # force camera into desired sensor mode
camera.resolution = OUTPUT_RESOLUTION # frame will be resized from GPU to this resolution. No CPU usage!
camera.framerate = FRAMERATE
camera.awb_mode = AWB_MODE
camera.awb_gains = AWB_GAINS
camera.iso = ISO
camera.shutter_speed = SHUTTER_SPEED
time.sleep(SLEEP_TIME) # wait for iso gains and digital_gain and analog_gain to settle before fixing the gains with exposure_mode = off
camera.exposure_mode = EXPOSURE_MODE
time.sleep(1) # wait before applying brightness and contrast
camera.brightness = BRIGHTNESS
camera.contrast = CONTRAST
time.sleep(SLEEP_TIME) # Camera warm-up time to apply settings
t_start= time.perf_counter() # save time for fps calculation
for frameidx, frame in enumerate(camera.capture_continuous(output, format='bgr', use_video_port=True)):
# General information:
# - always the newest frame is recieved: processing must be faster than fps if every frame should be processed
shm_bools[0] = True
framenumber = frameidx+1 # frameidx starts with 0, framenumber with 1
shm_framenumber[0] = framenumber
#print('')
#print(f"new frame: {framenumber}")
#image = frame.array # raw NumPy array without JPEG encoding
#b,g,r = cv.split(frame.array) # split colour channels of raw NumPy array without JPEG encoding
shm_frame[:] = frame.array
#shm_redframe[:] = r
#shm_greenframe[:] = g
#shm_blueframe[:] = b
# for better performance one can assign directly in funtion line the values to the shm_memorys: shm_red, .. , ... = cv.split(..)
shm_bools[7:10]=[True] # trigger the start of the processing for each colorchannel
#print(shm_bools[7], shm_bools[8], shm_bools[9])
#display_image_with_text(image, shutter_speed, framenumber, camera_exposure_speed, trigger_record_OpenCV, out) # show the frame
output.truncate(0) # clear the stream for next frame
if framenumber == 500: # 5 sek @ 30 fps, only for performance measuring
t_stop=time.perf_counter()
print(f"calculated fps: {framenumber/(t_stop-t_start)}")
break
def display_image_with_text(img, shutter_speed, framenumber, camera_exposure_speed, trigger_record_OpenCV, out):
img = img.copy() # make copy of image and do not modify the original image
# please activate only one trigger once
trigger_show_brightness = 0 # trigger for (not) calculating andshowing the brightness of the image+
if trigger_show_brightness == 1:
arithmetic_mean_of_brightness_per_pixel_relative = calc_arithmetic_mean_of_brightness_per_pixel(img)
trigger_show_max_brightness_values_of_colour_channels = 0 # trigger for (not) calculating and showing max values of colour chanels
if trigger_show_max_brightness_values_of_colour_channels == 1:
r_max, g_max, b_max = get_max_rgb_values(img)
font = cv.FONT_HERSHEY_SIMPLEX # font
fontScale = 1 # fontScale
color = (255, 255, 255) # Font colour in BGR
thickness = 1 # Line thickness in px
# set text position
frame_width = int(img.shape[1])
frame_height = int(img.shape[0])
text_start_position_Y = int(round(frame_height*0.12)) # start position of text in pixels 12 % of frame height
text_linespacing = 50 # line spacing between two strings in pixels
# text_start_position_X = int(frame_width/4) # start text from 1/4 of image width
text_start_position_X = int(0) # start text from left edge of image
# set position in (x,y)-coordinated from top left corner. Bottom-left corner of the text string in the image.
pos_1 = (text_start_position_X, text_start_position_Y) # start text from 1/4 of image width
pos_2 = (text_start_position_X, text_start_position_Y+text_linespacing) # start text from 1/4 of image width
pos_3 = (text_start_position_X, text_start_position_Y+2*text_linespacing) # start text from 1/4 of image width
if trigger_show_brightness==1 or trigger_show_max_brightness_values_of_colour_channels==1:
pos_4 = (text_start_position_X, text_start_position_Y+3*text_linespacing) # start text from 1/4 of image width
# define text to display
text_line_1 = f"set ss: {shutter_speed} us"
text_line_3 = f"Frame: {framenumber}"
text_line_2 = f"ret exs: {camera_exposure_speed} us"
if trigger_show_brightness==1:
if arithmetic_mean_of_brightness_per_pixel_relative >= 0.01:
text_line_4 = f"brightness: {round(arithmetic_mean_of_brightness_per_pixel_relative*100,2)} %"
elif arithmetic_mean_of_brightness_per_pixel_relative < 0.01:
text_line_4 = f"brightness: {round(arithmetic_mean_of_brightness_per_pixel_relative*10e3,2)} pm"
if trigger_show_max_brightness_values_of_colour_channels==1:
text_line_4 = f"max: r:{r_max} g:{g_max} b:{b_max}"
# put the text into the image
image_text_1 = cv.putText(img, text_line_1, pos_1, font,
fontScale, color, thickness, cv.LINE_AA)
image_text_2 = cv.putText(img, text_line_2, pos_2, font,
fontScale, color, thickness, cv.LINE_AA)
image_text_3 = cv.putText(img, text_line_3, pos_3, font,
fontScale, color, thickness, cv.LINE_AA)
if trigger_show_brightness==1 or trigger_show_max_brightness_values_of_colour_channels==1:
image_text_4 = cv.putText(img, text_line_4, pos_4, font,
fontScale, color, thickness, cv.LINE_AA)
cv.imshow("Current Frame", img) # display the image
if trigger_record_OpenCV == 1:
out.write(img) # write frame to Video
def calc_arithmetic_mean_of_brightness_per_pixel(image):
"""Calculate overall brightness per pixel of the image. Mittelere Helligkeit pro pixel berechnen."""
#Comment: So rechenintensiv, dass man kein Blitzen sieht im Bild. (Oder sehr selten bzw. schwach). Daher anzeige von max-werten
b,g,r = cv.split(image) # OpenCV works with bgr. Image is also speciefied to be captured in bgr.
r=r.astype('uint16') # set dtype of one colour to uint16, because the sum of 255+255+255 >255 =765
#the rest will also be uint16 then
image_heigth = r.shape[0]
image_width = r.shape[1]
number_of_colour_channels = 3
arithmetic_mean_of_brightness_image = np.sum((r+g+b)/number_of_colour_channels)
arithmetic_mean_of_brightness_per_pixel = arithmetic_mean_of_brightness_image/(image_width*image_heigth)
max_possible_brightness = 255 # maximum possible brightness
arithmetic_mean_of_brightness_per_pixel_relative = arithmetic_mean_of_brightness_per_pixel/max_possible_brightness
return arithmetic_mean_of_brightness_per_pixel_relative
def get_max_rgb_values(image):
"""get max values of colour channels"""
b,g,r = cv.split(image) # OpenCV works with bgr. Image is also speciefied to be captured in bgr.
r_max=r.max()
g_max=g.max()
b_max=b.max()
return r_max, g_max, b_max
def create_folder_for_captures():
# Create folder for saving the captured pictures
now = datetime.now(); d1 = now.strftime("%Y-%m-%d %H-%M")
path_cwd = os.getcwd()
path_saveFolder = path_cwd+r"/Capture_"+d1
try:
os.mkdir(path_saveFolder)
folder_exists = True
except OSError:
print("Error! Ending script.")
quit()
return path_saveFolder, folder_exists
def do_processing():
time.sleep(0.001)
#print("ohh i am doing high complex image analysis with computer vision")
def do_processing_frame_r(frame):
print(f"max frame color red: {frame.max()}")
def do_processing_frame_g(frame):
print(f"max frame color green: {frame.max()}")
def do_processing_frame_b(frame):
print(f"max frame color blue: {frame.max()}")
def processing_red():
shm_bool_init = shared_memory.SharedMemory(name="shm_bools") # Attach to existing shared memory block
shm_bools = np.ndarray((10,), dtype=np.bool8, buffer= shm_bool_init.buf) # do not: newframe = np.array(...)[0] --> then one can not assign new value in main script
newframe = shm_bools[0]
p_red_finished = shm_bools[1] # not used, but for clarity
p_red_started = shm_bools[4]
shm_framenumber_init = shared_memory.SharedMemory\
(name="shm_framenumber") # Attach to existing shared memory block
shm_framenumber = np.ndarray((1,), dtype=np.uint64, \
buffer= shm_framenumber_init.buf)
# framenumer = shm_framenumber[0]
shm_redframe_init = shared_memory.SharedMemory\
(name="shm_redframe") # Attach to existing shared memory block
shm_redframe = np.ndarray((image_heigth,image_width), dtype=np.uint8, \
buffer= shm_redframe_init.buf)
i=0
while True:
try:
framenumber = shm_framenumber[0]
if i==0:
last_processed_frame = framenumber
conditions_for_first_start = (i==0) and\
(shm_bools[0] == True) and \
(shm_bools[1] == False) and (shm_bools[2] == False) and (shm_bools[3] == False) \
and (shm_bools[7] == True)
conditions_for_starting_processing = (framenumber>last_processed_frame) and (shm_bools[7] == True)
# newframe and all color-channel-processings have to be finished
if conditions_for_first_start == True:
shm_bools[4] = True # process started
shm_bools[7] = False # reset trigger
shm_bools[1] = False # set bool for p_red_finished to false
#t1 = time.perf_counter_ns()
do_processing()
i += 1
#print(f"first processing red finished. frame: {framenumber}")
shm_bools[1] = True # set bool for p_red_finished to true
shm_bools[4] = False # process ended
elif conditions_for_starting_processing == True:
shm_bools[4] = True # process started
shm_bools[7] = False # reset trigger
#print(f"red: framenumber: {framenumber}, last_processed_frame: {last_processed_frame}")
shm_bools[1] = False # set bool for p_red_finished to false
#t1 = time.perf_counter_ns()
do_processing()
#print(f"max frame color red: {shm_redframe.max()}")
if show_opencv_window:
cv.imshow("red", shm_redframe)
cv.waitKey(1)
#print(f"processing red finished. frame: {framenumber}")
shm_bools[1] = True # set bool for p_red_finished to true
#t2 = time.perf_counter_ns()
#print(f"processing time for red channel: {round((t2-t1)*1e-6,2)} ms")
last_processed_frame = framenumber
shm_bools[4] = False # process ended
# elif shm_bools[0] == False:
# pass
#print(f"no new red frame")
# image processing finished
except KeyboardInterrupt:
try:
shm_bool_init.close()
shm_framenumber_init.close()
shm_redframe_init.close()
except FileNotFoundError:
# Memory already destroyed
pass
def processing_bgr():
shm_bool_init = shared_memory.SharedMemory(name="shm_bools") # Attach to existing shared memory block
shm_bools = np.ndarray((10,), dtype=np.bool8, buffer= shm_bool_init.buf) # do not: newframe = np.array(...)[0] --> then one can not assign new value in main script
newframe = shm_bools[0]
p_red_finished = shm_bools[1] # not used, but for clarity
p_red_started = shm_bools[4]
shm_framenumber_init = shared_memory.SharedMemory\
(name="shm_framenumber") # Attach to existing shared memory block
shm_framenumber = np.ndarray((1,), dtype=np.uint64, \
buffer= shm_framenumber_init.buf)
# framenumer = shm_framenumber[0]
shm_frame_init = shared_memory.SharedMemory\
(name="shm_frame") # Attach to existing shared memory block
shm_redframe = np.ndarray((image_heigth,image_width, number_of_colorchannels), dtype=np.uint8, \
buffer= shm_frame_init.buf)
image_bgr = shm_redframe
params_for_blob_detection = define_parameters_for_blob_detection()
i=0
while True:
try:
framenumber = shm_framenumber[0]
if i==0:
last_processed_frame = framenumber
conditions_for_first_start = (i==0) and\
(shm_bools[0] == True) and \
(shm_bools[1] == False) and (shm_bools[2] == False) and (shm_bools[3] == False) \
and (shm_bools[7] == True)
conditions_for_starting_processing = (framenumber>last_processed_frame) and (shm_bools[7] == True)
# newframe and all color-channel-processings have to be finished
if conditions_for_first_start == True:
shm_bools[4] = True # process started
shm_bools[7] = False # reset trigger
shm_bools[1] = False # set bool for p_red_finished to false
#t1 = time.perf_counter_ns()
lane_detection(image_bgr, params_for_blob_detection)
i += 1
#print(f"first processing red finished. frame: {framenumber}")
shm_bools[1] = True # set bool for p_red_finished to true
shm_bools[4] = False # process ended
elif conditions_for_starting_processing == True:
shm_bools[4] = True # process started
shm_bools[7] = False # reset trigger
#print(f"red: framenumber: {framenumber}, last_processed_frame: {last_processed_frame}")
shm_bools[1] = False # set bool for p_red_finished to false
#t1 = time.perf_counter_ns()
start_processing = time.perf_counter_ns()
lane_detection(image_bgr, params_for_blob_detection)
end_processing = time.perf_counter_ns()
time_processing = end_processing-start_processing
time_processing = time_processing*1e-6
time_processing = round(time_processing, 2)
print(f'processing time: {time_processing} ms')
#print(f"max frame color red: {shm_redframe.max()}")
if show_opencv_window:
cv.imshow("red", shm_redframe)
cv.waitKey(1)
#print(f"processing red finished. frame: {framenumber}")
shm_bools[1] = True # set bool for p_red_finished to true
#t2 = time.perf_counter_ns()
#print(f"processing time for red channel: {round((t2-t1)*1e-6,2)} ms")
last_processed_frame = framenumber
shm_bools[4] = False # process ended
# elif shm_bools[0] == False:
# pass
#print(f"no new red frame")
# image processing finished
except KeyboardInterrupt:
try:
shm_bool_init.close()
shm_framenumber_init.close()
shm_redframe_init.close()
except FileNotFoundError:
# Memory already destroyed
pass
# ----------------------------------------------------------------------------
# main
def main():
start = time.perf_counter()
try:
# create processes
p_red = Process(target=processing_bgr)
processes = [p_red]
print(f"waiting 1 second to create processes")
time.sleep(1) # sind prozesse schon vorhanden
# start acitivity of processes
for process in processes:
process.start()
# start capturing
get_frames_from_picamera()
print('*******************************')
# this below code is only executed if the loop in take_image_picamera_opencv is breaked
# In real use case there will be no end of this program
for process in processes:
process.terminate()
# print time measuring
end = time.perf_counter()
print(f'Script finished in {round(end-start, 2)} s')
# close each SharedMemory instance and unlink to release the shared memory
shm_bools_create.close()
shm_bools_create.unlink()
shm_framenumber_create.close()
shm_framenumber_create.unlink()
shm_redframe_create.close()
shm_redframe_create.unlink()
shm_greenframe_create.close()
shm_greenframe_create.unlink()
shm_blueframe_create.close()
shm_blueframe_create.unlink()
except KeyboardInterrupt:
# Normally this prgoram never gets keyboard interrupted! But here this case is nevertheless handled
# End Script
try:
# close each SharedMemory instance and unlink to release the shared memory
shm_bools.close()
shm_bools.unlink()
shm_framenumber_create.close()
shm_framenumber_create.unlink()
shm_redframe_create.close()
shm_redframe_create.unlink()
shm_greenframe_create.close()
shm_greenframe_create.unlink()
shm_blueframe_create.close()
shm_blueframe_create.unlink()
except FileNotFoundError:
# Memory already destroyed
pass
if __name__ == "__main__":
main()