A.I
RetinaNet으로 자율주행 시스템 만들기 본문
RetinaNet으로 자율주행시스템 만들어보기¶
- mkdir -p ~/aiffel/object_detection/data
- pip uninstall tensorflow
- pip install tensorflow==2.3.0
- cd ~/aiffel/object_detection
- git clone https://github.com/fizyr/keras-retinanet.git
- cd keras-retinanet && python setup.py build_ext --inplace
- pip install tensorflow_datasets tqdm
- pip install -r requirements.txt
- pip install .
In [1]:
import tensorflow as tf

# Restrict TensorFlow to the first GPU and enable on-demand memory allocation
# so the process does not grab all GPU memory at startup.
gpus = tf.config.experimental.list_physical_devices(device_type="GPU")
if gpus:  # BUG FIX (review): avoid IndexError on machines with no visible GPU
    tf.config.experimental.set_visible_devices(devices=gpus[0], device_type="GPU")
    # NOTE(review): memory growth must be set before the GPU is initialized.
    tf.config.experimental.set_memory_growth(device=gpus[0], enable=True)
In [ ]:
import tensorflow as tf
# List the local devices (CPU/GPU) TensorFlow can see — a quick sanity check
# that the GPU configured above is actually visible.
from tensorflow.python.client import device_lib
device_lib.list_local_devices()
자율주행 보조장치 (1) KITTI 데이터셋¶
- KITTI 데이터셋
#### 요구사항
- 사람이 카메라에 감지되면 정지
- 차량이 일정 크기 이상으로 감지되면 정지
In [3]:
# TensorFlow and tf.keras
import tensorflow as tf
from tensorflow import keras
# Helper libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import tensorflow_datasets as tfds
import copy
import cv2
from PIL import Image, ImageDraw
In [ ]:
# NOTE: the first run downloads the KITTI dataset, which can take a very long time.
import urllib3
urllib3.disable_warnings()
# Load the KITTI object-detection dataset: train/test splits plus a
# DatasetInfo object describing the features and split sizes.
(ds_train, ds_test), ds_info = tfds.load(
    'kitti',
    split=['train', 'test'],
    shuffle_files=True,
    with_info=True,
)
In [13]:
import os

# BUG FIX (review): the original cell rebound `ds_train`/`ds_test`/`ds_info`
# (the tf.data datasets loaded above) to plain path strings, which breaks every
# later cell that expects dataset objects (e.g. tfds.show_examples, .take(1)).
# Keep the filesystem locations under distinct names instead.
ds_train_dir = os.getenv('HOME')+'/tensorflow_datasets/kitti'
ds_test_dir = os.getenv('HOME')+'/tensorflow_datasets/kitti'
ds_info_path = os.getenv('HOME')+'/tensorflow_datasets/kitti/3.2.0/dataset_info.json'
In [ ]:
# Visualize a grid of sample images with their labels. Requires ds_train to be
# the tf.data Dataset returned by tfds.load (not a path string).
fig = tfds.show_examples(ds_train, ds_info)
In [ ]:
ds_info
자율주행 보조장치 (2) 데이터 직접 확인하기¶
In [ ]:
# Grab a single training example for manual inspection below.
TakeDataset = ds_train.take(1)
In [ ]:
# Inspect the one sample taken above: show its feature keys and the raw
# object annotations, then convert the image tensor into a PIL image.
for sample in TakeDataset:
    print('------Example------')
    # each sample is a dict of features (image, file name, objects, ...)
    print(list(sample.keys()))
    image = sample["image"]
    filename = sample["image/file_name"].numpy().decode('utf-8')
    objects = sample["objects"]
    print('------objects------')
    print(objects)

img = Image.fromarray(image.numpy())
img
In [ ]:
def visualize_bbox(input_image, object_bbox):
    """Draw KITTI bounding boxes on a copy of `input_image` and return it.

    Args:
        input_image: PIL.Image to draw on (the original is not modified).
        object_bbox: (N, 4) array of normalized box coordinates as stored by
            tfds KITTI — hence the `height - ...` conversion below, matching
            the conversion used in parse_dataset.

    Returns:
        A new PIL.Image with red bounding boxes drawn on it.
    """
    input_image = copy.deepcopy(input_image)
    draw = ImageDraw.Draw(input_image)

    # BUG FIX (review): measure the image being drawn on, not the global `img`
    # (the original only worked by accident when `img` was the same image).
    width, height = input_image.size
    print('width:', width, ' height:', height)
    print(object_bbox.shape)

    # Convert normalized coordinates to pixel coordinates.
    x_min = object_bbox[:,1] * width
    x_max = object_bbox[:,3] * width
    # BUG FIX (review): the original used index 0 for y_min and index 2 for
    # y_max, producing rectangles with y_min > y_max, which Pillow's
    # ImageDraw.rectangle rejects. Swapped to match parse_dataset below.
    y_min = height - object_bbox[:,2] * height
    y_max = height - object_bbox[:,0] * height

    # Draw each bounding box in red.
    rects = np.stack([x_min, y_min, x_max, y_max], axis=1)
    for _rect in rects:
        print(_rect)
        draw.rectangle(_rect, outline=(255,0,0), width=2)

    print(input_image)
    return input_image

