Developer Relations / Cubes on a conveyor belt - Larger dataset - Rebalance dataset Public

Training settings

Please provide a valid number of training cycles (numeric only)
Please provide a valid number for the learning rate (between 0 and 1)
Please provide a valid training processor option

Augmentation settings

Advanced training settings

Neural network architecture

import sys
sys.path.append('./resources/libraries')
import os
from collections import Counter
import tensorflow as tf
import numpy as np
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import BatchNormalization, Conv2D, Softmax
from tensorflow.keras.models import Model
from ei_tensorflow.constrained_object_detection import dataset, metrics, util
from ei_tensorflow.velo import train_keras_model_with_velo
from ei_shared.pretrained_weights import get_or_download_pretrained_weights
import ei_tensorflow.training

WEIGHTS_PREFIX = os.environ.get('WEIGHTS_PREFIX', os.getcwd())


def build_model(input_shape: tuple, weights: str, alpha: float,
                num_classes_with_background: int) -> tf.keras.Model:
    """Construct a constrained object detection model.

    Args:
        input_shape: Passed to MobileNetV2 construction.
        weights: Weights for initialization of MobileNetV2 where None implies
            random initialization.
        alpha: MobileNetV2 alpha value.
        num_classes_with_background: Total number of classes including
            background.

    Returns:
        Uncompiled Keras model. Model takes (B, H, W, C) input and returns
        (B, H//8, W//8, num_classes_with_background) logits.
    """
    # Create full MobileNetV2 from (HW, HW, C) input to (HW/8, HW/8, C) output
    mobile_net_v2 = MobileNetV2(input_shape=input_shape,
                                weights=weights,
                                alpha=alpha,
                                include_top=True)

    # Speed up batch normalization layers
    for layer in mobile_net_v2.layers:
        if isinstance(layer, BatchNormalization):
            layer.momentum = 0.9

    # Cut MobileNetV2 at 1/8th input resolution
    cut_point = mobile_net_v2.get_layer('block_6_expand_relu')

    # Attach a small additional head on MobileNetV2
    model = Conv2D(filters=32, kernel_size=1, strides=1,
                   activation='relu', name='head')(cut_point.output)
    logits = Conv2D(filters=num_classes_with_background, kernel_size=1,
                    strides=1, activation=None, name='logits')(model)
    return Model(inputs=mobile_net_v2.input, outputs=logits)


def construct_weighted_xent_fn_per_class(model_output_shape, class_weights):
    """Construct a custom weighted cross entropy function with per-class weights.

    Args:
        model_output_shape: Output shape of Keras model; must have four
            entries of the form (BATCH_SIZE, H, W, NUM_CLASSES).
        class_weights: List of weights per class (including background class).

    Returns:
        Loss function suitable for use with Keras model. The function
        calculates the sigmoid cross-entropy loss for each class separately,
        applies the corresponding class weight, sums the losses over the
        class axis and returns the mean over all remaining axes.

    Raises:
        Exception: If model_output_shape does not have four entries, or if
            the number of class weights differs from the last entry of
            model_output_shape.
    """
    if len(model_output_shape) != 4:
        raise Exception("Expected model_output_shape of form (BATCH_SIZE, H, W, NUM_CLASSES)")

    # Only the class dimension is needed here; the batch and spatial entries
    # are handled by broadcasting inside the loss.
    num_classes_model = model_output_shape[-1]
    if num_classes_model != len(class_weights):
        raise Exception(f"Number of class weights ({len(class_weights)}) does not match "
                        f"number of classes ({num_classes_model})")

    # Convert class_weights to a tensor and reshape it once, up front, for
    # broadcasting against the per-class losses: (1, 1, 1, num_classes)
    class_weights_tensor = tf.constant(class_weights, dtype=tf.float32)
    weights = tf.reshape(class_weights_tensor, [1, 1, 1, num_classes_model])

    def weighted_xent(y_true, y_pred_logits):
        # Compute the sigmoid cross-entropy loss for each class
        losses = tf.nn.sigmoid_cross_entropy_with_logits(labels=y_true,
                                                         logits=y_pred_logits)
        # Multiply each class's loss by its weight
        weighted_losses = losses * weights
        # Sum the losses over the classes
        total_loss = tf.reduce_sum(weighted_losses, axis=-1)  # Shape: (batch_size, H, W)
        # Compute mean loss over all pixels and the batch
        return tf.reduce_mean(total_loss)

    return weighted_xent


