diff --git a/.gitignore b/.gitignore
index 0b3abcd..eafeb72 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,4 +43,9 @@ install_manifest.txt
 compile_commands.json
 CTestTestfile.cmake
 _deps
-CMakeUserPresets.json
\ No newline at end of file
+CMakeUserPresets.json
+src/assets/*
+src/__pycache__/*
+src/best_lane_segmentation.pth
+env/*
+lane_segmentation.pth
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..8d10435
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,7 @@
+torch
+torchvision
+timm
+matplotlib
+pandas
+numpy
+opencv-python
\ No newline at end of file
diff --git a/src/.keep b/src/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/src/Dataset.py b/src/Dataset.py
new file mode 100644
index 0000000..ee907b3
--- /dev/null
+++ b/src/Dataset.py
@@ -0,0 +1,92 @@
+import os
+import json
+import cv2
+import numpy as np
+import torch
+import torchvision.transforms as transforms
+from torch.utils.data import Dataset
+
+def get_binary_labels(height, width, pts, thickness=5):
+    bin_img = np.zeros(shape=[height, width], dtype=np.uint8)
+    for lane in pts:
+        cv2.polylines(
+            bin_img,
+            np.int32([lane]),
+            isClosed=False,
+            color=1,
+            thickness=thickness)
+
+    return bin_img.astype(np.float32)[None, ...]
+
+def get_image_transform():
+    normalizer = transforms.Normalize(mean=[0.485, 0.456, 0.406],
+                                      std=[0.229, 0.224, 0.225])
+
+    t = [transforms.ToTensor(),
+         normalizer]
+
+    transform = transforms.Compose(t)
+    return transform
+
+class TuSimpleDataset(Dataset):
+    def __init__(self, json_paths, img_dir, width=512, height=256,
+                 thickness=5):
+        self.samples = []
+        self.width = width
+        self.height = height
+        self.thickness = thickness
+        self.img_dir = img_dir
+        self.transform = get_image_transform()
+
+        for json_path in json_paths:
+            with open(json_path, 'r') as f:
+                for line in f:
+                    self.samples.append(json.loads(line))
+
+    def __len__(self):
+        return len(self.samples)
+
+    def __getitem__(self, idx):
+        info = self.samples[idx]
+        file_path = os.path.join(self.img_dir, info['raw_file'])
+
+        # Read and resize image
+        image = cv2.imread(file_path)
+        if image is None:
+            raise ValueError(f"Could not read image: {file_path}")
+        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
+
+        width_org = image.shape[1]
+        height_org = image.shape[0]
+        image = cv2.resize(image, (self.width, self.height))
+
+        # Process lane points
+        x_lanes = info['lanes']
+        y_samples = info['h_samples']
+
+        # Create points list with list comprehension
+        pts = [
+            [(x, y) for (x, y) in zip(lane, y_samples) if x >= 0]
+            for lane in x_lanes
+        ]
+
+        # Remove empty lanes
+        pts = [l for l in pts if len(l) > 0]
+
+        # Calculate scaling rates
+        x_rate = 1.0 * self.width / width_org
+        y_rate = 1.0 * self.height / height_org
+
+        # Scale points
+        pts = [[(int(round(x*x_rate)), int(round(y*y_rate)))
+                for (x, y) in lane] for lane in pts]
+
+        # Generate labels
+        bin_labels = get_binary_labels(self.height, self.width, pts,
+                                       thickness=self.thickness)
+
+        image = self.transform(image)
+
+        bin_labels = torch.Tensor(bin_labels)
+
+        return image, bin_labels
\ No newline at end of file
diff --git a/src/Model.py b/src/Model.py
new file mode 100644
index 0000000..359a4af
--- /dev/null
+++ b/src/Model.py
@@ -0,0 +1,15 @@
+import torch
+import torch.nn as nn
+import torchvision.models.segmentation as models
+from torchvision.models.segmentation import DeepLabV3_ResNet50_Weights
+
+class LaneSegmentationModel(nn.Module):
+    def __init__(self, num_classes=1):
+        super(LaneSegmentationModel, self).__init__()
+        weights = DeepLabV3_ResNet50_Weights.DEFAULT
+        self.model = models.deeplabv3_resnet50(weights=weights)
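+        # classifier[4] is the DeepLabHead's final 1x1 conv; replacing it re-purposes the pretrained head to emit num_classes lane-logit channels.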
+        self.model.classifier[4] = nn.Conv2d(256, num_classes, kernel_size=1)
+
+    def forward(self, x):
+        return self.model(x)['out']
\ No newline at end of file
diff --git a/src/main.py b/src/main.py
new file mode 100644
index 0000000..8ca134e
--- /dev/null
+++ b/src/main.py
@@ -0,0 +1,131 @@
+import cv2
+import numpy as np
+import torch
+from Model import LaneSegmentationModel
+from Dataset import get_image_transform
+
+
+class PostProcessor(object):
+
+    def __init__(self):
+        pass
+
+    def process(self, image, kernel_size=5, minarea_threshold=10):
+        """Do the post processing here. The image is first converted to
+        grayscale if needed. A closing operation is then applied to fill
+        empty gaps among surrounding pixels. After that, connected components
+        are detected and small components are removed.
+
+        Args:
+            image: binary lane mask predicted by the model
+            kernel_size: size of the structuring element for the closing
+            minarea_threshold: minimum component area (in pixels) to keep
+
+        Returns:
+            image: cleaned binary image
+
+        """
+        if image.dtype != np.uint8:
+            image = np.array(image, np.uint8)
+        if len(image.shape) == 3:
+            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
+
+        # Fill the pixel gaps using the closing operator (dilation followed
+        # by erosion)
+        kernel = cv2.getStructuringElement(
+            shape=cv2.MORPH_RECT, ksize=(
+                kernel_size, kernel_size))
+        image = cv2.morphologyEx(image, cv2.MORPH_CLOSE, kernel)
+
+        ccs = cv2.connectedComponentsWithStats(
+            image, connectivity=8, ltype=cv2.CV_32S)
+        labels = ccs[1]
+        stats = ccs[2]
+
+        for index, stat in enumerate(stats):
+            if stat[4] <= minarea_threshold:
+                idx = np.where(labels == index)
+                image[idx] = 0
+
+        return image
+
+
+# Select the best available device
+if torch.cuda.is_available():
+    device = torch.device("cuda")
+    torch.backends.cudnn.benchmark = True  # Optimize CUDA performance
+    print(f"Using CUDA device: {torch.cuda.get_device_name()}")
+elif torch.backends.mps.is_available():  # For Apple Silicon
+    device = torch.device("mps")
+    print("Using MPS (Metal Performance Shaders)")
+else:
+    device = torch.device("cpu")
+    print("Using CPU")
+
+# Initialize the model and load the best checkpoint
+model = LaneSegmentationModel().to(device)
+checkpoint = torch.load("best_lane_segmentation.pth", map_location=device)
+model.load_state_dict(checkpoint['model_state_dict'])
+model.eval()
+
+# Export the model to ONNX with a dynamic batch dimension
+dummy_input = torch.randn(1, 3, 256, 256).to(device)
+torch.onnx.export(
+    model,
+    dummy_input,
+    "lane_segmentation.onnx",
+    export_params=True,
+    opset_version=11,
+    do_constant_folding=True,
+    input_names=['input'],
+    output_names=['output'],
+    dynamic_axes={
+        'input': {0: 'batch_size'},
+        'output': {0: 'batch_size'}
+    }
+)
+print("Model exported to ONNX format!")
+
+cap = cv2.VideoCapture("assets/road1.mp4")
+post_processor = PostProcessor()
+transform = get_image_transform()
+
+while True:
+    ret, frame = cap.read()
+    if not ret:
+        break
+
+    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
+    input_tensor = transform(rgb_frame).unsqueeze(0).to(device)
+
+    # Run inference. The model outputs logits, so apply a sigmoid before
+    # thresholding.
+    with torch.no_grad():
+        output = torch.sigmoid(model(input_tensor))
+
+    output_mask = output.squeeze().cpu().numpy()
+    binary_mask = (output_mask > 0.6).astype(np.uint8) * 255
+
+    processed_mask = post_processor.process(
+        binary_mask,
+        kernel_size=5,
+        minarea_threshold=10
+    )
+
+    # Resize the mask to the original frame dimensions.
+    mask_resized = cv2.resize(processed_mask, (frame.shape[1], frame.shape[0]))
+
+    # Create a copy of the original frame and paint the lane pixels green.
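+    # addWeighted computes alpha*overlay + (1 - alpha)*frame, so the lanes appear as a translucent green tint.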
+    overlay = frame.copy()
+    overlay[mask_resized > 0] = [0, 255, 0]
+    alpha = 0.4  # Transparency factor
+    blended = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)
+
+    # Display the result.
+    cv2.imshow("Lane Detection", blended)
+    if cv2.waitKey(1) & 0xFF == ord('q'):
+        break
+
+cap.release()
+cv2.destroyAllWindows()
\ No newline at end of file
diff --git a/src/training.py b/src/training.py
new file mode 100644
index 0000000..e963592
--- /dev/null
+++ b/src/training.py
@@ -0,0 +1,84 @@
+import torch
+import torch.nn as nn
+import torch.optim as optim
+from torch.utils.data import DataLoader
+from Model import LaneSegmentationModel
+from Dataset import TuSimpleDataset
+from utils import visualize_batch
+
+dataset = TuSimpleDataset(
+    json_paths=[
+        "/home/luis_t2/OpenCV/assets/TUSimple/train_set/label_data_0313.json",
+        "/home/luis_t2/OpenCV/assets/TUSimple/train_set/label_data_0531.json",
+        "/home/luis_t2/OpenCV/assets/TUSimple/train_set/label_data_0601.json"
+    ],
+    img_dir="/home/luis_t2/OpenCV/assets/TUSimple/train_set/",
+)
+
+dataloader = DataLoader(
+    dataset,
+    batch_size=8,
+    shuffle=True
+)
+
+if torch.cuda.is_available():
+    device = torch.device("cuda")
+    torch.backends.cudnn.benchmark = True  # Optimize CUDA performance
+    print(f"Using CUDA device: {torch.cuda.get_device_name()}")
+elif torch.backends.mps.is_available():  # For Apple Silicon
+    device = torch.device("mps")
+    print("Using MPS (Metal Performance Shaders)")
+else:
+    device = torch.device("cpu")
+    print("Using CPU")
+
+model = LaneSegmentationModel().to(device)
+
+criterion = nn.BCEWithLogitsLoss()
+optimizer = optim.Adam(model.parameters(), lr=0.001)
+
+if __name__ == "__main__":
+    num_epochs = 20
+    best_loss = float('inf')
+    best_model_path = "best_lane_segmentation.pth"
+
+    for epoch in range(num_epochs):
+        model.train()
+        epoch_loss = 0.0
+        batch_count = 0
+
+        for batch_idx, (images, masks) in enumerate(dataloader):
+            images = images.to(device)
+            masks = masks.to(device)
+
+            optimizer.zero_grad()
+            outputs = model(images)
+            loss = criterion(outputs, masks)
+            loss.backward()
+            optimizer.step()
+
+            epoch_loss += loss.item()
+            batch_count += 1
+
+            if (batch_idx + 1) % 5 == 0:
+                # visualize_batch(images, masks, outputs)
+                print(f"Epoch [{epoch+1}/{num_epochs}], "
+                      f"Loss: {loss.item():.4f}")
+
+        # Calculate average loss for the epoch
+        avg_epoch_loss = epoch_loss / batch_count
+        print(f"Epoch [{epoch+1}/{num_epochs}] Average Loss: {avg_epoch_loss:.4f}")
+
+        # Save the model if it has the best loss so far
+        if avg_epoch_loss < best_loss:
+            best_loss = avg_epoch_loss
+            torch.save({
+                'epoch': epoch,
+                'model_state_dict': model.state_dict(),
+                'optimizer_state_dict': optimizer.state_dict(),
+                'loss': best_loss,
+            }, best_model_path)
+            print(f"Saved new best model with loss: {best_loss:.4f}")
+
+    print("Training finished!")
+    print(f"Best model saved with loss: {best_loss:.4f}")
\ No newline at end of file
diff --git a/src/utils.py b/src/utils.py
new file mode 100644
index 0000000..4cd493a
--- /dev/null
+++ b/src/utils.py
@@ -0,0 +1,73 @@
+import torch
+import matplotlib.pyplot as plt
+import numpy as np
+
+def visualize_batch(images, masks=None, outputs=None):
+    img = images[0].cpu().permute(1, 2, 0).numpy()
+
+    # Denormalize image
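+    # (transforms.Normalize computed (x - mean) / std, so x*std + mean recovers displayable RGB values)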
+    mean = np.array([0.485, 0.456, 0.406])
+    std = np.array([0.229, 0.224, 0.225])
+    img = std * img + mean
+    img = np.clip(img, 0, 1)
+
+    # Create figure
+    plt.figure(figsize=(15, 5))
+
+    # Plot original image
+    plt.subplot(131)
+    plt.imshow(img)
+    plt.title('Input Image')
+    plt.axis('off')
+
+    # Plot ground truth mask (single channel)
+    if masks is not None:
+        mask = masks[0].cpu().numpy()
+        plt.subplot(132)
+        plt.imshow(mask[0], cmap='gray')
+        plt.title('Ground Truth')
+        plt.axis('off')
+
+    # Plot prediction if available
+    if outputs is not None:
+        pred = torch.sigmoid(outputs[0]).cpu().detach().numpy()
+        plt.subplot(133)
+        plt.imshow(pred[0], cmap='jet')
+        plt.title('Prediction')
+        plt.axis('off')
+
+    plt.tight_layout()
+    plt.show(block=False)
+    plt.pause(0.1)
+
+
+def visualize_output_batch(image, outputs=None):
+    img = image[0].cpu().permute(1, 2, 0).numpy()
+
+    # Denormalize image
+    mean = np.array([0.485, 0.456, 0.406])
+    std = np.array([0.229, 0.224, 0.225])
+    img = std * img + mean
+    img = np.clip(img, 0, 1)
+
+    # Create figure
+    plt.figure(figsize=(15, 5))
+
+    # Plot original image
+    plt.subplot(131)
+    plt.imshow(img)
+    plt.title('Input Image')
+    plt.axis('off')
+
+    # Plot prediction if available
+    if outputs is not None:
+        pred = torch.sigmoid(outputs[0]).cpu().detach().numpy()  # Sigmoid for the single-channel output
+        plt.subplot(132)
+        plt.imshow(pred[0], cmap='jet')
+        plt.title('Prediction')
+        plt.axis('off')
+
+    plt.tight_layout()
+    plt.show(block=False)
+    plt.pause(0.1)
\ No newline at end of file
diff --git a/tests/logic/.keep b/tests/logic/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/tests/ui/.keep b/tests/ui/.keep
deleted file mode 100644
index e69de29..0000000