visualize_bbox(img, objects['bbox'].numpy())
keras-retinanet 실습 (1) 데이터 포맷 변경¶
클래스 및 바운딩 박스 정보 추출¶
In [ ]:
import os
# Filesystem locations for the exported images and the keras-retinanet CSV
# annotation file produced by parse_dataset below.
data_dir = os.getenv('HOME')+'/aiffel/object_detection/data'
img_dir = os.getenv('HOME')+'/kitti_images'
train_csv_path = data_dir + '/kitti_train.csv'
# parse_dataset 함수를 구현해 주세요.
def parse_dataset(dataset, img_dir="kitti_images", total=0):
    """Export a tfds KITTI split into keras-retinanet CSV format.

    Saves every image as a file under `img_dir` and collects one row per
    annotated object: (img_path, x1, y1, x2, y2, class_name) in pixel
    coordinates.

    Args:
        dataset: a tfds KITTI split (iterable of example dicts).
        img_dir: directory where the images are written (created if missing).
        total: number of examples — used only for the tqdm progress bar.

    Returns:
        pandas.DataFrame with columns img_path, x1, y1, x2, y2, class_name.
    """
    if not os.path.exists(img_dir):
        os.mkdir(img_dir)

    # Collapse the dataset's object types into two classes:
    # car-like types -> "car", person-like types -> "person".
    # Types outside this map are skipped.
    type_class_map = {
        0: "car",
        1: "car",
        2: "car",
        3: "person",
        4: "person",
        5: "person",
    }

    # BUG FIX (review): accumulate rows in a list and build the DataFrame
    # once at the end. DataFrame.append was deprecated and removed in
    # pandas 2.0, and appending row-by-row is quadratic.
    rows = []
    for item in tqdm(dataset, total=total):
        filename = item['image/file_name'].numpy().decode('utf-8')
        img_path = os.path.join(img_dir, filename)
        img = Image.fromarray(item['image'].numpy())
        img.save(img_path)

        object_bbox = item['objects']['bbox']
        object_type = item['objects']['type'].numpy()
        width, height = img.size

        # The tfds bbox coordinates are normalized to [0, 1]; convert them to
        # pixel coordinates (the `height - ...` flip matches visualize_bbox).
        x_min = object_bbox[:,1] * width
        x_max = object_bbox[:,3] * width
        y_min = height - object_bbox[:,2] * height
        y_max = height - object_bbox[:,0] * height

        # BUG FIX (review): np.int was deprecated and removed in NumPy 1.24;
        # the builtin int is the correct replacement.
        rects = np.stack([x_min, y_min, x_max, y_max], axis=1).astype(int)
        for i, _rect in enumerate(rects):
            _type = object_type[i]
            if _type not in type_class_map:
                continue
            rows.append({
                "img_path": img_path,
                "x1": _rect[0],
                "y1": _rect[1],
                "x2": _rect[2],
                "y2": _rect[3],
                "class_name": type_class_map[_type],
            })
        # BUG FIX (review): removed a leftover debug `break` that stopped the
        # loop after the first example, so only one image was ever exported.

    return pd.DataFrame(rows, columns=["img_path", "x1", "y1", "x2", "y2", "class_name"])
# Parse the training split and write it as a header-less CSV — the annotation
# format expected by keras-retinanet's train.py.
df_train = parse_dataset(ds_train, img_dir, total=ds_info.splits['train'].num_examples)
df_train.to_csv(train_csv_path, sep=',',index = False, header=False)
In [ ]:
# Same export for the test split.
test_csv_path = data_dir + '/kitti_test.csv'
df_test = parse_dataset(ds_test, img_dir, total=ds_info.splits['test'].num_examples)
df_test.to_csv(test_csv_path, sep=',',index = False, header=False)
클래스 맵핑¶
데이터셋에서 클래스는 문자열(string)로 표시되지만, 모델에게 데이터를 알려줄 때에는 숫자를 사용해 클래스를 표시해야 합니다.
이때 모두 어떤 클래스가 있고 각 클래스가 어떤 인덱스(index)에 맵핑(mapping)될지 미리 정하고 저장해 두어야 학습을 한 후 추론(inference)을 할 때에도 숫자 인덱스로 나온 정보를 클래스 이름으로 바꾸어 해석할 수 있습니다.
In [11]:
# Destination for the class-name -> index mapping file used by train.py.
class_txt_path = data_dir + '/classes.txt'
def save_class_format(path="./classes.txt"):
    """Write the keras-retinanet class-mapping file, one `name,index` per line."""
    # Fixed two-class mapping used throughout this project.
    mapping = (("car", 0), ("person", 1))
    lines = [f"{name},{index}\n" for name, index in mapping]
    with open(path, mode='w', encoding='utf-8') as f:
        f.writelines(lines)
