Skip to content
This repository was archived by the owner on Oct 6, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ install_manifest.txt
compile_commands.json
CTestTestfile.cmake
_deps
CMakeUserPresets.json
CMakeUserPresets.json
src/assets/*
src/__pycache__/*
src/best_lane_segmentation.pth
env/*
lane_segmentation.pth
7 changes: 7 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
torch
torchvision
timm
matplotlib
pandas
numpy
opencv-python
Empty file removed src/.keep
Empty file.
92 changes: 92 additions & 0 deletions src/Dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import os
import json
import cv2
import numpy as np
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset

def get_binary_labels(height, width, pts, thickness=5):
    """Rasterize lane polylines into a single-channel binary mask.

    Args:
        height: Mask height in pixels.
        width: Mask width in pixels.
        pts: Iterable of lanes, each a sequence of (x, y) points.
        thickness: Stroke width used when drawing each lane.

    Returns:
        float32 array of shape (1, height, width): 1.0 on lane pixels, 0.0 elsewhere.
    """
    mask = np.zeros((height, width), dtype=np.uint8)
    for lane_pts in pts:
        cv2.polylines(mask,
                      np.int32([lane_pts]),
                      isClosed=False,
                      color=1,
                      thickness=thickness)
    # Prepend a channel axis and convert to float for use as a training label.
    return mask.astype(np.float32)[None, ...]

def get_image_transform():
    """Return the input preprocessing pipeline: ToTensor followed by
    normalization with the standard ImageNet channel statistics (matching
    the pre-trained backbone's expected input distribution)."""
    return transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ])

class TuSimpleDataset(Dataset):
    """TuSimple lane-detection dataset yielding (image, binary_mask) pairs.

    Each line of the annotation JSON files describes one frame: its image
    path (`raw_file`), per-lane x coordinates (`lanes`), and the shared y
    sample rows (`h_samples`). Images and masks are resized to
    (height, width) before being returned.
    """

    def __init__(self, json_paths, img_dir, width=512, height=256,
                 thickness=5):
        self.width = width
        self.height = height
        self.thickness = thickness
        self.img_dir = img_dir
        self.transform = get_image_transform()

        # Annotation files are JSON Lines: one sample dict per line.
        self.samples = []
        for path in json_paths:
            with open(path, 'r') as fp:
                self.samples.extend(json.loads(row) for row in fp)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        sample = self.samples[idx]
        file_path = os.path.join(self.img_dir, sample['raw_file'])

        # Load as BGR, fail loudly on a bad path, then convert to RGB.
        bgr = cv2.imread(file_path)
        if bgr is None:
            raise ValueError(f"Could not read image: {file_path}")
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

        orig_h, orig_w = rgb.shape[:2]
        resized = cv2.resize(rgb, (self.width, self.height))

        # Pair each lane's x values with the shared y rows, dropping
        # entries where x < 0 (TuSimple's "no point on this row" marker).
        lanes = [
            [(x, y) for x, y in zip(lane, sample['h_samples']) if x >= 0]
            for lane in sample['lanes']
        ]
        lanes = [lane for lane in lanes if lane]

        # Rescale points from original resolution to network input size.
        x_rate = 1.0 * self.width / orig_w
        y_rate = 1.0 * self.height / orig_h
        lanes = [[(int(round(x * x_rate)), int(round(y * y_rate)))
                  for x, y in lane] for lane in lanes]

        bin_labels = get_binary_labels(self.height, self.width, lanes,
                                       thickness=self.thickness)

        return self.transform(resized), torch.Tensor(bin_labels)
14 changes: 14 additions & 0 deletions src/Model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import torch
import torch.nn as nn
import torchvision.models.segmentation as models
from torchvision.models.segmentation import DeepLabV3_ResNet50_Weights

class LaneSegmentationModel(nn.Module):
    """DeepLabV3-ResNet50 with its classifier head replaced for lane masks."""

    def __init__(self, num_classes=1):
        super(LaneSegmentationModel, self).__init__()
        # Start from the default pre-trained weights, then swap the final
        # 1x1 conv so the head emits `num_classes` channels.
        net = models.deeplabv3_resnet50(
            weights=DeepLabV3_ResNet50_Weights.DEFAULT)
        net.classifier[4] = nn.Conv2d(256, num_classes, kernel_size=1)
        self.model = net

    def forward(self, x):
        # torchvision segmentation models return a dict; 'out' is the main head.
        return self.model(x)['out']
130 changes: 130 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import cv2
import numpy as np
import torch
import glob
from Model import LaneSegmentationModel
from Dataset import get_image_transform
from utils import visualize_output_batch
from PIL import Image
from torchvision import transforms


class PostProcessor(object):
    """Cleans up a raw segmentation mask before it is overlaid on a frame."""

    def __init__(self):
        pass

    def process(self, image, kernel_size=5, minarea_threshold=10):
        """Post-process a segmentation mask.

        The image is first converted to uint8 grayscale if needed. A
        morphological closing (dilation followed by erosion) then fills
        small gaps between neighbouring pixels. Finally, connected
        components are detected and components with area at or below
        ``minarea_threshold`` are removed as noise.

        Args:
            image: Input mask; 2-D grayscale or 3-channel BGR array.
            kernel_size: Side length of the square closing kernel.
            minarea_threshold: Components with area <= this are erased.

        Returns:
            image: Cleaned single-channel uint8 binary mask.
        """
        # BUG FIX: the original compared `image.dtype is not np.uint8`,
        # which is always True (a np.dtype instance is never identical to
        # the np.uint8 type object), so the cast ran unconditionally.
        # Equality comparison is the correct dtype check.
        if image.dtype != np.uint8:
            image = np.array(image, np.uint8)
        if len(image.shape) == 3:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Fill the pixel gaps using the Closing operator (dilation followed
        # by erosion).
        kernel = cv2.getStructuringElement(
            shape=cv2.MORPH_RECT, ksize=(
                kernel_size, kernel_size))
        image = cv2.morphologyEx(image, cv2.MORPH_CLOSE, kernel)

        # Remove small connected components (speckle noise).
        ccs = cv2.connectedComponentsWithStats(
            image, connectivity=8, ltype=cv2.CV_32S)
        labels = ccs[1]
        stats = ccs[2]

        for index, stat in enumerate(stats):
            # stat[cv2.CC_STAT_AREA] is the component's pixel area.
            if stat[cv2.CC_STAT_AREA] <= minarea_threshold:
                image[labels == index] = 0

        return image


# ---- Device selection and model setup -------------------------------------
if torch.cuda.is_available():
    device = torch.device("cuda")
    torch.backends.cudnn.benchmark = True  # Optimize CUDA performance
    print(f"Using CUDA device: {torch.cuda.get_device_name()}")
elif torch.backends.mps.is_available():  # For Apple Silicon
    device = torch.device("mps")
    print("Using MPS (Metal Performance Shaders)")
else:
    device = torch.device("cpu")
    print("Using CPU")
model = LaneSegmentationModel().to(device)
# BUG FIX: map_location lets a checkpoint trained on CUDA load on a
# CPU/MPS-only host; without it torch.load raises on such machines.
checkpoint = torch.load("best_lane_segmentation.pth", map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

# ---- One-off ONNX export --------------------------------------------------
dummy_input = torch.randn(1, 3, 256, 256).to(device)
torch.onnx.export(
    model,
    dummy_input,
    "lane_segmentation.onnx",
    export_params=True,
    opset_version=11,
    do_constant_folding=True,
    input_names=['input'],
    output_names=['output'],
    dynamic_axes={
        'input': {0: 'batch_size'},
        'output': {0: 'batch_size'}
    }
)
print("Model exported to ONNX format!")

# ---- Video inference loop -------------------------------------------------
cap = cv2.VideoCapture("assets/road1.mp4")
post_processor = PostProcessor()

# BUG FIX: build the preprocessing pipeline once; the original rebuilt it
# every frame and shadowed the `torchvision.transforms` module import.
transform = get_image_transform()

while True:
    ret, frame = cap.read()
    if not ret:
        break

    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    input_tensor = transform(rgb_frame).unsqueeze(0).to(device)

    # Run inference.
    with torch.no_grad():
        output = model(input_tensor)

    # BUG FIX: the model emits raw logits (training used BCEWithLogitsLoss),
    # so map them through sigmoid before applying a probability threshold.
    output_mask = torch.sigmoid(output).squeeze().cpu().numpy()
    binary_mask = (output_mask > 0.6).astype(np.uint8) * 255

    processed_mask = post_processor.process(
        binary_mask,
        kernel_size=5,
        minarea_threshold=10
    )

    # Resize the mask to the original frame dimensions.
    mask_resized = cv2.resize(processed_mask, (frame.shape[1], frame.shape[0]))

    # Paint lane pixels green on a copy, then alpha-blend with the frame.
    overlay = frame.copy()
    overlay[mask_resized > 0] = [0, 255, 0]
    alpha = 0.4  # Transparency factor
    blended = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0)

    # Display the result.
    cv2.imshow("Lane Detection", blended)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()  # BUG FIX: original leaked the VideoCapture handle
cv2.destroyAllWindows()
86 changes: 86 additions & 0 deletions src/training.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import torch
import torch.nn as nn
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from Model import LaneSegmentationModel
from Dataset import TuSimpleDataset
from utils import visualize_batch

# Training data: one JSON-lines annotation file per TuSimple recording date.
dataset = TuSimpleDataset(
    json_paths=[
        "/home/luis_t2/OpenCV/assets/TUSimple/train_set/label_data_0313.json",
        "/home/luis_t2/OpenCV/assets/TUSimple/train_set/label_data_0531.json",
        "/home/luis_t2/OpenCV/assets/TUSimple/train_set/label_data_0601.json"
    ],
    img_dir="/home/luis_t2/OpenCV/assets/TUSimple/train_set/",
)

dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

# Pick the fastest available device: CUDA, then Apple MPS, then CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
    torch.backends.cudnn.benchmark = True  # Optimize CUDA performance
    print(f"Using CUDA device: {torch.cuda.get_device_name()}")
elif torch.backends.mps.is_available():  # For Apple Silicon
    device = torch.device("mps")
    print("Using MPS (Metal Performance Shaders)")
else:
    device = torch.device("cpu")
    print("Using CPU")

model = LaneSegmentationModel().to(device)

# BCEWithLogitsLoss pairs with the model's raw (un-sigmoided) outputs.
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

if __name__ == "__main__":
    # Train for a fixed number of epochs, checkpointing whenever the
    # average epoch loss improves.
    num_epochs = 20
    best_loss = float('inf')
    best_model_path = "best_lane_segmentation.pth"
    # NOTE: removed unused local `batch_size = 8` — the batch size is fixed
    # where the DataLoader is constructed.

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0
        batch_count = 0

        for batch_idx, (images, masks) in enumerate(dataloader):
            images = images.to(device)
            masks = masks.to(device)

            # Standard step: clear grads, forward, loss, backward, update.
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, masks)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            batch_count += 1

            if (batch_idx + 1) % 5 == 0:
                # visualize_batch(images, masks, outputs)
                print(f"Epoch [{epoch+1}/{num_epochs}], "
                      f"Loss: {loss.item():.4f}")

        # Calculate average loss for the epoch; guard against an empty
        # dataloader, which would otherwise divide by zero.
        avg_epoch_loss = epoch_loss / max(batch_count, 1)
        print(f"Epoch [{epoch+1}/{num_epochs}] Average Loss: {avg_epoch_loss:.4f}")

        # Save the model if it has the best loss so far
        if avg_epoch_loss < best_loss:
            best_loss = avg_epoch_loss
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': best_loss,
            }, best_model_path)
            print(f"Saved new best model with loss: {best_loss:.4f}")

    print("Training finished!")
    print(f"Best model saved with loss: {best_loss:.4f}")
Loading
Loading