def train(num_classes: int, learning_rate: float, num_epochs: int,
          alpha: float,
          train_dataset: tf.data.Dataset,
          validation_dataset: tf.data.Dataset,
          best_model_path: str,
          input_shape: tuple,
          batch_size: int,
          class_weights,
          use_velo: bool = False,
          ensure_determinism: bool = False) -> tf.keras.Model:
    """Construct and train a constrained object detection model with per-class weighting.

    Args:
        num_classes: Number of classes excluding the background.
        learning_rate: Learning rate for Adam optimizer.
        num_epochs: Number of epochs for training.
        alpha: Alpha value for MobileNetV2.
        train_dataset: Training dataset.
        validation_dataset: Validation dataset.
        best_model_path: Path to save the best model.
        input_shape: Shape of the model's input.
        batch_size: Training batch size.
        class_weights: List or array of per-class weights (including
            background class).
        use_velo: Whether to use VeLO optimizer.
        ensure_determinism: If true, disables non-deterministic functions
            (here: dataset shuffling and autotuned prefetching).

    Returns:
        Trained Keras model with the best checkpointed weights restored and
        an explicit Softmax layer appended to the logits.

    Raises:
        Exception: If the input or the model output is not square, or if
            VeLO training fails with a ResourceExhaustedError (the original
            error is attached as the cause).
    """
    # Initialize callbacks if not already defined
    global callbacks
    callbacks = callbacks if 'callbacks' in globals() else []

    num_classes_with_background = num_classes + 1

    width, height, input_num_channels = input_shape
    if width != height:
        raise Exception(f"Only square inputs are supported; not {input_shape}")

    # Use pretrained weights if available
    allowed_combinations = [{'num_channels': 1, 'alpha': 0.1},
                            {'num_channels': 1, 'alpha': 0.35},
                            {'num_channels': 3, 'alpha': 0.1},
                            {'num_channels': 3, 'alpha': 0.35}]
    weights = get_or_download_pretrained_weights(
        WEIGHTS_PREFIX, input_num_channels, alpha, allowed_combinations)

    model = build_model(
        input_shape=input_shape,
        weights=weights,
        alpha=alpha,
        num_classes_with_background=num_classes_with_background
    )

    # Derive output size from model
    model_output_shape = model.layers[-1].output.shape
    _batch, output_width, output_height, num_classes_model = model_output_shape
    if output_width != output_height:
        raise Exception(f"Only square outputs are supported; not {model_output_shape}")

    # Build the custom per-class weighted loss function
    weighted_xent = construct_weighted_xent_fn_per_class(model_output_shape,
                                                         class_weights)

    prefetch_policy = 1 if ensure_determinism else tf.data.experimental.AUTOTUNE

    # Transform bounding box labels into segmentation maps
    def as_segmentation(ds, shuffle):
        ds = ds.map(dataset.bbox_to_segmentation(
            output_width, num_classes_with_background))
        if not ensure_determinism and shuffle:
            ds = ds.shuffle(buffer_size=batch_size * 4)
        ds = ds.batch(batch_size, drop_remainder=False).prefetch(prefetch_policy)
        return ds

    train_segmentation_dataset = as_segmentation(train_dataset, True)
    validation_segmentation_dataset = as_segmentation(validation_dataset, False)

    # The scoring callback receives the validation data batched but without
    # the segmentation-map transform applied.
    validation_dataset_for_callback = (validation_dataset
                                       .batch(batch_size, drop_remainder=False)
                                       .prefetch(prefetch_policy))

    # Initialize biases of final classifier based on training data prior
    util.set_classifier_biases_from_dataset(
        model, train_segmentation_dataset)

    if not use_velo:
        model.compile(loss=weighted_xent,
                      optimizer=Adam(learning_rate=learning_rate))

    # Create callbacks
    callbacks.append(metrics.CentroidScoring(validation_dataset_for_callback,
                                             output_width,
                                             num_classes_with_background))
    callbacks.append(metrics.PrintPercentageTrained(num_epochs))

    # Model checkpointing based on the best validation F1 score
    callbacks.append(
        tf.keras.callbacks.ModelCheckpoint(best_model_path,
                                           monitor='val_f1',
                                           save_best_only=True,
                                           mode='max',
                                           save_weights_only=True,
                                           verbose=0))

    if use_velo:
        from tensorflow.python.framework.errors_impl import ResourceExhaustedError
        try:
            train_keras_model_with_velo(
                model,
                train_segmentation_dataset,
                validation_segmentation_dataset,
                loss_fn=weighted_xent,
                num_epochs=num_epochs,
                callbacks=callbacks
            )
        except ResourceExhaustedError as e:
            print(str(e))
            # Chain the original error so the traceback keeps the root cause.
            raise Exception(
                "ResourceExhaustedError caught during train_keras_model_with_velo."
                " Though VeLO encourages a large batch size, the current"
                f" size of {batch_size} may be too large. Please try a lower"
                " value. For further assistance please contact support"
                " at https://forum.edgeimpulse.com/") from e
    else:
        model.fit(train_segmentation_dataset,
                  validation_data=validation_segmentation_dataset,
                  epochs=num_epochs,
                  callbacks=callbacks,
                  verbose=0)

    # Restore best weights
    model.load_weights(best_model_path)

    # Add explicit softmax layer before export
    softmax_layer = Softmax()(model.layers[-1].output)
    model = Model(model.input, softmax_layer)

    return model