# Write the mapping consumed by train.py (`csv data/kitti_train.csv data/classes.txt`).
save_class_format(class_txt_path)
keras-retinanet 실습 (2) 셋팅¶
In [ ]:
# RetinaNet 훈련이 시작됩니다!! 50epoch 훈련에 1시간 이상 소요될 수 있습니다.
!cd ~/aiffel/object_detection && python keras-retinanet/keras_retinanet/bin/train.py --gpu 0 --multiprocessing --workers 4 --batch-size 2 --epochs 50 --steps 195 csv data/kitti_train.csv data/classes.txt
In [ ]:
!cd ~/aiffel/object_detection && python keras-retinanet/keras_retinanet/bin/convert_model.py snapshots/resnet50_csv_50.h5 snapshots/resnet50_csv_50_infer.h5
keras-retinanet 실습 (3) 시각화¶
- mkdir -p ~/aiffel/object_detection/test_set
- wget https://aiffelstaticprd.blob.core.windows.net/media/documents/test_set.zip
- mv test_set.zip ~/aiffel/object_detection/test_set
- cd ~/aiffel/object_detection/test_set && unzip test_set.zip
In [15]:
%matplotlib inline
# automatically reload modules when they have changed
%load_ext autoreload
%autoreload 2
# import keras
import keras
# import keras_retinanet
from keras_retinanet import models
from keras_retinanet.models import load_model
from keras_retinanet.utils.image import read_image_bgr, preprocess_image, resize_image
from keras_retinanet.utils.visualization import draw_box, draw_caption
from keras_retinanet.utils.colors import label_color
from keras_retinanet.utils.gpu import setup_gpu
# import miscellaneous modules
import matplotlib.pyplot as plt
import cv2
import os
import numpy as np
import time
# Make GPU 0 visible to keras-retinanet's TF session helpers.
gpu = '0'
setup_gpu(gpu)
# Load the inference-converted snapshot (the output of convert_model.py above).
dir_path = os.getenv('HOME') + '/aiffel/object_detection/'
model_path = os.path.join(dir_path, 'snapshots', 'resnet50_csv_50_infer.h5')
model = load_model(model_path, backbone_name='resnet50')
1 Physical GPUs, 1 Logical GPUs WARNING:tensorflow:No training configuration found in the save file, so the model was *not* compiled. Compile it manually.
In [20]:
import os
# Sample image of a "Go" scene from the downloaded test set.
img_path = os.getenv('HOME')+'/aiffel/object_detection/test_set/go_1.png'
# inference_on_image 함수를 구현해 주세요.
def inference_on_image(model, img_path="./test_set/go_1.png", visualize=True):
    """Run the RetinaNet detector on one image and optionally plot the boxes.

    Detections with score < 0.5 are ignored; class 0 boxes are drawn in blue
    and class 1 boxes in red.
    """
    bgr = read_image_bgr(img_path)

    # Keep an RGB copy of the raw image for drawing/plotting.
    canvas = cv2.cvtColor(bgr.copy(), cv2.COLOR_BGR2RGB)

    color_map = {
        0: (0, 0, 255),  # blue
        1: (255, 0, 0),  # red
    }

    # Build the network input (normalized + resized); `scale` is needed to
    # map the predicted boxes back to the original image coordinates.
    net_input, scale = resize_image(preprocess_image(bgr))
    boxes, scores, labels = model.predict_on_batch(np.expand_dims(net_input, axis=0))
    boxes /= scale

    if visualize:
        for box, score, label in zip(boxes[0], scores[0], labels[0]):
            print(box)
            # NOTE(review): this assumes detections come back sorted by score,
            # so the first weak one ends the loop.
            if score < 0.5:
                break
            b = box.astype(int)
            draw_box(canvas, b, color=color_map[label])
            draw_caption(canvas, b, "{:.3f}".format(score))
        plt.figure(figsize=(15, 15))
        plt.axis('off')
        plt.imshow(canvas)
        plt.show()
# Run the detector on the sample "go" image loaded above.
inference_on_image(model, img_path=img_path)
[593.74084 175.72835 625.83417 207.32893] [190.743 134.69888 350.0294 236.34258]
In [17]:
# Run the detector on a sample "stop" scene for comparison.
img_path = os.getenv('HOME')+'/aiffel/object_detection/test_set/stop_1.png'
inference_on_image(model, img_path=img_path)
[773.2131 148.97542 844.1949 319.3991 ] [739.4071 174.49208 854.0888 297.52234]
In [28]:
def self_drive_assist(model, img_path, size_limit=300, visualize=True):
    """Decide "Stop" or "Go" for one frame from RetinaNet detections.

    Stop rules (per the requirements at the top of this document):
      - a person (label 1) is detected, or
      - any detected box is at least `size_limit` pixels wide or tall
        (i.e. a vehicle is too large/close).

    Args:
        model: inference-mode keras-retinanet model.
        img_path: path of the image to evaluate.
        size_limit: pixel threshold for the "vehicle too close" rule.
        visualize: when True, plot the detections with the decision as title.

    Returns:
        "Stop" or "Go".
    """
    result = "Go"
    image = read_image_bgr(img_path)

    # RGB copy for drawing.
    draw = cv2.cvtColor(image.copy(), cv2.COLOR_BGR2RGB)
    color_map = {
        0: (0, 0, 255),  # class 0: blue
        1: (255, 0, 0),  # class 1: red
    }

    # Preprocess, predict, then rescale boxes to original pixel coordinates.
    image = preprocess_image(image)
    image, scale = resize_image(image)
    boxes, scores, labels = model.predict_on_batch(np.expand_dims(image, axis=0))
    boxes /= scale

    # BUG FIX (review): the original (1) only computed the decision when
    # visualize=True, (2) compared against a hard-coded 300 instead of the
    # `size_limit` parameter, and (3) reset `result` back to 'Go' whenever a
    # later, smaller box was seen. Any single triggering detection now
    # latches the result to 'Stop'.
    for box, score, label in zip(boxes[0], scores[0], labels[0]):
        # NOTE(review): assumes detections are sorted by score, so the first
        # weak one ends the loop.
        if score < 0.5:
            break
        b = box.astype(int)
        if label == 1 or b[2]-b[0] >= size_limit or b[3]-b[1] >= size_limit:
            result = 'Stop'
        if visualize:
            print(box)
            draw_box(draw, b, color=color_map[label])
            # BUG FIX (review): "{:3f}" was a malformed spec; use "{:.3f}"
            # to match inference_on_image.
            draw_caption(draw, b, "{:.3f}".format(score))

    if visualize:
        plt.figure(figsize=(15, 15))
        plt.axis('off')
        plt.title(result, fontsize=20)
        plt.imshow(draw)
        plt.show()

    return result
In [29]:
import os
def test_system(func):
    """Score a Stop/Go decision function on the 10-image test set.

    Each correct decision is worth 10 points; the total is printed.

    Args:
        func: callable with the self_drive_assist signature
            (model=, img_path=, visualize=) returning "Stop" or "Go".
    """
    work_dir = os.getenv('HOME')+'/aiffel/object_detection'
    score = 0
    test_set = [
        ("test_set/stop_1.png", "Stop"),
        ("test_set/stop_2.png", "Stop"),
        ("test_set/stop_3.png", "Stop"),
        ("test_set/stop_4.png", "Stop"),
        ("test_set/stop_5.png", "Stop"),
        ("test_set/go_1.png", "Go"),
        ("test_set/go_2.png", "Go"),
        ("test_set/go_3.png", "Go"),
        ("test_set/go_4.png", "Go"),
        ("test_set/go_5.png", "Go"),
    ]
    for image_file, answer in test_set:
        image_path = work_dir + '/' + image_file
        print(image_path)
        # BUG FIX (review): call the function under test (`func`) instead of
        # the hard-coded global self_drive_assist, so the parameter actually
        # matters. `model` is still read from the notebook's global scope.
        pred = func(model=model, img_path=image_path, visualize=True)
        if pred == answer:
            score += 10
    print(f"{score}점입니다.")
In [ ]:
# Run the grader on the implemented assist function (10 points per image).
test_system(self_drive_assist)
정리¶
- 테스트 결과 80점으로 인식 가능했다.
'Going Deeper' 카테고리의 다른 글
U-Net으로 시맨틱 세그멘테이션을 이용해 도로찾기 (2) | 2021.04.16 |
---|---|
Segmentation (0) | 2021.04.15 |
Object detection (0) | 2021.04.12 |
Data Augmentation (0) | 2021.04.12 |
GAN을 이용한 augmentation 기법 논문 (0) | 2021.04.08 |