import cv2 as cv import numpy as np import picamera from picamera.array import PiRGBArray from fractions import Fraction import time from datetime import datetime import os import sys import numpy as np import math as M from multiprocessing import Process, shared_memory #---------------------------------------------------------------------- # Define camera settings SENSOR_MODE = 4 # corresponding sensor mode to resolution OUTPUT_RESOLUTION = (192, 144) # (width, heigth) image_width = OUTPUT_RESOLUTION[0] image_heigth = OUTPUT_RESOLUTION[1] number_of_colorchannels = 3 # r, g, b size_of_frame=int(image_heigth*image_heigth*number_of_colorchannels) frame_dimension = int(number_of_colorchannels) AWB_MODE = 'off' # Auto white balance mode AWB_GAINS = (1.395, 1.15) # White Balance Gains to have colours read correctly: (red, blue). Int, floar or fraction are valid. BRIGHTNESS = 25 # sets the brightness setting of the camera. default is 50. [0-100] #the brighter, the brighter the LEDs and the higher the RGB values and vice versa! CONTRAST = 100 # sets the contrast setting of the camera. The default value is 0. [-100 ... 100] SHUTTER_SPEED = 50 # [µs] ISO = 320 # ISO value EXPOSURE_MODE = 'off' FRAMERATE = 25 # frames per second. 40 fps is max for sensor mode 4 SLEEP_TIME = 2 # Time for sleep-mode for the camera in seconds. My default: 2 s # miscellaneous parameters max_value_of_uint64 = int((2**64) - 1) # @30 fps: konservative calculated driving time: 1.95*1e10 years --> Integer overflow not relevant. #---------------------------------------------------------------------- # Parameters pixels_per_mm = 32/24.25 #[px/mm] for 120 mm camera height for resolution: 192x144 # pixels_per_mm = 71/24.25 #[px/mm] for 120 mm camera height for resolution: 416x320 # pixels_per_mm = 107/24.25 #[px/mm] for 120 mm camera height for resolution: 640x480 # Offset Camera Sensor in Scooty according to Scooty-KS x_offset_camera_mm = 100 # [mm] y_offset_camera_mm = 50 # [mm] x_offset_camera_px = x_offset_camera_mm*pixels_per_mm # [px] y_offset_camera_px = y_offset_camera_mm*pixels_per_mm # [px] # image parameters image_heigth = OUTPUT_RESOLUTION[1] # shape [0] image_width = OUTPUT_RESOLUTION[0]# shape[1] # calculate center of image [x_0, y_0] = np.array([image_width/2, image_heigth/2], dtype=np.uint16) threshold_color_detection = 60 # values under this will not be considered as active leds in each color channel # Parameters for Blob/LED Detection minDiameter_mm = 1.5 # [mm] minimum diameter of detected blob/LED maxDiameter_mm = 8 # [mm] maximum diameter of detected blob/LED # Define color numbers to identify the color channels in the matrix with all detected LEDs. No string, because numpy array should stay uint16 color_number_off = 0 color_number_red = 1 color_number_green = 2 color_number_blue = 3 color_number_yellow = 4 color_number_magenta = 5 color_number_cyan = 6 color_number_white = 7 show_opencv_window = True # show opencv window draw_opencv = True # draw lane and so on print_additional_info = False # create shared Memorys for main-process # shared memory for bools shm_bools_pre=np.array([False, False, False, False, False, False, False, False, False, False], dtype=np.bool8) # create numpy array with bools stored in it # [0]: newframe [1]: p_red_finished [2]: p_green_finished [3]: p_blue_finished # [4]: p_red_started [5]: p_green_started [6]: p_blue_started # [7]: p_red_start_trigger [8]: p_green_start_trigger [9]: p_blue_start_trigger size_of_buffer = shm_bools_pre.nbytes print(f"size of buffer: {size_of_buffer}") # size of buffer: 10 print(f"shm_bools dtype: {shm_bools_pre.dtype}") # dtype: bool shm_bools_create = shared_memory.SharedMemory(name="shm_bools", create=True, size=shm_bools_pre.nbytes) # create a new shared memory block shm_bools = np.ndarray(shm_bools_pre.shape, dtype=shm_bools_pre.dtype, buffer=shm_bools_create.buf) # create a NumPy array backed by shared memory shm_bools[:] = shm_bools_pre[:] # Copy the original data into shared memory # print(shm_bool) # print(shm_bools.name) # shared memory for framenumber shm_framenumber_pre=np.array([0], dtype=np.uint64) size_of_buffer = shm_framenumber_pre.nbytes print(f"size of framenumber-buffer: {size_of_buffer}") #8 print(f"shm_framenumber dtype: {shm_framenumber_pre.dtype}") #uint64 shm_framenumber_create = shared_memory.SharedMemory(name="shm_framenumber", create=True, size=shm_framenumber_pre.nbytes) # create a new shared memory block shm_framenumber = np.ndarray(shm_framenumber_pre.shape, dtype=shm_framenumber_pre.dtype, buffer=shm_framenumber_create.buf) # create a NumPy array backed by shared memory shm_framenumber[:] = shm_framenumber_pre[:] # Copy the original data into shared memory # print(shm_framenumber) # [0] # print(shm_framenumber_create.name) # shm_framenumber # shared memory for red, green, blue frame int_black = 0 # integer for black color/ no color shm_colorframes_pre = np.full(\ (image_heigth,image_width), \ int_black, dtype=np.uint8) size_of_buffer = shm_colorframes_pre.nbytes print(f"size of colorframe-buffer: {size_of_buffer}") #133 120 print(f"shm_colorframes_pre dtype: {shm_colorframes_pre.dtype}") #uint8 shm_redframe_create = shared_memory.SharedMemory(name="shm_redframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block shm_greenframe_create = shared_memory.SharedMemory(name="shm_greenframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block shm_blueframe_create = shared_memory.SharedMemory(name="shm_blueframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block shm_redframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_redframe_create.buf) # create a NumPy array backed by shared memory shm_greenframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_greenframe_create.buf) # create a NumPy array backed by shared memory shm_blueframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_blueframe_create.buf) # create a NumPy array backed by shared memory shm_redframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory shm_greenframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory shm_blueframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory # shared memory bgr frame int_black = 0 # integer for black color/ no color shm_frame_pre = np.full(\ (image_heigth,image_width, number_of_colorchannels), \ int_black, dtype=np.uint8) size_of_buffer = shm_frame_pre.nbytes print(f"size of colorframe-buffer: {size_of_buffer}") #399 360 = 416 x 320 x 3 print(f"shm_colorframes_pre dtype: {shm_frame_pre.dtype}") #uint8 shm_frame_create = shared_memory.SharedMemory(name="shm_frame", create=True, size=shm_frame_pre.nbytes) # create a new shared memory block shm_frame = np.ndarray(shm_frame_pre.shape, dtype=shm_frame_pre.dtype, buffer=shm_frame_create.buf) # create a NumPy array backed by shared memory shm_frame[:] = shm_frame_pre[:] # Copy the original data into shared memory # ---------------------------------------------------------------------------- # Define Functions def points_trafo(detected_LEDs, alpha_rad, dx, dy): """Tranfsform points of LED to lane in KS-LED""" detected_LEDs_trafo = detected_LEDs.copy() # copy, becuase else it is only a pointer detected_LEDs_trafo = detected_LEDs_trafo.astype(np.int16) # avoid integer overflow x_pnts = detected_LEDs_trafo[:,0] y_pnts = detected_LEDs_trafo[:,1] # Translation x1 = x_pnts-dx-x_0 x_trafo=x1 y1 = y_pnts-dy-y_0 y_trafo = y1 # Rotation. Winkel Sensor im UZS, also negativ zu mathematischer definiton x_trafo = np.cos(-alpha_rad)*x1-np.sin(-alpha_rad)*y1 detected_LEDs_trafo[:,0] = x_trafo y_trafo = np.sin(-alpha_rad)*x1+np.cos(-alpha_rad)*y1 detected_LEDs_trafo[:,1] = y_trafo #sort points along lane: x_2, y_2 -axis (KS_LED) detected_LEDs_trafo = detected_LEDs_trafo[detected_LEDs_trafo[:, 0].argsort(kind='quicksort')] return detected_LEDs_trafo def construct_lane(detected_LEDs, img_bgr): """construct the lane""" # This function is partially commented in german, because higher math is used # clearer what is trying to be achieved # get points # xy_pnts = detected_LEDs[:,0:2] # x_pnts = detected_LEDs[:,0] # y_pnts = detected_LEDs[:,1] # approach 2: # fit line through centers of LEDs in KS_0 # DIST_L": the simplest and the fastest least-squares method: the simple euclidean distance param = 0 # not used for DIST_L2 reps = 0.001 # Sufficient accuracy for the radius (distance between the coordinate origin and the line). aeps = 0.001 # Sufficient accuracy for the angle. [dx, dy, x_2, y_2] = cv.fitLine(detected_LEDs[:,0:2], cv.DIST_L2, param, reps, aeps) # x2, y2: same as: mean_of_leds = np.mean([x_pnts, y_pnts], 1) alpha_rad = np.arctan2(dy, dx) # calculate angle of line alpha = np.arctan2(dy, dx)*180/np.pi # calculate angle of line # print(f"Lane: dx: {dx}, dy:{dy}, x2:{x_2}, y2:{y_2}, alpha:{alpha}°") if print_additional_info: print(f"Lane: alpha:{alpha[0]}°") # get smallest distance to point an line # Berechnung nach: Repetitorium Höhere Mathematik, Wirth # Gerade: x = a+ t*b # Punkt : OP = p # d = abs(b x (p-a))/(abs(b)) # info: np.array()[:,0] --> gets only array with 1 dimensions with desired values p = np.array([x_0, y_0]) a = np.array([x_2, y_2])[:,0] b = np.array([np.cos(alpha_rad), np.sin(alpha_rad)])[:,0] # Richtungsvektor c = p-a # Betrag von Vektor: np.linalg.norm(vec) cross= np.cross(b, c) d = np.linalg.norm(cross)/np.linalg.norm(b) # distance [px] #print(f"d: {round(d,2)}") # Fußpunkt (X_LED, Y_LED) t_0_dot = np.dot(c, b) t_0_norm = (np.linalg.norm(b)**2) t_0 = t_0_dot/t_0_norm [x_LED, y_LED] = (a+t_0*b) if print_additional_info: print(f"x_LED: {x_LED}, y_LED: {y_LED}") # Abstand (dx, dy) Fußpunkt zu KS_0 dx_LED = x_LED - x_0 dx_LED_mm = dx_LED*(1/pixels_per_mm) dy_LED = y_LED - y_0 dy_LED_mm = dy_LED*(1/pixels_per_mm) if print_additional_info: print(f"dx_LED:{dx_LED} [px] , dy_LED:{dy_LED} [px]") print(f"dx_LED:{dx_LED_mm} [mm] , dy_LED:{dy_LED_mm} [mm]") # Abstand (dx, dy) Fußpunkt von Bildmitte zu KS_Scooty # Diese Werte zurückgeben dx_LED_scooty = x_LED - x_0 + x_offset_camera_px dx_LED_scooty_mm = dx_LED_scooty*(1/pixels_per_mm) dy_LED_scooty = y_LED - y_0 + y_offset_camera_px dy_LED_scooty_mm = dy_LED_scooty*(1/pixels_per_mm) if print_additional_info: print(f"dx_LED_scooty:{dx_LED_scooty} [px] , dy_LED_scooty:{dy_LED_scooty} [px]") print(f"dx_LED_scooty:{dx_LED_scooty_mm} [mm] , dy_LED_scooty:{dy_LED_scooty_mm} [mm]") # Punkte Trafo, um sortierte position der LEDs entlang Spur zu erhalten # Bei normal detected kann bei vertikaler LED zb Fehler entstehen und dann muster: 211323233 -> daher mit dieser sortierten weitermachen detected_LEDs_KS_LED = points_trafo(detected_LEDs, alpha_rad, dx_LED, dy_LED) if print_additional_info: print(f"Detected LEDs in KS_LED:(x2, y2):\n {detected_LEDs_KS_LED}") #----------------------------------- # draw useful lines and points # draw lane line if draw_opencv: pt_0 = (a+b*np.array([-300, -300])).astype(np.int32) pt_1 = (a+b*np.array([300, 300])).astype(np.int32) #print(f"pt_0: {pt_0}, pt_1: {pt_1}") cv.line(img_bgr, pt_0, pt_1, (255,255,255),1) # draw lane # draw dx dy cv.line(img_bgr, (int(x_0), int(y_0)), (int(x_LED), int(y_LED)), (0,0,255), 2) # shortest distance from KS_0 to KS_LED --> Lot # cv.line(img_bgr, (int(x_0), int(y_0)), (int(x_LED), int(y_0)), (0,0,255), 2) # only dx # cv.line(img_bgr, (int(x_LED), int(y_0)), (int(x_LED), int(y_LED)), (0,0,255), 2) # only dy #draw additional points cv.circle(img_bgr, (int(x_2), int(y_2)), 5,(255,128,255),-1) #pink. Center of points #cv.putText(img_bgr, '(x2, y2)',(int(x_2)+5, int(y_2)-5), cv.FONT_HERSHEY_SIMPLEX, 2, (255,255,255), cv.LINE_AA) cv.circle(img_bgr, (int(x_LED), int(y_LED)), 5,(170,255,0),-1) # lime green. Fußpunkt if show_opencv_window: cv.imshow("Lane", img_bgr) return dx_LED_scooty_mm, dy_LED_scooty_mm, detected_LEDs_KS_LED def convert_rgb_to_grayscale_average(image_bgr): """This function converts the RGB image into an grayscale image. Algorithm: Average: Y = (R+G+B)/3""" # convert dtype to prevent integer overflow while addition image_bgr = image_bgr.astype(np.uint16, copy=False) image_gray = (image_bgr[:,:,0]+image_bgr[:,:,1]+image_bgr[:,:,2])/3 # add values / do conversion image_gray = image_gray.astype(np.uint8, copy=False) # convert back to uint8 return image_gray def create_detector(params_for_blob_detection): detector = cv.SimpleBlobDetector_create(params_for_blob_detection) # Set up the detector with specified parameters. return detector def define_parameters_for_blob_detection(): """set parameters for simple blob detector""" params = cv.SimpleBlobDetector_Params() # Threshold for Convert the source image to binary images by applying thresholding # with several thresholds from minThreshold (inclusive) to maxThreshold (exclusive) # with distance thresholdStep between neighboring thresholds. # Since the Grayscale image is dark if only one color channel is active, # the Threshold values have to be set like this. # particularly the thresholdStep-Value has to be low params.minThreshold=20 # reminder: this value is set for grayscale image params.maxThreshold=255 params.thresholdStep=1 params.filterByColor=False # do not filter blobs by color # Filter blobs by Area params.filterByArea=True minDiameter_px = minDiameter_mm*pixels_per_mm # [px] minimum diameter of detected blob/LED maxDiameter_px = maxDiameter_mm*pixels_per_mm # [px] maximum diameter of detected blob/LED minArea_px2 = np.pi/4*minDiameter_px**2 maxArea_px2 = np.pi/4*maxDiameter_px**2 params.minArea = minArea_px2 # min Area of a blob in px^2 params.maxArea = maxArea_px2 # max Area of a blob in px^2. # reasons for not filtering maxArea: motion blur + rolling shutter --> larger Area # Filter by Inertia params.filterByInertia=False params.minInertiaRatio = 0.2 # [0-1] # Filter by Convexity params.filterByConvexity=False params.minConvexity = 0.2 # [0-1] # Filter by Circularity params.filterByCircularity=False params.minCircularity = 0.4 # [0-1] # params.minDistBetweenBlobs = minDist_px # this has no effect return params def detect_LED_positions_in_grayscale(image_gray, image_bgr, detector): start_processing = time.perf_counter() keypoints = detector.detect(image_gray) # Detect blobs --> LEDs end_processing = time.perf_counter() time_processing = end_processing-start_processing time_processing = time_processing*1000 time_processing = round(time_processing, 2) print(f'processing time detector: {time_processing} ms') number_of_detected_leds = len(keypoints) if number_of_detected_leds != 0: # print information of keypoints print(f"detected LEDs: {number_of_detected_leds}") #Pre-allocate matrix for numpy number_of_rows = number_of_detected_leds number_of_columns = 3 position_of_leds = np.zeros((number_of_rows, number_of_columns), dtype=np.uint16) for i, k in enumerate(keypoints): # x_pos = round(k.pt[0],0) # x position # y_pos = round(k.pt[1],0) # y position # print(f"x: {x_pos} y: {y_pos}") diameter_px = round(k.size,2) diameter_mm = round(diameter_px*1/pixels_per_mm,2) #print(f"diameter [px]: {diameter_px} diameter [mm]: {diameter_mm}") # diameter # area_px2 = round(np.pi/4*k.size**2,0) # area in px^2 # area_mm2 = round(area_px2*(1/pixels_per_mm)**2,0) # print(f"area [px^2]: {area_px2} area [mm^2]: {area_mm2}") # print('') # calculate parameters to transfer to matrix # x_pos = int(np.ceil(x_pos)) # y_pos = int(np.ceil(y_pos)) # Fill matrix # position_of_leds[i,:] = [x_pos,y_pos, 0] position_of_leds[i,0] = int(np.ceil(k.pt[0])) # x positon position_of_leds[i,1] = int(np.ceil(k.pt[1])) # y position if draw_opencv: # draw the keypoints on the original image # cv.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS ensures the size of the circle corresponds to the size of blob blobs = cv.drawKeypoints(image=image_bgr, keypoints=keypoints, color=(255, 255, 255), \ outImage=np.array([]), flags= cv.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS) if show_opencv_window: # cv.imshow("grayscale", image_gray) cv.imshow("Detected", blobs) return position_of_leds else: print(f"No LEDs were detected") return None def detect_position_of_all_LEDs_grayscale(image_gray, image_bgr, detector): position_of_LEDs = detect_LED_positions_in_grayscale(image_gray, image_bgr, detector) if position_of_LEDs is not None: return position_of_LEDs else: return None def get_color_of_leds(matrix_of_LEDs, image_bgr): # is image_r[y_pos, x_pos] = image_bgr[y_pos,x_pos, 2] ? --> yes. No need to split the color channels. offset = 0 # half of length from rectangle which is going to be used to determine the color around the middle point of the blob/led # offset = 0 --> only the value from the middle point of the blob/led # offset=1 --> 9 values, offset=2-->25 values for led in matrix_of_LEDs: x_pos = led[0] # uint16 y_pos = led[1] # uint16 # get values of color channels in region around middle point of blob/led: # +1 at stop index, because it is not inclusive region_around_blue_led = image_bgr[y_pos-offset:y_pos+offset+1, x_pos-offset:x_pos+offset+1, 0] # uint8 region_around_green_led = image_bgr[y_pos-offset:y_pos+offset+1, x_pos-offset:x_pos+offset+1, 1] # uint8 region_around_red_led = image_bgr[y_pos-offset:y_pos+offset+1, x_pos-offset:x_pos+offset+1, 2] # uint8 # average of the values # convert dtype to prevent integer overflow while addition region_around_red_led = region_around_red_led.astype(np.uint16, copy=False) region_around_green_led = region_around_green_led.astype(np.uint16, copy=False) region_around_blue_led = region_around_blue_led.astype(np.uint16, copy=False) # sum all elements in matrix and divide with number of elements number_of_elements= region_around_blue_led.size value_of_red_led = region_around_red_led.sum()/number_of_elements # float64, if not integer result value_of_green_led = region_around_green_led.sum()/number_of_elements # float64, if not integer result value_of_blue_led = region_around_blue_led.sum()/number_of_elements # float64, if not integer result # determine which leds are active: # if value > threshold --> led is active status_blue_led = False; status_green_led = False; status_red_led = False if value_of_blue_led > threshold_color_detection: status_blue_led = True if value_of_green_led > threshold_color_detection: status_green_led = True if value_of_red_led > threshold_color_detection: status_red_led = True # determine color by checking the cases: # case 1: red if status_blue_led==False and status_green_led==False and status_red_led==True: color = color_number_red # case 2: green elif status_blue_led==False and status_green_led==True and status_red_led==False: color = color_number_green # case 3: blue elif status_blue_led==True and status_green_led==False and status_red_led==False: color = color_number_blue # case 4: yellow = red + green elif status_blue_led==False and status_green_led==True and status_red_led==True: color = color_number_yellow # case 5: magenta = red + blue elif status_blue_led==True and status_green_led==False and status_red_led==True: color = color_number_magenta # case 6: cyan = green + blue elif status_blue_led==True and status_green_led==True and status_red_led==False: color = color_number_cyan # case 7: white = red + green + blue elif status_blue_led==True and status_green_led==True and status_red_led==True: color = color_number_white # case 8: led not active # this case can not occur, because no inactive led can be detected from the implemented blob-algorithm in detect_LED_positions_in_grayscale else: color = color_number_off # fill matrix with color led[2] = color # uint16 return matrix_of_LEDs def detect_LEDs_with_grayscale(image_bgr, detector): # convert rgb to grayscale image # start_m1 = time.perf_counter() image_gray = convert_rgb_to_grayscale_average(image_bgr) # end_m1 = time.perf_counter() # time_processing = end_m1-start_m1 # time_processing = time_processing*1000 # time_processing = round(time_processing, 2) # print(f'processing time conversion: {time_processing} ms') # get position of leds position_of_LEDs = detect_position_of_all_LEDs_grayscale(image_gray=image_gray, image_bgr=image_bgr, detector=detector) #position_of_LEDs = None if position_of_LEDs is not None: # determine color of leds and add to matrix detected_LEDs = get_color_of_leds(position_of_LEDs, image_bgr) return detected_LEDs else: return None def lane_detection(image_bgr, detector): # Detect LEDs print(f"Detect LEDs and color:") detected_LEDs = detect_LEDs_with_grayscale(image_bgr, detector) if detected_LEDs is not None: # Contruct lane #print(f"_____________________________________") # print("Contruct lane") dx_LED_scooty_mm, dy_LED_scooty_mm, detected_LEDs_KS_LED = \ construct_lane(detected_LEDs, image_bgr) # print result if print_additional_info: print(f"Detected LEDs relative to image-center(x0,y0):\n{detected_LEDs}") return detected_LEDs else: return None # ------------------- # Define Funcions def get_frames_from_picamera(): # newframe= shm_bools[0] # do not use this! no updted values in "newframe" # framenumber = shm_framenumber[0] # Initialise Camera with picamera.PiCamera() as camera: with PiRGBArray(camera) as output: # Set camera settings camera.sensor_mode = SENSOR_MODE # force camera into desired sensor mode camera.resolution = OUTPUT_RESOLUTION # frame will be resized from GPU to this resolution. No CPU usage! camera.framerate = FRAMERATE camera.awb_mode = AWB_MODE camera.awb_gains = AWB_GAINS camera.iso = ISO camera.shutter_speed = SHUTTER_SPEED time.sleep(SLEEP_TIME) # wait for iso gains and digital_gain and analog_gain to settle before fixing the gains with exposure_mode = off camera.exposure_mode = EXPOSURE_MODE time.sleep(1) # wait before applying brightness and contrast camera.brightness = BRIGHTNESS camera.contrast = CONTRAST time.sleep(SLEEP_TIME) # Camera warm-up time to apply settings t_start= time.perf_counter() # save time for fps calculation for frameidx, frame in enumerate(camera.capture_continuous(output, format='bgr', use_video_port=True)): # General information: # - always the newest frame is recieved: processing must be faster than fps if every frame should be processed shm_bools[0] = True framenumber = frameidx+1 # frameidx starts with 0, framenumber with 1 shm_framenumber[0] = framenumber #print('') #print(f"new frame: {framenumber}") #image = frame.array # raw NumPy array without JPEG encoding #b,g,r = cv.split(frame.array) # split colour channels of raw NumPy array without JPEG encoding shm_frame[:] = frame.array #shm_redframe[:] = r #shm_greenframe[:] = g #shm_blueframe[:] = b # for better performance one can assign directly in funtion line the values to the shm_memorys: shm_red, .. , ... = cv.split(..) shm_bools[7:10]=[True] # trigger the start of the processing for each colorchannel #print(shm_bools[7], shm_bools[8], shm_bools[9]) #display_image_with_text(image, shutter_speed, framenumber, camera_exposure_speed, trigger_record_OpenCV, out) # show the frame output.truncate(0) # clear the stream for next frame if framenumber == 500: # 5 sek @ 30 fps, only for performance measuring t_stop=time.perf_counter() print(f"calculated fps: {framenumber/(t_stop-t_start)}") break def display_image_with_text(img, shutter_speed, framenumber, camera_exposure_speed, trigger_record_OpenCV, out): img = img.copy() # make copy of image and do not modify the original image # please activate only one trigger once trigger_show_brightness = 0 # trigger for (not) calculating andshowing the brightness of the image+ if trigger_show_brightness == 1: arithmetic_mean_of_brightness_per_pixel_relative = calc_arithmetic_mean_of_brightness_per_pixel(img) trigger_show_max_brightness_values_of_colour_channels = 0 # trigger for (not) calculating and showing max values of colour chanels if trigger_show_max_brightness_values_of_colour_channels == 1: r_max, g_max, b_max = get_max_rgb_values(img) font = cv.FONT_HERSHEY_SIMPLEX # font fontScale = 1 # fontScale color = (255, 255, 255) # Font colour in BGR thickness = 1 # Line thickness in px # set text position frame_width = int(img.shape[1]) frame_height = int(img.shape[0]) text_start_position_Y = int(round(frame_height*0.12)) # start position of text in pixels 12 % of frame height text_linespacing = 50 # line spacing between two strings in pixels # text_start_position_X = int(frame_width/4) # start text from 1/4 of image width text_start_position_X = int(0) # start text from left edge of image # set position in (x,y)-coordinated from top left corner. Bottom-left corner of the text string in the image. pos_1 = (text_start_position_X, text_start_position_Y) # start text from 1/4 of image width pos_2 = (text_start_position_X, text_start_position_Y+text_linespacing) # start text from 1/4 of image width pos_3 = (text_start_position_X, text_start_position_Y+2*text_linespacing) # start text from 1/4 of image width if trigger_show_brightness==1 or trigger_show_max_brightness_values_of_colour_channels==1: pos_4 = (text_start_position_X, text_start_position_Y+3*text_linespacing) # start text from 1/4 of image width # define text to display text_line_1 = f"set ss: {shutter_speed} us" text_line_3 = f"Frame: {framenumber}" text_line_2 = f"ret exs: {camera_exposure_speed} us" if trigger_show_brightness==1: if arithmetic_mean_of_brightness_per_pixel_relative >= 0.01: text_line_4 = f"brightness: {round(arithmetic_mean_of_brightness_per_pixel_relative*100,2)} %" elif arithmetic_mean_of_brightness_per_pixel_relative < 0.01: text_line_4 = f"brightness: {round(arithmetic_mean_of_brightness_per_pixel_relative*10e3,2)} pm" if trigger_show_max_brightness_values_of_colour_channels==1: text_line_4 = f"max: r:{r_max} g:{g_max} b:{b_max}" # put the text into the image #image_text_1 = cv.putText(img, text_line_1, pos_1, font, # fontScale, color, thickness, cv.LINE_AA) #image_text_2 = cv.putText(img, text_line_2, pos_2, font, # fontScale, color, thickness, cv.LINE_AA) #image_text_3 = cv.putText(img, text_line_3, pos_3, font, # fontScale, color, thickness, cv.LINE_AA) if trigger_show_brightness==1 or trigger_show_max_brightness_values_of_colour_channels==1: image_text_4 = cv.putText(img, text_line_4, pos_4, font, fontScale, color, thickness, cv.LINE_AA) cv.imshow("Current Frame", img) # display the image if trigger_record_OpenCV == 1: out.write(img) # write frame to Video def calc_arithmetic_mean_of_brightness_per_pixel(image): """Calculate overall brightness per pixel of the image. Mittelere Helligkeit pro pixel berechnen.""" #Comment: So rechenintensiv, dass man kein Blitzen sieht im Bild. (Oder sehr selten bzw. schwach). Daher anzeige von max-werten b,g,r = cv.split(image) # OpenCV works with bgr. Image is also speciefied to be captured in bgr. r=r.astype('uint16') # set dtype of one colour to uint16, because the sum of 255+255+255 >255 =765 #the rest will also be uint16 then image_heigth = r.shape[0] image_width = r.shape[1] number_of_colour_channels = 3 arithmetic_mean_of_brightness_image = np.sum((r+g+b)/number_of_colour_channels) arithmetic_mean_of_brightness_per_pixel = arithmetic_mean_of_brightness_image/(image_width*image_heigth) max_possible_brightness = 255 # maximum possible brightness arithmetic_mean_of_brightness_per_pixel_relative = arithmetic_mean_of_brightness_per_pixel/max_possible_brightness return arithmetic_mean_of_brightness_per_pixel_relative def get_max_rgb_values(image): """get max values of colour channels""" b,g,r = cv.split(image) # OpenCV works with bgr. Image is also speciefied to be captured in bgr. r_max=r.max() g_max=g.max() b_max=b.max() return r_max, g_max, b_max def create_folder_for_captures(): # Create folder for saving the captured pictures now = datetime.now(); d1 = now.strftime("%Y-%m-%d %H-%M") path_cwd = os.getcwd() path_saveFolder = path_cwd+r"/Capture_"+d1 try: os.mkdir(path_saveFolder) folder_exists = True except OSError: print("Error! Ending script.") quit() return path_saveFolder, folder_exists def do_processing(): time.sleep(0.001) #print("ohh i am doing high complex image analysis with computer vision") def do_processing_frame_r(frame): print(f"max frame color red: {frame.max()}") def do_processing_frame_g(frame): print(f"max frame color green: {frame.max()}") def do_processing_frame_b(frame): print(f"max frame color blue: {frame.max()}") def processing_red(): shm_bool_init = shared_memory.SharedMemory(name="shm_bools") # Attach to existing shared memory block shm_bools = np.ndarray((10,), dtype=np.bool8, buffer= shm_bool_init.buf) # do not: newframe = np.array(...)[0] --> then one can not assign new value in main script newframe = shm_bools[0] p_red_finished = shm_bools[1] # not used, but for clarity p_red_started = shm_bools[4] shm_framenumber_init = shared_memory.SharedMemory\ (name="shm_framenumber") # Attach to existing shared memory block shm_framenumber = np.ndarray((1,), dtype=np.uint64, \ buffer= shm_framenumber_init.buf) # framenumer = shm_framenumber[0] shm_redframe_init = shared_memory.SharedMemory\ (name="shm_redframe") # Attach to existing shared memory block shm_redframe = np.ndarray((image_heigth,image_width), dtype=np.uint8, \ buffer= shm_redframe_init.buf) i=0 while True: try: framenumber = shm_framenumber[0] if i==0: last_processed_frame = framenumber conditions_for_first_start = (i==0) and\ (shm_bools[0] == True) and \ (shm_bools[1] == False) and (shm_bools[2] == False) and (shm_bools[3] == False) \ and (shm_bools[7] == True) conditions_for_starting_processing = (framenumber>last_processed_frame) and (shm_bools[7] == True) # newframe and all color-channel-processings have to be finished if conditions_for_first_start == True: shm_bools[4] = True # process started shm_bools[7] = False # reset trigger shm_bools[1] = False # set bool for p_red_finished to false #t1 = time.perf_counter_ns() do_processing() i += 1 #print(f"first processing red finished. frame: {framenumber}") shm_bools[1] = True # set bool for p_red_finished to true shm_bools[4] = False # process ended elif conditions_for_starting_processing == True: shm_bools[4] = True # process started shm_bools[7] = False # reset trigger #print(f"red: framenumber: {framenumber}, last_processed_frame: {last_processed_frame}") shm_bools[1] = False # set bool for p_red_finished to false #t1 = time.perf_counter_ns() do_processing() #print(f"max frame color red: {shm_redframe.max()}") if show_opencv_window: cv.imshow("red", shm_redframe) cv.waitKey(1) #print(f"processing red finished. frame: {framenumber}") shm_bools[1] = True # set bool for p_red_finished to true #t2 = time.perf_counter_ns() #print(f"processing time for red channel: {round((t2-t1)*1e-6,2)} ms") last_processed_frame = framenumber shm_bools[4] = False # process ended # elif shm_bools[0] == False: # pass #print(f"no new red frame") # image processing finished except KeyboardInterrupt: try: shm_bool_init.close() shm_framenumber_init.close() shm_redframe_init.close() except FileNotFoundError: # Memory already destroyed pass def processing_bgr(): shm_bool_init = shared_memory.SharedMemory(name="shm_bools") # Attach to existing shared memory block shm_bools = np.ndarray((10,), dtype=np.bool8, buffer= shm_bool_init.buf) # do not: newframe = np.array(...)[0] --> then one can not assign new value in main script newframe = shm_bools[0] p_red_finished = shm_bools[1] # not used, but for clarity p_red_started = shm_bools[4] shm_framenumber_init = shared_memory.SharedMemory\ (name="shm_framenumber") # Attach to existing shared memory block shm_framenumber = np.ndarray((1,), dtype=np.uint64, \ buffer= shm_framenumber_init.buf) # framenumer = shm_framenumber[0] shm_frame_init = shared_memory.SharedMemory\ (name="shm_frame") # Attach to existing shared memory block shm_redframe = np.ndarray((image_heigth,image_width, number_of_colorchannels), dtype=np.uint8, \ buffer= shm_frame_init.buf) image_bgr = shm_redframe params_for_blob_detection = define_parameters_for_blob_detection() detector = create_detector(params_for_blob_detection) i=0 while True: try: framenumber = shm_framenumber[0] if i==0: last_processed_frame = framenumber conditions_for_first_start = (i==0) and\ (shm_bools[0] == True) and \ (shm_bools[1] == False) and (shm_bools[2] == False) and (shm_bools[3] == False) \ and (shm_bools[7] == True) conditions_for_starting_processing = (framenumber>last_processed_frame) and (shm_bools[7] == True) # newframe and all color-channel-processings have to be finished if conditions_for_first_start == True: print("first start") shm_bools[4] = True # process started shm_bools[7] = False # reset trigger shm_bools[1] = False # set bool for p_red_finished to false #t1 = time.perf_counter_ns() lane_detection(image_bgr, detector) # hier immer noch altes bild -->> Processing ist langsamer als neue bilder kommen.. i += 1 #print(f"first processing red finished. frame: {framenumber}") shm_bools[1] = True # set bool for p_red_finished to true shm_bools[4] = False # process ended elif conditions_for_starting_processing == True: shm_bools[4] = True # process started shm_bools[7] = False # reset trigger #print(f"red: framenumber: {framenumber}, last_processed_frame: {last_processed_frame}") shm_bools[1] = False # set bool for p_red_finished to false #t1 = time.perf_counter_ns() cv.imshow("AAA", shm_redframe) lane_detection(image_bgr, detector) #print(f"max frame color red: {shm_redframe.max()}") if show_opencv_window: cv.waitKey(1) #print(f"processing red finished. frame: {framenumber}") shm_bools[1] = True # set bool for p_red_finished to true #t2 = time.perf_counter_ns() #print(f"processing time for red channel: {round((t2-t1)*1e-6,2)} ms") last_processed_frame = framenumber shm_bools[4] = False # process ended # elif shm_bools[0] == False: # pass #print(f"no new red frame") # image processing finished except KeyboardInterrupt: try: shm_bool_init.close() shm_framenumber_init.close() shm_redframe_init.close() except FileNotFoundError: # Memory already destroyed pass # ---------------------------------------------------------------------------- # main def main(): start = time.perf_counter() try: # create processes p_red = Process(target=processing_bgr) processes = [p_red] print(f"waiting 1 second to create processes") time.sleep(1) # sind prozesse schon vorhanden # start acitivity of processes for process in processes: process.start() # start capturing get_frames_from_picamera() print('*******************************') # this below code is only executed if the loop in take_image_picamera_opencv is breaked # In real use case there will be no end of this program for process in processes: process.terminate() # print time measuring end = time.perf_counter() print(f'Script finished in {round(end-start, 2)} s') # close each SharedMemory instance and unlink to release the shared memory shm_bools_create.close() shm_bools_create.unlink() shm_framenumber_create.close() shm_framenumber_create.unlink() shm_redframe_create.close() shm_redframe_create.unlink() shm_greenframe_create.close() shm_greenframe_create.unlink() shm_blueframe_create.close() shm_blueframe_create.unlink() except KeyboardInterrupt: # Normally this prgoram never gets keyboard interrupted! But here this case is nevertheless handled # End Script try: # close each SharedMemory instance and unlink to release the shared memory shm_bools.close() shm_bools.unlink() shm_framenumber_create.close() shm_framenumber_create.unlink() shm_redframe_create.close() shm_redframe_create.unlink() shm_greenframe_create.close() shm_greenframe_create.unlink() shm_blueframe_create.close() shm_blueframe_create.unlink() except FileNotFoundError: # Memory already destroyed pass if __name__ == "__main__": main()