Training settings
Please provide a valid training processor option
Neural network architecture
import sys
sys.path.append('./resources/libraries')
import os
import tensorflow as tf
import numpy as np
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import BatchNormalization, Conv2D, Softmax
from tensorflow.keras.models import Model
from ei_tensorflow.constrained_object_detection import dataset, metrics, util
from ei_tensorflow.velo import train_keras_model_with_velo
from ei_shared.pretrained_weights import get_or_download_pretrained_weights
import ei_tensorflow.training
WEIGHTS_PREFIX = os.environ.get('WEIGHTS_PREFIX', os.getcwd())
def build_model(input_shape: tuple, weights: str, alpha: float,
num_classes_with_background: int) -> tf.keras.Model:
"""Construct a constrained object detection model.
Args:
input_shape: Passed to MobileNetV2 construction.
weights: Weights for initialization of MobileNetV2 where None implies
random initialization.
alpha: MobileNetV2 alpha value.
num_classes_with_background: Total number of classes including background.
Returns:
Uncompiled Keras model.
Model takes (B, H, W, C) input and
returns (B, H//8, W//8, num_classes_with_background) logits.
"""
# Create full MobileNetV2 from (HW, HW, C) input to (HW/8, HW/8, C) output
mobile_net_v2 = MobileNetV2(input_shape=input_shape,
weights=weights,
alpha=alpha,
include_top=True)
# Speed up batch normalization layers
for layer in mobile_net_v2.layers:
if isinstance(layer, BatchNormalization):
layer.momentum = 0.9
# Cut MobileNetV2 at 1/8th input resolution
cut_point = mobile_net_v2.get_layer('block_6_expand_relu')
# Attach a small additional head on MobileNetV2
model = Conv2D(filters=32, kernel_size=1, strides=1,
activation='relu', name='head')(cut_point.output)
logits = Conv2D(filters=num_classes_with_background, kernel_size=1, strides=1,
activation=None, name='logits')(model)
return Model(inputs=mobile_net_v2.input, outputs=logits)
def construct_weighted_xent_fn_per_class(model_output_shape, class_weights):
"""Construct a custom weighted cross entropy function for model with per-class weights.
Args:
model_output_shape: Output shape of Keras model, used for masks, etc.
class_weights: List of weights per class (including background class).
Returns:
Loss function suitable for use with Keras model.
The function calculates the sigmoid cross-entropy loss for each class separately,
applies the corresponding class weight, and sums the losses.
"""
if len(model_output_shape) != 4:
raise Exception("Expected model_output_shape of form (BATCH_SIZE, H, W, NUM_CLASSES)")
_batch_size, height, width, num_classes_model = model_output_shape
if num_classes_model != len(class_weights):
raise Exception(f"Number of class weights ({len(class_weights)}) does not match "
f"number of classes ({num_classes_model})")
# Convert class_weights to a tensor
class_weights_tensor = tf.constant(class_weights, dtype=tf.float32)
def weighted_xent(y_true, y_pred_logits):
# Compute the sigmoid cross-entropy loss for each class
losses = tf.nn.sigmoid_cross_entropy_with_logits(labels=y_true, logits=y_pred_logits)
# Multiply each class's loss by its weight
# Reshape class_weights_tensor for broadcasting: (1, 1, 1, num_classes)
weights = tf.reshape(class_weights_tensor, [1, 1, 1, num_classes_model])
weighted_losses = losses * weights
# Sum the losses over the classes
total_loss = tf.reduce_sum(weighted_losses, axis=-1) # Shape: (batch_size, H, W)
# Compute mean loss over all pixels and the batch
return tf.reduce_mean(total_loss)
return weighted_xent
def train(num_classes: int, learning_rate: float, num_epochs: int,
alpha: float,
train_dataset: tf.data.Dataset,
validation_dataset: tf.data.Dataset,
best_model_path: str,
input_shape: tuple,
batch_size: int,
class_weights,
use_velo: bool = False,
ensure_determinism: bool = False) -> tf.keras.Model:
"""Construct and train a constrained object detection model with per-class weighting.
Args:
num_classes: Number of classes excluding the background.
learning_rate: Learning rate for Adam optimizer.
num_epochs: Number of epochs for training.
alpha: Alpha value for MobileNetV2.
train_dataset: Training dataset.
validation_dataset: Validation dataset.
best_model_path: Path to save the best model.
input_shape: Shape of the model's input.
batch_size: Training batch size.
class_weights: List or array of per-class weights (including background class).
use_velo: Whether to use VeLO optimizer.
ensure_determinism: If true, disables non-deterministic functions.
Returns:
Trained Keras model.
"""
# Initialize callbacks if not already defined
global callbacks
callbacks = callbacks if 'callbacks' in globals() else []
num_classes_with_background = num_classes + 1
width, height, input_num_channels = input_shape
if width != height:
raise Exception(f"Only square inputs are supported; not {input_shape}")
# Use pretrained weights if available
allowed_combinations = [{'num_channels': 1, 'alpha': 0.1},
{'num_channels': 1, 'alpha': 0.35},
{'num_channels': 3, 'alpha': 0.1},
{'num_channels': 3, 'alpha': 0.35}]
weights = get_or_download_pretrained_weights(
WEIGHTS_PREFIX, input_num_channels, alpha, allowed_combinations)
model = build_model(
input_shape=input_shape,
weights=weights,
alpha=alpha,
num_classes_with_background=num_classes_with_background
)
# Derive output size from model
model_output_shape = model.layers[-1].output.shape
_batch, output_width, output_height, num_classes_model = model_output_shape
if output_width != output_height:
raise Exception(f"Only square outputs are supported; not {model_output_shape}")
# Build the custom per-class weighted loss function
weighted_xent = construct_weighted_xent_fn_per_class(model_output_shape, class_weights)
prefetch_policy = 1 if ensure_determinism else tf.data.experimental.AUTOTUNE
# Transform bounding box labels into segmentation maps
def as_segmentation(ds, shuffle):
ds = ds.map(dataset.bbox_to_segmentation(
output_width, num_classes_with_background))
if not ensure_determinism and shuffle:
ds = ds.shuffle(buffer_size=batch_size * 4)
ds = ds.batch(batch_size, drop_remainder=False).prefetch(prefetch_policy)
return ds
train_segmentation_dataset = as_segmentation(train_dataset, True)
validation_segmentation_dataset = as_segmentation(validation_dataset, False)
validation_dataset_for_callback = (validation_dataset
.batch(batch_size, drop_remainder=False)
.prefetch(prefetch_policy))
# Initialize biases of final classifier based on training data prior
util.set_classifier_biases_from_dataset(
model, train_segmentation_dataset)
if not use_velo:
model.compile(loss=weighted_xent,
optimizer=Adam(learning_rate=learning_rate))
# Create callbacks
callbacks.append(metrics.CentroidScoring(validation_dataset_for_callback,
output_width, num_classes_with_background))
callbacks.append(metrics.PrintPercentageTrained(num_epochs))
# Model checkpointing based on the best validation F1 score
callbacks.append(
tf.keras.callbacks.ModelCheckpoint(best_model_path,
monitor='val_f1', save_best_only=True, mode='max',
save_weights_only=True, verbose=0))
if use_velo:
from tensorflow.python.framework.errors_impl import ResourceExhaustedError
try:
train_keras_model_with_velo(
model,
train_segmentation_dataset,
validation_segmentation_dataset,
loss_fn=weighted_xent,
num_epochs=num_epochs,
callbacks=callbacks
)
except ResourceExhaustedError as e:
print(str(e))
raise Exception(
"ResourceExhaustedError caught during train_keras_model_with_velo."
" Though VeLO encourages a large batch size, the current"
f" size of {batch_size} may be too large. Please try a lower"
" value. For further assistance please contact support"
" at https://forum.edgeimpulse.com/")
else:
model.fit(train_segmentation_dataset,
validation_data=validation_segmentation_dataset,
epochs=num_epochs, callbacks=callbacks, verbose=0)
# Restore best weights
model.load_weights(best_model_path)
# Add explicit softmax layer before export
softmax_layer = Softmax()(model.layers[-1].output)
model = Model(model.input, softmax_layer)
return model
# Training parameters
EPOCHS = args.epochs or 60
LEARNING_RATE = args.learning_rate or 0.001
BATCH_SIZE = args.batch_size or 32
# Function to compute class frequencies from the dataset and determine num_classes
def compute_class_frequencies(dataset):
class_counts = {}
total_samples = 0 # Total number of samples (images)
num_classes = None # Number of classes excluding background
for sample in dataset:
labels = sample[1] # labels is a tuple of two RaggedTensors
bounding_boxes = labels[0] # RaggedTensor of bounding boxes
class_vectors = labels[1] # RaggedTensor of class vectors (one-hot)
# Get number of bounding boxes
num_boxes = bounding_boxes.nrows()
if num_boxes == 0:
# No bounding boxes, entire image is background
class_counts[0] = class_counts.get(0, 0) + 1
else:
# There are bounding boxes
# Convert class_vectors to tensor and then get class IDs
class_vectors_tensor = class_vectors.to_tensor()
if class_vectors_tensor.shape[0] > 0:
if num_classes is None:
num_classes = class_vectors_tensor.shape[1]
class_ids = tf.argmax(class_vectors_tensor, axis=1).numpy()
for class_id in class_ids:
# Adjust class ID to account for background
class_counts[int(class_id) + 1] = class_counts.get(int(class_id) + 1, 0) + 1
total_samples += 1
if num_classes is None:
raise Exception("Could not determine number of classes from dataset.")
num_classes_with_background = num_classes + 1
return class_counts, total_samples, num_classes_with_background
# Compute class frequencies from the training dataset
class_counts, total_samples, num_classes_with_background = compute_class_frequencies(train_dataset)
num_classes = num_classes_with_background - 1 # Exclude background class
# Create class frequencies list
class_frequencies = [class_counts.get(i, 0) for i in range(num_classes_with_background)]
total_counts = sum(class_frequencies)
# Compute initial class weights inversely proportional to class frequencies
initial_class_weights = [
(total_counts / (num_classes_with_background * freq)) if freq > 0 else 0.0
for freq in class_frequencies
]
# Set background class weight to 1.0
initial_class_weights[0] = 1.0
# Optionally, apply an extra weight factor to other classes
EXTRA_WEIGHT_FACTOR = 150 # Adjust this factor as needed
class_weights = [initial_class_weights[0]] + [
w * EXTRA_WEIGHT_FACTOR for w in initial_class_weights[1:]
]
print(f"Class counts: {class_counts}")
print(f"Class frequencies: {class_frequencies}")
print(f"Initial class weights: {initial_class_weights}")
print(f"Adjusted class weights: {class_weights}")
# Proceed with training
model = train(
num_classes=num_classes, # Excluding background class
learning_rate=LEARNING_RATE,
num_epochs=EPOCHS,
alpha=0.35,
train_dataset=train_dataset,
validation_dataset=validation_dataset,
best_model_path=BEST_MODEL_PATH,
input_shape=MODEL_INPUT_SHAPE,
batch_size=BATCH_SIZE,
class_weights=class_weights,
use_velo=False,
ensure_determinism=ensure_determinism
)
disable_per_channel_quantization = False
Input layer (49,152 features)
FOMO (Faster Objects, More Objects) MobileNetV2 0.35
Output layer (4 classes)
Model
Model version: