Commit 89a51b98 authored by Ziqin Wang's avatar Ziqin Wang Committed by Jeeken
Browse files

blue car video tested

parent 4672f1b1
Loading
Loading
Loading
Loading

src/car_target.py

100644 → 100755
+126 −62
Original line number Diff line number Diff line
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__date__ = '2018/01/22'

__date__ = '2018/04/06'

import cv2
import numpy as np
import time
import math
import numpy as np
from matplotlib import pyplot as plt

from tools import quitable, folder
from detector import BarColor, Detector
from serial_msg import SerialWriter


def get_converter(weight: int, height: int):
    """Build a converter from a pixel position (x, y) to (yaw, pitch) deltas
    for a frame of size ``weight`` x ``height`` (``weight`` is the frame width;
    the misspelling is kept for caller compatibility).

    The offset from the frame centre is compressed logarithmically so distant
    targets do not produce wildly large turret commands.
    """
    def f(n: float):
        # Odd (sign-preserving) log compression: f(-n) == -f(n), f(0) == 0.
        # The previous version returned +log(1 - n) for n < 0, which made f
        # even, losing the direction of negative offsets.
        return math.log(n + 1) if n >= 0 else -math.log(1 - n)

    def converter(x: int, y: int) -> tuple:
        # Offsets from the frame centre; positive = right / down.
        dx = x - weight // 2
        dy = y - height // 2
        # 8 and 6 appear to be yaw/pitch scale factors (tuning constants) —
        # TODO confirm against the turret protocol.
        return int(f(dx) * 8), int(f(dy) * 6)
    return converter
# TODO: (x, y) -> (yaw, pitch)
#
# def get_converter(weight: int, height: int):
#     def f(n: float): # an odd function
#         return math.log(n + 1) if n >= 0 else math.log(1 - n)
#     def converter(x: int, y: int) -> tuple: # TODO
#         dx = x - weight // 2
#         dy = y - height // 2
#         return int(f(dx) * 8), int(f(dy) * 6)
#     return converter
#
# class Tracer:
#     """
#     (x, y) -> (delta_yaw, delta_pitch)
#     and keep movement smooth
#     """
#     def __init__(self, width, height):
#         self.step = 5
#         self.central_x = width >> 1
#         self.central_y = height >> 1
#
#     def convert(self, target: tuple) -> tuple:
#         if target is not None:
#             self.step = (self.step - 1) if self.step > 5 else 5
#             x, y = target
#             dx = x - self.central_x
#             dy = y - self.central_y
#             eta = self.step / math.sqrt(dx * dx + dy * dy)
#             return (-int(dx * eta), int(dy * eta)) if eta < 1 else target
#         else:
#             self.step = self.step + 1 if self.step < 50 else 50
#             return 0, 0