# Training parameters
# NOTE(review): `args`, `train_dataset`, `validation_dataset`,
# `BEST_MODEL_PATH`, `MODEL_INPUT_SHAPE` and `ensure_determinism` are not
# defined in this script; presumably they are injected by the hosting
# training environment - confirm.
EPOCHS = args.epochs or 60
LEARNING_RATE = args.learning_rate or 0.001
BATCH_SIZE = args.batch_size or 32


# Function to compute class frequencies from the dataset and determine num_classes
def compute_class_frequencies(dataset):
    """Count label occurrences in a dataset and infer the number of classes.

    NOTE: the parameter name shadows the imported `dataset` module inside
    this function only; the module is not used here.

    Args:
        dataset: Iterable of samples where sample[1] is a pair
            (bounding_boxes, class_vectors). Both are treated as
            RaggedTensors; class_vectors holds one one-hot row per box.

    Returns:
        Tuple (class_counts, total_samples, num_classes_with_background):
        class_counts maps class id to count, where key 0 counts samples
        without any bounding box and key i + 1 counts boxes whose one-hot
        argmax is i; total_samples is the number of samples iterated;
        num_classes_with_background is the one-hot width plus one.

    Raises:
        Exception: If no sample contains a bounding box, so the number of
            classes cannot be determined.
    """
    counts = Counter()
    total_samples = 0   # Total number of samples (images)
    num_classes = None  # Number of classes excluding background

    for sample in dataset:
        labels = sample[1]          # labels is a tuple of two RaggedTensors
        bounding_boxes = labels[0]  # RaggedTensor of bounding boxes
        class_vectors = labels[1]   # RaggedTensor of class vectors (one-hot)

        # Get number of bounding boxes
        num_boxes = bounding_boxes.nrows()

        if num_boxes == 0:
            # No bounding boxes, entire image is background
            counts[0] += 1
        else:
            # There are bounding boxes
            # Convert class_vectors to tensor and then get class IDs
            class_vectors_tensor = class_vectors.to_tensor()
            if class_vectors_tensor.shape[0] > 0:
                if num_classes is None:
                    num_classes = class_vectors_tensor.shape[1]
                class_ids = tf.argmax(class_vectors_tensor, axis=1).numpy()
                # Adjust class ID to account for background
                counts.update(int(class_id) + 1 for class_id in class_ids)

        total_samples += 1

    if num_classes is None:
        raise Exception("Could not determine number of classes from dataset.")

    num_classes_with_background = num_classes + 1

    # Hand back a plain dict so callers (and the printed summary) see the
    # same type as before.
    return dict(counts), total_samples, num_classes_with_background


# Compute class frequencies from the training dataset
class_counts, total_samples, num_classes_with_background = compute_class_frequencies(train_dataset)
num_classes = num_classes_with_background - 1  # Exclude background class

# Create class frequencies list
class_frequencies = [class_counts.get(i, 0) for i in range(num_classes_with_background)]
total_counts = sum(class_frequencies)

# Compute initial class weights inversely proportional to class frequencies.
# A class that never occurs gets weight 0.0 rather than a division by zero.
initial_class_weights = [
    (total_counts / (num_classes_with_background * freq)) if freq > 0 else 0.0
    for freq in class_frequencies
]

# Set background class weight to 1.0
initial_class_weights[0] = 1.0

# Optionally, apply an extra weight factor to other classes
EXTRA_WEIGHT_FACTOR = 150  # Adjust this factor as needed
class_weights = [initial_class_weights[0]] + [
    w * EXTRA_WEIGHT_FACTOR for w in initial_class_weights[1:]
]

print(f"Class counts: {class_counts}")
print(f"Class frequencies: {class_frequencies}")
print(f"Initial class weights: {initial_class_weights}")
print(f"Adjusted class weights: {class_weights}")

# Proceed with training
model = train(
    num_classes=num_classes,  # Excluding background class
    learning_rate=LEARNING_RATE,
    num_epochs=EPOCHS,
    alpha=0.35,
    train_dataset=train_dataset,
    validation_dataset=validation_dataset,
    best_model_path=BEST_MODEL_PATH,
    input_shape=MODEL_INPUT_SHAPE,
    batch_size=BATCH_SIZE,
    class_weights=class_weights,
    use_velo=False,
    ensure_determinism=ensure_determinism
)

disable_per_channel_quantization = False
Input layer (49,152 features)
FOMO (Faster Objects, More Objects) MobileNetV2 0.35
Output layer (4 classes)

Model

Model version: