# Creation Date: 14.01.2022 # Author: Kenan Gömek # This script takes pictures with Picameras VideoPort like it will be used to work with OpenCV and saves it with OpenCV to have the real use case pictures. # This script is designed for capturing a Video with the frame number in it. press "q" to quit. # You can take images with "i". # Update: 28.02.2022 # This program is a development step for the final program # This program works with multiprocessing and with Picamera # create shared memorys once before exectution instead of twice: in main and take_image_picamera_opencv import cv2 as cv import numpy as np import picamera from picamera.array import PiRGBArray from fractions import Fraction import time from datetime import datetime import os import sys from multiprocessing import Process, shared_memory # Define camera settings # divide origin resoluton by a number, to have the origin aspect ratio # RESOLUTION = (3280, 2464) # Max Photo-Resolution CAM03 and CAM04 # no image with PiCamera Videoport at this Resolution.. Probably GPU Memory and CPU issues. # RESOLUTION = (1640,1232) # 2nd best Resolution for CAM03 and CAM04 with FUll FOV (2x2 binning) # Mode 4 SENSOR_MODE = 4 # corresponding sensor mode to resolution OUTPUT_RESOLUTION = (416, 320) # (width, heigth) image_width = OUTPUT_RESOLUTION[0] image_heigth = OUTPUT_RESOLUTION[1] # (410,308) is being upscaled to (416,320) from ISP (warning in bash), but image will have still (410,308) pixels. # OUTPUT_RESOLUTION = (820, 616) # (1640x1232)/2=(820,616) # bash: frame size rounded up from 820x616 to 832x624 number_of_colorchannels = 3 # r, g, b size_of_frame=int(image_heigth*image_heigth) frame_dimension = int(1) AWB_MODE = 'off' # Auto white balance mode AWB_GAINS = (Fraction(485, 256), Fraction(397, 256)) # White Balance Gains to have colours read correctly: (red, blue) ISO = 100 # ISO value EXPOSURE_MODE = 'off' FRAMERATE = 30 # frames per second. 40 fps is max for sensor mode 4 SLEEP_TIME = 2 # Time for sleep-mode for the camera in seconds. My default: 2 s # miscellaneous parameters max_value_of_uint64 = int((2**64) - 1) # @30 fps: konservative calculated driving time: 1.95*1e10 years --> Integer overflow not relevant. # settings for development show_opencv_window = False # create shared Memorys for main-process # shared memory for bools shm_bools_pre=np.array([False, False, False, False, False, False, False, False, False, False], dtype=np.bool8) # create numpy array with bools stored in it # [0]: newframe [1]: p_red_finished [2]: p_green_finished [3]: p_blue_finished # [4]: p_red_started [5]: p_green_started [6]: p_blue_started # [7]: p_red_start_trigger [8]: p_green_start_trigger [9]: p_blue_start_trigger size_of_buffer = shm_bools_pre.nbytes print(f"size of buffer: {size_of_buffer}") # size of buffer: 10 print(f"shm_bools dtype: {shm_bools_pre.dtype}") # dtype: bool shm_bools_create = shared_memory.SharedMemory(name="shm_bools", create=True, size=shm_bools_pre.nbytes) # create a new shared memory block shm_bools = np.ndarray(shm_bools_pre.shape, dtype=shm_bools_pre.dtype, buffer=shm_bools_create.buf) # create a NumPy array backed by shared memory shm_bools[:] = shm_bools_pre[:] # Copy the original data into shared memory # print(shm_bool) # print(shm_bools.name) # shared memory for framenumber shm_framenumber_pre=np.array([0], dtype=np.uint64) size_of_buffer = shm_framenumber_pre.nbytes print(f"size of framenumber-buffer: {size_of_buffer}") #8 print(f"shm_framenumber dtype: {shm_framenumber_pre.dtype}") #uint64 shm_framenumber_create = shared_memory.SharedMemory(name="shm_framenumber", create=True, size=shm_framenumber_pre.nbytes) # create a new shared memory block shm_framenumber = np.ndarray(shm_framenumber_pre.shape, dtype=shm_framenumber_pre.dtype, buffer=shm_framenumber_create.buf) # create a NumPy array backed by shared memory shm_framenumber[:] = shm_framenumber_pre[:] # Copy the original data into shared memory # print(shm_framenumber) # [0] # print(shm_framenumber_create.name) # shm_framenumber # shared memory for red, green, blue frame int_black = 0 # integer for black color/ no color shm_colorframes_pre = np.full(\ (image_heigth,image_width), \ int_black, dtype=np.uint8) size_of_buffer = shm_colorframes_pre.nbytes print(f"size of colorframe-buffer: {size_of_buffer}") #133 120 print(f"shm_colorframes_pre dtype: {shm_colorframes_pre.dtype}") #uint8 shm_redframe_create = shared_memory.SharedMemory(name="shm_redframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block shm_greenframe_create = shared_memory.SharedMemory(name="shm_greenframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block shm_blueframe_create = shared_memory.SharedMemory(name="shm_blueframe", create=True, size=shm_colorframes_pre.nbytes) # create a new shared memory block shm_redframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_redframe_create.buf) # create a NumPy array backed by shared memory shm_greenframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_greenframe_create.buf) # create a NumPy array backed by shared memory shm_blueframe = np.ndarray(shm_colorframes_pre.shape, dtype=shm_colorframes_pre.dtype, buffer=shm_blueframe_create.buf) # create a NumPy array backed by shared memory shm_redframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory shm_greenframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory shm_blueframe[:] = shm_colorframes_pre[:] # Copy the original data into shared memory # ---------------------------------------------------------------------------- # Define Funcions def get_frames_from_picamera(shutter_speed): # newframe= shm_bools[0] # do not use this! no updted values in "newframe" # framenumber = shm_framenumber[0] # Initialise Camera with picamera.PiCamera() as camera: with PiRGBArray(camera) as output: # Set camera settings camera.sensor_mode = SENSOR_MODE # force camera into desired sensor mode camera.resolution = OUTPUT_RESOLUTION # frame will be resized from GPU to this resolution. No CPU usage! camera.framerate = FRAMERATE camera.awb_mode = AWB_MODE camera.awb_gains = AWB_GAINS camera.iso = ISO camera.shutter_speed = shutter_speed camera.exposure_mode = EXPOSURE_MODE time.sleep(SLEEP_TIME) # Camera warm-up time to apply settings t_start= time.perf_counter() # save time for fps calculation for frameidx, frame in enumerate(camera.capture_continuous(output, format='bgr', use_video_port=True)): # General information: # - always the newest frame is recieved: processing must be faster than fps if every frame should be processed shm_bools[0] = True framenumber = frameidx+1 # frameidx starts with 0, framenumber with 1 shm_framenumber[0] = framenumber #print('') #print(f"new frame: {framenumber}") #image = frame.array # raw NumPy array without JPEG encoding b,g,r = cv.split(frame.array) # split colour channels of raw NumPy array without JPEG encoding shm_redframe[:] = r shm_greenframe[:] = g shm_blueframe[:] = b # for better performance one can assign directly in funtion line the values to the shm_memorys: shm_red, .. , ... = cv.split(..) shm_bools[7:10]=[True] # trigger the start of the processing for each colorchannel #print(shm_bools[7], shm_bools[8], shm_bools[9]) #display_image_with_text(image, shutter_speed, framenumber, camera_exposure_speed, trigger_record_OpenCV, out) # show the frame output.truncate(0) # clear the stream for next frame if framenumber == 500: # 5 sek @ 30 fps, only for performance measuring t_stop=time.perf_counter() print(f"calculated fps: {framenumber/(t_stop-t_start)}") break def display_image_with_text(img, shutter_speed, framenumber, camera_exposure_speed, trigger_record_OpenCV, out): img = img.copy() # make copy of image and do not modify the original image # please activate only one trigger once trigger_show_brightness = 0 # trigger for (not) calculating andshowing the brightness of the image+ if trigger_show_brightness == 1: arithmetic_mean_of_brightness_per_pixel_relative = calc_arithmetic_mean_of_brightness_per_pixel(img) trigger_show_max_brightness_values_of_colour_channels = 0 # trigger for (not) calculating and showing max values of colour chanels if trigger_show_max_brightness_values_of_colour_channels == 1: r_max, g_max, b_max = get_max_rgb_values(img) font = cv.FONT_HERSHEY_SIMPLEX # font fontScale = 1 # fontScale color = (255, 255, 255) # Font colour in BGR thickness = 1 # Line thickness in px # set text position frame_width = int(img.shape[1]) frame_height = int(img.shape[0]) text_start_position_Y = int(round(frame_height*0.12)) # start position of text in pixels 12 % of frame height text_linespacing = 50 # line spacing between two strings in pixels # text_start_position_X = int(frame_width/4) # start text from 1/4 of image width text_start_position_X = int(0) # start text from left edge of image # set position in (x,y)-coordinated from top left corner. Bottom-left corner of the text string in the image. pos_1 = (text_start_position_X, text_start_position_Y) # start text from 1/4 of image width pos_2 = (text_start_position_X, text_start_position_Y+text_linespacing) # start text from 1/4 of image width pos_3 = (text_start_position_X, text_start_position_Y+2*text_linespacing) # start text from 1/4 of image width if trigger_show_brightness==1 or trigger_show_max_brightness_values_of_colour_channels==1: pos_4 = (text_start_position_X, text_start_position_Y+3*text_linespacing) # start text from 1/4 of image width # define text to display text_line_1 = f"set ss: {shutter_speed} us" text_line_3 = f"Frame: {framenumber}" text_line_2 = f"ret exs: {camera_exposure_speed} us" if trigger_show_brightness==1: if arithmetic_mean_of_brightness_per_pixel_relative >= 0.01: text_line_4 = f"brightness: {round(arithmetic_mean_of_brightness_per_pixel_relative*100,2)} %" elif arithmetic_mean_of_brightness_per_pixel_relative < 0.01: text_line_4 = f"brightness: {round(arithmetic_mean_of_brightness_per_pixel_relative*10e3,2)} pm" if trigger_show_max_brightness_values_of_colour_channels==1: text_line_4 = f"max: r:{r_max} g:{g_max} b:{b_max}" # put the text into the image image_text_1 = cv.putText(img, text_line_1, pos_1, font, fontScale, color, thickness, cv.LINE_AA) image_text_2 = cv.putText(img, text_line_2, pos_2, font, fontScale, color, thickness, cv.LINE_AA) image_text_3 = cv.putText(img, text_line_3, pos_3, font, fontScale, color, thickness, cv.LINE_AA) if trigger_show_brightness==1 or trigger_show_max_brightness_values_of_colour_channels==1: image_text_4 = cv.putText(img, text_line_4, pos_4, font, fontScale, color, thickness, cv.LINE_AA) cv.imshow("Current Frame", img) # display the image if trigger_record_OpenCV == 1: out.write(img) # write frame to Video def calc_arithmetic_mean_of_brightness_per_pixel(image): """Calculate overall brightness per pixel of the image. Mittelere Helligkeit pro pixel berechnen.""" #Comment: So rechenintensiv, dass man kein Blitzen sieht im Bild. (Oder sehr selten bzw. schwach). Daher anzeige von max-werten b,g,r = cv.split(image) # OpenCV works with bgr. Image is also speciefied to be captured in bgr. r=r.astype('uint16') # set dtype of one colour to uint16, because the sum of 255+255+255 >255 =765 #the rest will also be uint16 then image_heigth = r.shape[0] image_width = r.shape[1] number_of_colour_channels = 3 arithmetic_mean_of_brightness_image = np.sum((r+g+b)/number_of_colour_channels) arithmetic_mean_of_brightness_per_pixel = arithmetic_mean_of_brightness_image/(image_width*image_heigth) max_possible_brightness = 255 # maximum possible brightness arithmetic_mean_of_brightness_per_pixel_relative = arithmetic_mean_of_brightness_per_pixel/max_possible_brightness return arithmetic_mean_of_brightness_per_pixel_relative def get_max_rgb_values(image): """get max values of colour channels""" b,g,r = cv.split(image) # OpenCV works with bgr. Image is also speciefied to be captured in bgr. r_max=r.max() g_max=g.max() b_max=b.max() return r_max, g_max, b_max def create_folder_for_captures(): # Create folder for saving the captured pictures now = datetime.now(); d1 = now.strftime("%Y-%m-%d %H-%M") path_cwd = os.getcwd() path_saveFolder = path_cwd+r"/Capture_"+d1 try: os.mkdir(path_saveFolder) folder_exists = True except OSError: print("Error! Ending script.") quit() return path_saveFolder, folder_exists def do_processing(): time.sleep(0.001) #print("ohh i am doing high complex image analysis with computer vision") def do_processing_frame_r(frame): print(f"max frame color red: {frame.max()}") def do_processing_frame_g(frame): print(f"max frame color green: {frame.max()}") def do_processing_frame_b(frame): print(f"max frame color blue: {frame.max()}") def processing_red(): shm_bool_init = shared_memory.SharedMemory(name="shm_bools") # Attach to existing shared memory block shm_bools = np.ndarray((10,), dtype=np.bool8, buffer= shm_bool_init.buf) # do not: newframe = np.array(...)[0] --> then one can not assign new value in main script newframe = shm_bools[0] p_red_finished = shm_bools[1] # not used, but for clarity p_red_started = shm_bools[4] shm_framenumber_init = shared_memory.SharedMemory\ (name="shm_framenumber") # Attach to existing shared memory block shm_framenumber = np.ndarray((1,), dtype=np.uint64, \ buffer= shm_framenumber_init.buf) # framenumer = shm_framenumber[0] shm_redframe_init = shared_memory.SharedMemory\ (name="shm_redframe") # Attach to existing shared memory block shm_redframe = np.ndarray((image_heigth,image_width), dtype=np.uint8, \ buffer= shm_redframe_init.buf) i=0 while True: try: framenumber = shm_framenumber[0] if i==0: last_processed_frame = framenumber conditions_for_first_start = (i==0) and\ (shm_bools[0] == True) and \ (shm_bools[1] == False) and (shm_bools[2] == False) and (shm_bools[3] == False) \ and (shm_bools[7] == True) conditions_for_starting_processing = (framenumber>last_processed_frame) and (shm_bools[7] == True) # newframe and all color-channel-processings have to be finished if conditions_for_first_start == True: shm_bools[4] = True # process started shm_bools[7] = False # reset trigger shm_bools[1] = False # set bool for p_red_finished to false #t1 = time.perf_counter_ns() do_processing() i += 1 #print(f"first processing red finished. frame: {framenumber}") shm_bools[1] = True # set bool for p_red_finished to true shm_bools[4] = False # process ended elif conditions_for_starting_processing == True: shm_bools[4] = True # process started shm_bools[7] = False # reset trigger #print(f"red: framenumber: {framenumber}, last_processed_frame: {last_processed_frame}") shm_bools[1] = False # set bool for p_red_finished to false #t1 = time.perf_counter_ns() do_processing() #print(f"max frame color red: {shm_redframe.max()}") if show_opencv_window: cv.imshow("red", shm_redframe) cv.waitKey(1) #print(f"processing red finished. frame: {framenumber}") shm_bools[1] = True # set bool for p_red_finished to true #t2 = time.perf_counter_ns() #print(f"processing time for red channel: {round((t2-t1)*1e-6,2)} ms") last_processed_frame = framenumber shm_bools[4] = False # process ended # elif shm_bools[0] == False: # pass #print(f"no new red frame") # image processing finished except KeyboardInterrupt: try: shm_bool_init.close() shm_framenumber_init.close() shm_redframe_init.close() except FileNotFoundError: # Memory already destroyed pass def processing_green(): shm_bool_init = shared_memory.SharedMemory(name="shm_bools") shm_bools = np.ndarray((10,), dtype=np.bool8, buffer= shm_bool_init.buf) # do not: newframe = np.array(...)[0] --> then one can not assign new value in main script newframe= shm_bools[0] p_green_finished= shm_bools[2] # not used, but for clarity p_green_started = shm_bools[5] shm_framenumber_init = shared_memory.SharedMemory\ (name="shm_framenumber") # Attach to existing shared memory block shm_framenumber = np.ndarray((1,), dtype=np.uint64, \ buffer= shm_framenumber_init.buf) # framenumer = shm_framenumber[0] shm_greenframe_init = shared_memory.SharedMemory\ (name="shm_greenframe") # Attach to existing shared memory block shm_greenframe = np.ndarray((image_heigth,image_width), dtype=np.uint8, \ buffer= shm_greenframe_init.buf) i=0 while True: try: framenumber = shm_framenumber[0] if i==0: last_processed_frame = framenumber conditions_for_first_start = (i==0) and\ (shm_bools[0] == True) and \ (shm_bools[1] == False) and (shm_bools[2] == False) and (shm_bools[3] == False) \ and (shm_bools[8] == True) conditions_for_starting_processing = (framenumber>last_processed_frame) and (shm_bools[8] == True) if conditions_for_first_start == True: shm_bools[5] = True # process started shm_bools[8] = False # reset trigger shm_bools[2] = False # set bool for p_green_finished to false i += 1 do_processing() #print(f"first processing green finished. frame: {framenumber}") shm_bools[2] = True # set bool for p_green_finished to true shm_bools[5] = False # process ended elif conditions_for_starting_processing == True: shm_bools[5] = True # process started shm_bools[8] = False # reset trigger #print(f"green: framenumber: {framenumber}, last_processed_frame: {last_processed_frame}") shm_bools[2] = False # set bool for p_green_finished to false do_processing() if show_opencv_window: cv.imshow("green", shm_greenframe) cv.waitKey(1) #print(f"max frame color green: {shm_greenframe.max()}") #print(f"processing green finished. frame: {framenumber}") shm_bools[2] = True # set bool for p_green_finished to true last_processed_frame = framenumber shm_bools[5] = False # process ended # elif shm_bools[0] == False: # pass # # print(f"no new green frame") # image processing finished except KeyboardInterrupt: try: shm_bool_init.close() shm_framenumber_init.close() shm_greenframe_init.close() except FileNotFoundError: # Memory already destroyed pass def processing_blue(): shm_bools_init = shared_memory.SharedMemory(name="shm_bools") shm_bools = np.ndarray((10,), dtype=np.bool8, buffer= shm_bools_init.buf) # do not: newframe = np.array(...)[0] --> then one can not assign new value in main script newframe= shm_bools[0] p_red_finished= shm_bools[3] # not used, but for clarity p_blue_started = shm_bools[6] shm_framenumber_init = shared_memory.SharedMemory\ (name="shm_framenumber") # Attach to existing shared memory block shm_framenumber = np.ndarray((1,), dtype=np.uint64, \ buffer= shm_framenumber_init.buf) # framenumer = shm_framenumber[0] shm_blueframe_init = shared_memory.SharedMemory\ (name="shm_blueframe") # Attach to existing shared memory block shm_blueframe = np.ndarray((image_heigth,image_width), dtype=np.uint8, \ buffer= shm_blueframe_init.buf) i=0 while True: try: framenumber = shm_framenumber[0] if i==0: last_processed_frame = framenumber conditions_for_first_start = (i==0) and\ (shm_bools[0] == True) and \ (shm_bools[1] == False) and (shm_bools[2] == False) and (shm_bools[3] == False) \ and (shm_bools[9] == True) conditions_for_starting_processing = (framenumber>last_processed_frame) and (shm_bools[9] == True) # newframe and all color-channel-processings have to be finished if conditions_for_first_start == True: shm_bools[6] = True # process started shm_bools[9] = False # reset trigger shm_bools[3] = False # set bool for p_blue_finished to false i += 1 do_processing() #print(f"first processing blue finished. frame: {framenumber}") shm_bools[3] = True # set bool for p_blue_finished to true shm_bools[6] = False # process ended elif conditions_for_starting_processing == True: shm_bools[6] = True # process started shm_bools[9] = False # reset trigger #print(f"blue: framenumber: {framenumber}, last_processed_frame: {last_processed_frame}") shm_bools[3] = False # set bool for p_blue_finished to false do_processing() if show_opencv_window: cv.imshow("blue", shm_blueframe) cv.waitKey(1) #print(f"max frame color blue: {shm_blueframe.max()}") #print(f"processing blue finished. frame: {framenumber}") shm_bools[3] = True # set bool for p_blue_finished to true last_processed_frame = framenumber shm_bools[6] = False # process ended # elif shm_bools[0] == False: # pass # #print(f"no new blue frame") # image processing finished except KeyboardInterrupt: try: shm_bools_init.close() shm_framenumber_init.close() shm_blueframe_init.close() except FileNotFoundError: # Memory already destroyed pass # ---------------------------------------------------------------------------- # main def main(): start = time.perf_counter() try: # create processes p_red = Process(target=processing_red) p_green = Process(target=processing_green) p_blue = Process(target=processing_blue) processes = [p_red, p_green, p_blue] print(f"waiting 1 second to create processes") time.sleep(1) # sind prozesse schon vorhanden # start acitivity of processes for process in processes: process.start() # start capturing get_frames_from_picamera(shutter_speed=33333) #Can see something at normal light conditions # get_frames_from_picamera(shutter_speed=1000) print('*******************************') # this below code is only executed if the loop in take_image_picamera_opencv is breaked # In real use case there will be no end of this program # wait for all processes to finisch # main is blocked as long all processes are not finished # The processes will never finish by design (better for performance, not to check for several triggers) # for process in processes: # process.join() for process in processes: process.terminate() # print time measuring end = time.perf_counter() print(f'Script finished in {round(end-start, 2)} s') # close each SharedMemory instance and unlink to release the shared memory shm_bools_create.close() shm_bools_create.unlink() shm_framenumber_create.close() shm_framenumber_create.unlink() shm_redframe_create.close() shm_redframe_create.unlink() shm_greenframe_create.close() shm_greenframe_create.unlink() shm_blueframe_create.close() shm_blueframe_create.unlink() except KeyboardInterrupt: # Normally this prgoram never gets keyboard interrupted! But here this case is nevertheless handled # End Script try: # close each SharedMemory instance and unlink to release the shared memory shm_bools.close() shm_bools.unlink() shm_framenumber_create.close() shm_framenumber_create.unlink() shm_redframe_create.close() shm_redframe_create.unlink() shm_greenframe_create.close() shm_greenframe_create.unlink() shm_blueframe_create.close() shm_blueframe_create.unlink() except FileNotFoundError: # Memory already destroyed pass if __name__ == "__main__": main()