class Smoother:
    """Temporal smoothing for detected target coordinates.

    Keeps the reported target stable: a short run of missed detections or a
    sudden far jump is tolerated for up to ``count_threshold`` frames before
    the tracker either drops the target or accepts the new position.
    """

    def __init__(self, shape):
        w, h = shape
        # Jumps longer than 1/12 of the frame diagonal count as suspicious.
        self.far_enough = math.sqrt(w * w + h * h) / 12
        self.count_threshold = 5
        self.last_target = None   # last accepted (x, y), or None when lost
        self.invalid_count = 0    # consecutive missing/suspicious frames seen

    def smoothed(self, new_target):
        """Fold one detection (an (x, y) tuple or None) into the track.

        Returns the smoothed (x, y) position, or None when the target is lost.
        """
        if new_target is None:
            # No detection this frame: drop the track only once the
            # tolerance is exhausted, otherwise just count the miss.
            if self.invalid_count > self.count_threshold:
                self.last_target = None
            else:
                self.invalid_count += 1
            return self.last_target

        if self.last_target is None:
            # Fresh track: adopt the detection as-is.
            self.last_target = new_target
            self.invalid_count = 0
            return self.last_target

        lx, ly = self.last_target
        dx, dy = new_target[0] - lx, new_target[1] - ly
        jump = math.sqrt(dx * dx + dy * dy)
        if jump > self.far_enough and self.invalid_count <= self.count_threshold:
            # Suspiciously large jump: ignore it for now, but remember it.
            self.invalid_count += 1
        else:
            # Normal update (or persistent jump -> switch): move halfway there.
            self.last_target = (lx + dx // 2, ly + dy // 2)
            self.invalid_count = 0
        return self.last_target


class CarTargetApp:
@@ -41,15 +94,18 @@ class ImgCarTargetApp(CarTargetApp):
        self.frame_size = frame_size

    @quitable
    def run(self): # FIXME
        file_list = folder("/home/jeeken/Projects/CV_tools/img/target", "png")
    def run(self): # FIXME: move folder path to constructor
        file_list = folder("/home/jeeken/Pictures/DMovie", "jpg")
        for i in file_list:
            print(i)
            print("img_file:", i)
            mat = cv2.resize(cv2.imread(i), self.frame_size)
            target = self.detector.target(mat) # TODO
            print(target)
            if cv2.waitKey(0) & 0xFF == ord('q'):
            target = self.detector.target(mat)
            print("target:", target)
            if plt.waitforbuttonpress():
                plt.close()
                break
            # if cv2.waitKey(0) & 0xFF == ord('q'):
            #    break


class CamCarTargetApp(CarTargetApp):
@@ -60,7 +116,7 @@ class CamCarTargetApp(CarTargetApp):
        self.cap = cv2.VideoCapture(cam_idx)
        self.msg = msg
        self.frame_size = frame_size
        self.converter = get_converter(*frame_size)
        self.smoother = Smoother(frame_size)

    def __del__(self):
        CarTargetApp.__del__(self)
@@ -84,62 +140,70 @@ class CamCarTargetApp(CarTargetApp):

    @quitable
    def run(self):
        while True:
            if self.cap.isOpened():
                success, mat = self.cap.read()
                if success:
                    mat = CamCarTargetApp.undistort(cv2.resize(mat, self.frame_size))
                else:
                    break
            else:
        # TODO: unfinished
        while self.cap.isOpened():
            ok, frame = self.cap.read()
            if not ok:
                break

            frame = CamCarTargetApp.undistort(cv2.resize(frame, self.frame_size))
            if self.debug and (cv2.waitKey(1) & 0xFF == ord('q')):
                break

            time.sleep(0.015)
            target = self.detector.target(mat)
            if target is not None:
                x, y = target
                # y -= 104 # camera coordinate -> gun coordinate (Remark: camera !// gun )
                self.msg.write(*self.converter(x, y))
            target = self.detector.target(frame)
            target_smoothed = self.smoother.smoothed(target)
            # TODO:
            # - convert coordinate
            # - communication
            # - debug helper


class VideoCarTargetApp(CarTargetApp):
    def __init__(self, color: BarColor, debug: bool = False):
    def __init__(self, color: BarColor, file: str, frame_size: tuple = (640, 480), debug: bool = False):
        CarTargetApp.__init__(self, color, debug)
        self.file = file
        self.debug = debug
        self.frame_size = frame_size
        self.smoother = Smoother(frame_size)

    @quitable
    def run(self): # FIXME
        # fourcc = cv2.VideoWriter_fourcc(*'XVID')
        # out = cv2.VideoWriter('test.avi', fourcc, 20.0, (640, 480))
        cap = cv2.VideoCapture('red_car.avi')
        skip = 0
        count = -1
        wait = 10 if skip == 0 else 0
    def run(self):
        # out = cv2.VideoWriter(filename="/home/jeeken/Videos/180_out.mp4", fourcc=cv2.VideoWriter_fourcc(*"XVID"), fps=30.0, frameSize=(640, 480))
        cap = cv2.VideoCapture(self.file)
        while cap.isOpened():
            count += 1
            ret, mat = cap.read()
            # FIXME: skip is not modified during the process, how will the process terminate?
            if skip == 0:
                print("frame " + str(count))
                target = self.detector.target(mat) # TODO
                k = cv2.waitKey(wait)
                if k & 0xFF == ord('q'):
                    # out.release()
            ok, frame = cap.read()
            if not ok:
                break
            frame = cv2.resize(frame, self.frame_size)
            target = self.detector.target(frame)
            target_smoothed = self.smoother.smoothed(target)
            print("target:  ", target)
            print("smoothed:", target_smoothed)
            print("--------------------")

            cv2.imshow("Original", frame)
            if target is not None:
                cv2.circle(img=frame, center=target, radius=4, color=(0, 255, 0), thickness=-1) # green, -1: filled
                aim_color = (0, 0, 255)
                x, y = target_smoothed
                cv2.circle(img=frame, center=target_smoothed, radius=20, color=aim_color, thickness=2) # red circle
                cv2.line(img=frame, pt1=(x-40, y), pt2=(x+40, y), color=aim_color, thickness=1)
                cv2.line(img=frame, pt1=(x, y-40), pt2=(x, y+40), color=aim_color, thickness=1)
            cv2.imshow("Aimed", frame)
            # out.write(frame)

            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
                if k & 0xFF == ord('p'):
                    wait = 0
                if k & 0xFF == ord('c'):
                    wait = 10
            else:
                skip -= 1
        cap.release()


if __name__ == "__main__":
    # TODO: get argument from command line
    ser = SerialWriter(port='/dev/ttyUSB0', baudrate=115200)
    app = CamCarTargetApp(color=BarColor.BLUE, msg=ser, cam_idx=0, debug=True)
    # ser = SerialWriter(port='/dev/ttyUSB0', baudrate=115200)

    # app = CamCarTargetApp(color=BarColor.BLUE, msg=ser, cam_idx=1, frame_size=(480, 360), debug=True)

    # app = ImgCarTargetApp(color=BarColor.BLUE, frame_size=(480, 360), debug=True)
    # app = ImgCarTargetApp(color=BarColor.RED, frame_size=(480, 360), debug=True)

    # edit commented code if you want to debug Video mode
    app = VideoCarTargetApp(color=BarColor.BLUE, file="/home/jeeken/Videos/live_blue.avi", frame_size=(640, 480), debug=False)
    app.run()
+254 −17
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ import cv2
import copy
import math
import numpy as np
from matplotlib import pyplot as plt
from enum import Enum
from tools import set_mouth_callback_show_pix

@@ -24,14 +25,241 @@ class TargetDis(Enum):
    ULTRA = 3


# TODO: Detector for red light bars

class Detector:
    def __init__(self, color: BarColor, debug: bool = False):
    def __init__(self, color: BarColor, shape: tuple = (480, 360), debug: bool = False):
        self.color = color
        self.debug = debug
        self.distance = TargetDis.NEAR
        self.distance = TargetDis.NEAR # for target_old
        # self.min_step = 5 # FIXME Unknown Use # Never used before reassigned a new value
        # self.target_last = np.array([240, 180], dtype=np.int32) # suggest setting the central point as the initial value

    def hsv(self, frame):
        """Threshold *frame* (BGR) in HSV space.

        Returns ``(light_area, halo_area)``: binary masks for the nearly-white
        core of the light bar and for the saturated colored glow around it.
        """
        hsv_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        # TODO: decide the range of lightness based on histogram
        if self.color == BarColor.BLUE:
            lower_light = np.array([90, 0, 215])
            upper_light = np.array([120, 100, 255])
            light_area = cv2.inRange(hsv_frame, lower_light, upper_light)
            lower_halo = np.array([90, 100, 185])
            upper_halo = np.array([120, 255, 255])
            halo_area = cv2.inRange(hsv_frame, lower_halo, upper_halo)
        else:  # self.color == BarColor.RED
            lower_light = np.array([0, 0, 215])
            upper_light = np.array([25, 100, 255])
            light_area = cv2.inRange(hsv_frame, lower_light, upper_light)
            # Red hue wraps around the 180/0 boundary, so the halo must be
            # thresholded as two separate ranges and the resulting MASKS
            # OR-ed per pixel.  (The previous cv2.bitwise_or of the *bound*
            # arrays themselves produced meaningless mixed thresholds.)
            halo_wrap = cv2.inRange(hsv_frame, np.array([170, 100, 185]), np.array([180, 255, 255]))
            halo_base = cv2.inRange(hsv_frame, np.array([0, 100, 185]), np.array([25, 255, 255]))
            halo_area = cv2.bitwise_or(halo_wrap, halo_base)
        return light_area, halo_area

    @staticmethod
    def get_connected_components_info(frame):
        """Run connected-component analysis on a binary image.

        Returns ``(components, label_map)`` where *components* is a list of
        per-component dicts (largest first, background excluded) and
        *label_map* is the int32 label image.
        """
        cpn_count, label_map, cpn_info, centroids = \
            cv2.connectedComponentsWithStats(image=frame, connectivity=4, ltype=cv2.CV_32S)

        def pack(label, info, centroid):
            northwest_x, northwest_y, width, height, pixel_number = info
            x, y = centroid
            return {
                "label": label,
                "size": (width, height),
                "centroid": (int(round(x)), int(round(y))),
                "northwest": (northwest_x, northwest_y),
                "southeast": (northwest_x + width, northwest_y + height),
                "pixels": pixel_number
            }

        packed = map(pack, range(cpn_count), cpn_info, centroids)
        # The background is always label 0 in connectedComponentsWithStats.
        # Drop it by label rather than slicing off the biggest component
        # (the old ``[1:]`` after a size sort discarded a real component
        # whenever the foreground covered more pixels than the background).
        return sorted((c for c in packed if c["label"] != 0),
                      key=lambda c: -c["pixels"]), label_map

    @staticmethod
    def zoom_and_shift(area):
        """Grow *area*'s bounding box: widen by a quarter of the width on each
        side and extend the top edge upward by half the height.

        *area* must carry "northwest"/"southeast" corner tuples plus "width"
        and "height" entries.  Returns a new dict with the adjusted corners
        and the recomputed extents.
        """
        nw_x, nw_y = area["northwest"]
        se_x, se_y = area["southeast"]
        nw_x -= area["width"] // 4
        se_x += area["width"] // 4
        nw_y -= area["height"] // 2
        return {
            "northwest": (nw_x, nw_y), "southeast": (se_x, se_y),
            # height was ``se_y - se_x`` (a typo mixing axes); the vertical
            # extent is se_y - nw_y.
            "width": se_x - nw_x, "height": se_y - nw_y
        }

    @staticmethod
    def select_lights_in_halo(light, halo):
        """Keep only the light components lying fully inside the halo region.

        Computes the union bounding box of all sufficiently-large halo
        components, enlarges it, keeps the light components fully contained
        in it, and attaches an "angle" entry (unit: pi rad, 0.5 == vertical)
        to each survivor.

        Returns ``(insides, area)`` where *area* is the enlarged halo box as
        a {"northwest", "southeast"} dict, or ``([], None)`` when no halo
        component passes the size filter.
        """
        light_components, light_map = Detector.get_connected_components_info(light)
        halo_components= Detector.get_connected_components_info(halo)[0]
        selected_halo = list(filter(lambda c: c["pixels"] > 20, halo_components)) # TODO: eliminate magic number

        # find halo area: union bounding box of the surviving halo components
        try:
            northwest_x = min([x["northwest"][0] for x in selected_halo])
            northwest_y = min([x["northwest"][1] for x in selected_halo])
            southeast_x = max([x["southeast"][0] for x in selected_halo])
            southeast_y = max([x["southeast"][1] for x in selected_halo])
        except ValueError: # selected_halo is []
            return [], None
        width = southeast_x - northwest_x
        height = southeast_y - northwest_y

        # zoom and shift: widen by a quarter-width per side, raise the top
        # edge by half the height (bright bar cores sit above their halo)
        northwest_x -= width // 4
        southeast_x += width // 4
        northwest_y -= height // 2

        # select inside ones: a light counts only if its whole bounding box
        # lies within the enlarged halo box
        def is_fully_inside(c):
            c_nw_x, c_nw_y = c["northwest"]
            c_se_x, c_se_y = c["southeast"]
            return northwest_x <= c_nw_x and northwest_y <= c_nw_y and c_se_x <= southeast_x and c_se_y <= southeast_y
        insides = list(filter(is_fully_inside, light_components)) # FIXME: why doesn't work if don't convert it to a list?

        # add angle info: estimate the bar's tilt from the top and bottom
        # rows of its bounding box.
        # NOTE(review): np.argmax over the label-map row picks the column of
        # the LARGEST label id in that slice, which is not necessarily a
        # pixel of this component (or lit at all) — confirm intent.
        for i in insides:
            nw_x, nw_y = i["northwest"]
            se_x, se_y = i["southeast"]
            tx, ty = nw_x + np.argmax(light_map[nw_y, nw_x:se_x]), nw_y     # top peak
            bx, by = nw_x + np.argmax(light_map[se_y-1, nw_x:se_x]), se_y-1     # bottom peak
            angle = math.atan2(by-ty, tx-bx) / math.pi  # unit: pi rad
            i["angle"] = angle

        return insides, {"northwest": (northwest_x, northwest_y), "southeast": (southeast_x, southeast_y)}

    @staticmethod
    def select_valid_bars(lights):
        """Filter connected components down to plausible vertical light bars.

        Rejects blobs that are tiny relative to the largest one, have the
        wrong height/width ratio, or are visibly tilted; the survivors are
        ranked by how densely they fill their bounding box and at most six
        are returned.
        """
        if not lights:
            return []

        # A blob smaller than 6% of the biggest one is treated as noise.
        biggest = max(bar["pixels"] for bar in lights)
        big_enough = [bar for bar in lights if bar["pixels"] / biggest > 0.06]

        def looks_like_near_bar(bar):
            w, h = bar["size"]
            return 1.5 <= h / w < 8

        def is_vertical(bar):
            # "angle" is in units of pi rad; 0.5 means straight up.
            return abs(bar["angle"] - 0.5) < 0.2

        candidates = [bar for bar in big_enough
                      if looks_like_near_bar(bar) and is_vertical(bar)]

        # filled_ratio test for removing components in strange shape:
        # densest-first (stable sort), capped at six candidates.
        def fill_ratio(bar):
            w, h = bar["size"]
            return bar["pixels"] / (w * h)

        return sorted(candidates, key=fill_ratio, reverse=True)[:6]

    @staticmethod
    def select_pair(bars):
        """Pick the two bars most likely to frame a single armor plate.

        Every pair is scored by vertical alignment, area similarity and
        parallelism (lower is better); the best-scoring pair is returned,
        or an empty tuple when fewer than two bars exist or even the best
        pairing is unconvincing.
        """
        if len(bars) < 2:
            return ()

        def mismatch(bar_a, bar_b):
            # Vertical centroid offset, normalised by the shorter bar's height.
            y_term = abs(bar_a["centroid"][1] - bar_b["centroid"][1]) \
                / min((bar_a["size"][1], bar_b["size"][1]))
            # Relative difference in pixel area.
            pa, pb = bar_a["pixels"], bar_b["pixels"]
            area_term = abs(pa - pb) / min((pa, pb))
            # Angle difference (in pi rad), scaled by 5.
            tilt_term = abs(bar_a["angle"] - bar_b["angle"]) * 5
            # Weighted sum; weights mirror the original policy table.
            return y_term * 0.6 + area_term * 0.3 + tilt_term * 1.2

        best_score = None
        best_pair = None
        for i in range(len(bars) - 1):
            for j in range(i + 1, len(bars)):
                score = mismatch(bars[i], bars[j])
                if best_score is None or score < best_score:
                    best_score = score
                    best_pair = (bars[i], bars[j])

        # Even the best pairing may be garbage — reject weak winners.
        if best_score > 1.2:
            return ()
        return best_pair

    def target(self, frame):
        """Locate the armor target in *frame* (a BGR image).

        Pipeline: HSV thresholding -> light bars inside the halo region ->
        shape filtering -> bar pairing.  Returns the midpoint (x, y) between
        the paired bars' centroids, or None when no convincing pair is found.
        When ``self.debug`` is set, also draws a 2x2 matplotlib figure of the
        intermediate masks (the caller is expected to pump the GUI loop).
        """
        # frame = cv2.blur(frame, ksize=(4, 4))
        frame = cv2.pyrUp(cv2.pyrDown(frame)) # pyrDown then pyrUp: cheap low-pass blur at the original resolution
        light, halo = self.hsv(frame)
        lights_in_halo, selected_area = Detector.select_lights_in_halo(light, halo)
        selected_bars = Detector.select_valid_bars(lights_in_halo)
        selected_pair = Detector.select_pair(selected_bars)
        if selected_pair != ():
            bar1, bar2 = selected_pair
            # The aim point is halfway between the two bar centroids.
            def mid_point(pt1, pt2):
                x1, y1 = pt1
                x2, y2 = pt2
                return (x1 + x2) // 2, (y1 + y2) // 2
            target = mid_point(bar1["centroid"], bar2["centroid"])
        else:
            target = None

        # # debug
        # for each in selected_pair:
        #     cv2.circle(img=frame, center=each["centroid"], radius=7, color=(0,255,0), thickness=2)
        # cv2.imshow('Debug', frame)

        if self.debug:
            # matplotlib expects RGB, OpenCV delivers BGR.
            show = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            plt.figure("Car Detection Debug")

            plt.subplot(2, 2, 1)
            plt.title("Original")
            plt.imshow(show)
            plt.axis('off')

            # Outline the enlarged halo box on both masks, mark all bar
            # candidates (green) and the winning pair (blue) on the frame.
            if selected_area is not None:
                cv2.rectangle(halo, pt1=selected_area["northwest"], pt2=selected_area["southeast"], color=255,
                              thickness=1)
                cv2.rectangle(light, pt1=selected_area["northwest"], pt2=selected_area["southeast"], color=255,
                              thickness=1)
            for each in selected_bars:
                cv2.circle(img=show, center=each["centroid"], radius=7, color=(0, 255, 0), thickness=2)
            for each in selected_pair:
                cv2.circle(img=show, center=each["centroid"], radius=5, color=(255, 0, 0), thickness=2)

            plt.subplot(2, 2, 2)
            plt.title("halo")
            plt.imshow(halo)
            plt.axis('off')

            plt.subplot(2, 2, 3)
            plt.title("light")
            plt.imshow(light)
            plt.axis('off')

            plt.subplot(2, 2, 4)
            plt.title("selected")
            plt.imshow(show)
            plt.axis('off')

        return target


    def track(self, mat, color_threshold, square_ratio, angle_rate, length_rate, matrix_threshold):
        """
@@ -57,7 +285,6 @@ class Detector:
        :param angle_rate: bar match angle rate factor
        :param length_rate: bar match length rate factor
        :param matrix_threshold: match pairs by rate score > matrix_threshold
        :param DIST: NEAR or FAR, not use
        :return: None when no target, (x, y, pixel_count) for target
        """
        # FIXME
@@ -79,9 +306,9 @@ class Detector:

        # Label Connected Component
        connect_output = cv2.connectedComponentsWithStats(b, 4, cv2.CV_32S)
        # connect_output is a tuple - (int component count, ndarray label map, ndarray connect component info, ndarray unused not clear)
        # ndarray label map - use int to label connect component, same int value means one component, size equal to "b"
        # connect component info - a n * 5 ndarray to show connect component info, [left_top_x, left_top_y, width, height, pixel number]
        # connect_output: tuple (int component_count, ndarray label_map, ndarray connect_component_info, ndarray unused_not_clear)
        # label_map: use int to label connect component, same int value means one component, size equal to "b"
        # connect_component_info: a n*5 ndarray showing connect component info, [left_top_x, left_top_y, width, height, pixel_number]

        # Delete Component according to Height / Width >= 3
        connect_label = connect_output[1]
@@ -93,7 +320,7 @@ class Detector:
        if self.debug:
            print("connected components num: " + str(len(connect_output[2])))

            print("square_scale: " + str(connect_data[:, 3] / connect_data[:, 2]))
            # print("square_scale: " + str(connect_data[:, 3] / connect_data[:, 2]))
            connect_max_index = np.argmax(connect_data[:, 4])
            connect_label_show = copy.deepcopy(connect_label).astype(np.uint8)
            if connect_max_index != 0:
@@ -163,6 +390,7 @@ class Detector:
        # for i in range(len(connect_data)):
        #     if i not in connect_delete_list:
        #         connect_remain.append(connect_data[i])
        # TODO: consider whether we can improve the algorithm ( O(n^2) -> O(nlogn) )
        for each in connect_data:
            if each not in connect_delete_list:
                connect_remain.append(each)
@@ -181,7 +409,8 @@ class Detector:
            top_y = each[1] # top y coordinate of connecetd components
            top_x_series = np.where(connect_label[top_y+1, each[0]:each[0]+each[2]] != 0)[0]
            # locate a series of x coordinates of the light bar
            # Unsure: why first y, then x ? why only keep index[0] ?
            # Unsure: why first y, then x ?
            # Why only keep index[0]? Because np.where(condition) returns a tuple which has only one element
            if len(top_x_series) == 0:
                return None
            n1 = int((np.max(top_x_series) + np.min(top_x_series)) / 2 + each[0]) # x coordinate of mid point of top line of light bar
@@ -390,16 +619,20 @@ class Detector:
        # return [target_x, target_y, bar_peak_point[max_j][2]]
        # Why np.array?

    def target(self, mat):
    def target_old(self, mat):
        if mat is None:
            return None
        t = None
        self.distance = TargetDis.NEAR
        if self.color == BarColor.BLUE:
            t = self.track(mat, 80, 2.5, 0.5, 1, 13)
            if len(t) == 0:
                t = self.track(mat, 80, 0.8, 0.5, 1, 13)
            # t = self.track(mat, 80, 2.5, 0.5, 1, 13)
            t = self.track(mat, 55, 2.5, 0.5, 1, 13)
            if t is None:
                t = self.track(mat, 55, 0.8, 0.5, 1, 13)
                # t = self.track(mat, 80, 0.8, 0.5, 1, 13)
        elif self.color == BarColor.RED:
            t = self.track(mat, 50, 2, 0.5, 1, 13)
            if len(t) == 0:
            if t is None:
                t = self.track(mat, 50, 0.5, 0.3, 0.6, 13)

        if self.debug and t is not None:
@@ -412,10 +645,14 @@ class Detector:
            cv2.circle(marked, (x, y), 20, line_color, 2, 15)
            cv2.line(marked, (x - 40, y), (x + 40, y), line_color, 1)
            cv2.line(marked, (x, y - 40), (x, y + 40), line_color, 1)
            cv2.imshow('target_img', marked)
            set_mouth_callback_show_pix("target_img", marked)
            cv2.imshow('raw_img', marked)
            set_mouth_callback_show_pix("raw_img", marked)

        return t[0], t[1] if t is not None else None
        if t is not None:
            return t[0], t[1]
        else:
            return None
        # return t[0], t[1] if t is not None else None

    # from datetime import datetime
    # def trace(self, mat):
+3 −2
Original line number Diff line number Diff line
@@ -9,17 +9,18 @@ class SerialWriter:
        self.ser.close()

    @staticmethod
    def _to_int16(n):
    def _to_int16(n: int) -> int:
        """Map a signed int16 value to its unsigned two's-complement
        representation (0..65535) for on-wire transmission."""
        if n < 0:
            return 65536 + n
        else:
            return n

    def write(self, delta_yaw, delta_pitch):
    def write(self, delta_yaw: int, delta_pitch: int):
        """
        Protocol:
        0xFA + length(2 bytes) + d_yaw(2 bytes) + d_pitch(2 bytes) + 0x00(reserved) + 0xFB
        """

        delta_yaw = SerialWriter._to_int16(delta_yaw)
        delta_pitch = SerialWriter._to_int16(delta_pitch)