Advanced Computer Vision with TensorFlow - Building Real-World Image Recognition Systems
Computer vision has revolutionized industries from healthcare to autonomous vehicles. With TensorFlow, building sophisticated image recognition systems has become more accessible than ever. This comprehensive guide will take you beyond basic image classification to advanced computer vision techniques used in production systems.
What We’ll Build
In this tutorial, you’ll learn to create:
- Custom Image Classifiers using transfer learning
- Object Detection Systems for real-time applications
- Image Segmentation Models for pixel-level analysis
- Style Transfer Networks for artistic applications
- Production-Ready CV Pipelines with optimization and deployment
Prerequisites
Before diving in, ensure you have:
- TensorFlow 2.x installed (see our getting started guide)
- Basic CNN knowledge from the previous tutorial
- 8GB+ RAM (16GB recommended for large models)
- GPU support (highly recommended for training; a quick environment check is sketched below)
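Before moving on, it is worth confirming that your environment matches these requirements. A minimal sanity check, assuming a standard TensorFlow 2.x install, might look like this:

import tensorflow as tf

# Confirm the TensorFlow version and whether a GPU is visible
print("TensorFlow version:", tf.__version__)
gpus = tf.config.list_physical_devices('GPU')
print("GPUs available:", gpus if gpus else "none (training will run on CPU)")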
Advanced CNN Architectures
1. Understanding Modern CNN Architectures
Let’s explore the evolution of CNN architectures:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import numpy as np
# ResNet block - solving the vanishing gradient problem
def residual_block(x, filters, kernel_size=3, stride=1, conv_shortcut=True):
    """A residual block with skip connections"""

    if conv_shortcut:
        shortcut = layers.Conv2D(4 * filters, 1, strides=stride)(x)
        shortcut = layers.BatchNormalization()(shortcut)
    else:
        shortcut = x

    x = layers.Conv2D(filters, 1, strides=stride)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    x = layers.Conv2D(filters, kernel_size, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    x = layers.Conv2D(4 * filters, 1)(x)
    x = layers.BatchNormalization()(x)

    x = layers.Add()([shortcut, x])
    x = layers.Activation('relu')(x)
    return x
# Attention mechanism for CNNs
def attention_block(x, filters):
    """Squeeze-and-Excitation attention block"""

    # Global average pooling
    gap = layers.GlobalAveragePooling2D()(x)

    # Fully connected layers
    fc1 = layers.Dense(filters // 16, activation='relu')(gap)
    fc2 = layers.Dense(filters, activation='sigmoid')(fc1)

    # Reshape and multiply
    attention = layers.Reshape((1, 1, filters))(fc2)
    x = layers.Multiply()([x, attention])

    return x
# Modern CNN with attention
def create_advanced_cnn(input_shape, num_classes):
    inputs = keras.Input(shape=input_shape)

    # Initial convolution
    x = layers.Conv2D(64, 7, strides=2, padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.MaxPooling2D(3, strides=2, padding='same')(x)

    # Residual blocks with increasing filters
    for filters in [64, 128, 256, 512]:
        x = residual_block(x, filters)
        x = attention_block(x, filters * 4)

    # Classification head
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)

    return keras.Model(inputs, outputs)

# Example usage
model = create_advanced_cnn((224, 224, 3), 1000)
print(f"Model parameters: {model.count_params():,}")
2. EfficientNet - Optimized Architecture
# Using pre-trained EfficientNet
def create_efficientnet_model(num_classes, input_shape=(224, 224, 3)):
    base_model = keras.applications.EfficientNetB0(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape
    )

    # Freeze base model initially
    base_model.trainable = False

    model = keras.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.BatchNormalization(),
        layers.Dropout(0.2),
        layers.Dense(num_classes, activation='softmax')
    ])

    return model, base_model
# Fine-tuning strategy
def setup_fine_tuning(model, base_model, learning_rate=1e-5):
    # Unfreeze the base model
    base_model.trainable = True

    # Fine-tune from this layer onwards
    fine_tune_at = len(base_model.layers) - 20

    # Freeze all the layers before the `fine_tune_at` layer
    for layer in base_model.layers[:fine_tune_at]:
        layer.trainable = False

    # Recompile with a lower learning rate for fine-tuning
    # (sparse loss to match the integer labels produced by
    # image_dataset_from_directory in the pipeline below)
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate / 10),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    return model
Transfer Learning Mastery
1. Custom Dataset Preparation
import os
from pathlib import Path
import PIL.Image
def create_dataset_from_directory(data_dir, image_size=(224, 224), batch_size=32):
    """Create a tf.data dataset from directory structure"""

    # Create dataset from directory
    dataset = keras.utils.image_dataset_from_directory(
        data_dir,
        validation_split=0.2,
        subset="training",
        seed=123,
        image_size=image_size,
        batch_size=batch_size
    )

    val_dataset = keras.utils.image_dataset_from_directory(
        data_dir,
        validation_split=0.2,
        subset="validation",
        seed=123,
        image_size=image_size,
        batch_size=batch_size
    )

    return dataset, val_dataset
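image_dataset_from_directory infers one class per subdirectory and exposes the mapping as a class_names attribute. A small usage sketch (the "data/flowers" path is a placeholder; point it at your own class-per-subfolder dataset) — note that class_names should be captured before any further map() calls, because transformed datasets lose the attribute:

# Example usage ("data/flowers" is a placeholder path)
train_ds, val_ds = create_dataset_from_directory("data/flowers")
class_names = train_ds.class_names  # capture before mapping/augmentation
print("Classes:", class_names)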
# Advanced data augmentation pipeline
def create_augmentation_pipeline():
    """Create sophisticated data augmentation"""

    data_augmentation = keras.Sequential([
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.1),
        layers.RandomZoom(0.1),
        layers.RandomContrast(0.1),
        layers.RandomBrightness(0.1),
        # Custom augmentation
        layers.Lambda(lambda x: tf.image.random_hue(x, 0.02)),
        layers.Lambda(lambda x: tf.image.random_saturation(x, 0.7, 1.3)),
    ])

    return data_augmentation
# Preprocessing pipeline
def preprocess_dataset(dataset, augment=True):
    """Optimize dataset for training"""

    # Note: keras.applications.EfficientNet models rescale inputs internally,
    # so the [0, 1] scaling below is mainly for models without built-in
    # preprocessing.
    AUTOTUNE = tf.data.AUTOTUNE

    if augment:
        augmentation = create_augmentation_pipeline()
        dataset = dataset.map(
            lambda x, y: (augmentation(x, training=True), y),
            num_parallel_calls=AUTOTUNE
        )

    # Normalize pixel values to [0, 1]
    dataset = dataset.map(
        lambda x, y: (tf.cast(x, tf.float32) / 255.0, y),
        num_parallel_calls=AUTOTUNE
    )

    # Optimize performance
    dataset = dataset.cache()
    dataset = dataset.shuffle(1000)
    dataset = dataset.prefetch(AUTOTUNE)

    return dataset
# Complete transfer learning pipeline
def train_custom_classifier(data_dir, num_classes, epochs=20):
    # Load and preprocess data
    train_ds, val_ds = create_dataset_from_directory(data_dir)
    train_ds = preprocess_dataset(train_ds, augment=True)
    val_ds = preprocess_dataset(val_ds, augment=False)

    # Create model
    model, base_model = create_efficientnet_model(num_classes)

    # Initial training (frozen base)
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )

    # Callbacks
    callbacks = [
        keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(factor=0.2, patience=3),
        keras.callbacks.ModelCheckpoint('best_model.h5', save_best_only=True)
    ]

    # Train frozen model
    print("Training with frozen base model...")
    history1 = model.fit(
        train_ds,
        epochs=epochs // 2,
        validation_data=val_ds,
        callbacks=callbacks
    )

    # Fine-tuning
    print("Fine-tuning model...")
    model = setup_fine_tuning(model, base_model)

    history2 = model.fit(
        train_ds,
        epochs=epochs // 2,
        validation_data=val_ds,
        callbacks=callbacks
    )

    return model, history1, history2
2. Advanced Transfer Learning Techniques
# Multi-scale feature extraction
def create_multiscale_model(base_model, num_classes):
    """Extract features from multiple layers"""

    # Get intermediate layer outputs
    layer_names = [
        'block4a_expand_activation',  # 28x28
        'block6a_expand_activation',  # 14x14
        'top_activation'              # 7x7
    ]

    layers_outputs = [base_model.get_layer(name).output for name in layer_names]

    # Create feature extraction model
    feature_extractor = keras.Model(
        inputs=base_model.input,
        outputs=layers_outputs
    )

    # Multi-scale processing
    inputs = keras.Input(shape=(224, 224, 3))
    features = feature_extractor(inputs)

    # Process each scale
    processed_features = []
    for i, feature in enumerate(features):
        # Global average pooling for each scale
        gap = layers.GlobalAveragePooling2D()(feature)
        dense = layers.Dense(256, activation='relu')(gap)
        processed_features.append(dense)

    # Concatenate multi-scale features
    combined = layers.Concatenate()(processed_features)
    combined = layers.Dropout(0.5)(combined)
    outputs = layers.Dense(num_classes, activation='softmax')(combined)

    return keras.Model(inputs, outputs)
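The layer names above correspond to EfficientNetB0. A hedged usage sketch that builds the multi-scale classifier on a pre-trained backbone (the class count of 10 is arbitrary; adjust the layer names if you swap in a different architecture):

# Usage sketch: multi-scale classifier on an EfficientNetB0 backbone
base = keras.applications.EfficientNetB0(
    weights='imagenet', include_top=False, input_shape=(224, 224, 3)
)
multiscale_model = create_multiscale_model(base, num_classes=10)
multiscale_model.summary()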
# Domain adaptation techniques
def create_domain_adaptive_model(source_model, target_classes):
    """Adapt model from source domain to target domain"""

    # Remove the last classification layer
    base_features = source_model.layers[-2].output

    # Add domain classifier (for adversarial training)
    domain_classifier = layers.Dense(1, activation='sigmoid', name='domain')(base_features)

    # Add new task classifier
    task_classifier = layers.Dense(target_classes, activation='softmax', name='task')(base_features)

    # Create multi-output model
    adapted_model = keras.Model(
        inputs=source_model.input,
        outputs=[task_classifier, domain_classifier]
    )

    return adapted_model
Object Detection with TensorFlow
1. YOLO-style Object Detection
# Custom YOLO implementation
def create_yolo_model(input_shape, num_classes, num_anchors=3):
    """Simplified YOLO architecture"""

    inputs = keras.Input(shape=input_shape)

    # Backbone (feature extractor)
    x = layers.Conv2D(32, 3, padding='same', activation='relu')(inputs)
    x = layers.MaxPooling2D()(x)

    x = layers.Conv2D(64, 3, padding='same', activation='relu')(x)
    x = layers.MaxPooling2D()(x)

    x = layers.Conv2D(128, 3, padding='same', activation='relu')(x)
    x = layers.MaxPooling2D()(x)

    x = layers.Conv2D(256, 3, padding='same', activation='relu')(x)
    x = layers.MaxPooling2D()(x)

    x = layers.Conv2D(512, 3, padding='same', activation='relu')(x)

    # Detection head
    # Output: (batch, grid_h, grid_w, anchors * (5 + num_classes))
    # 5 = x, y, w, h, confidence
    outputs = layers.Conv2D(
        num_anchors * (5 + num_classes), 1, activation='linear'
    )(x)

    return keras.Model(inputs, outputs)
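A quick shape check helps make the grid layout concrete: with a 416x416 input, the four pooling stages reduce the feature map to a 26x26 grid, and each grid cell predicts num_anchors * (5 + num_classes) values.

# Shape check for the detection head
yolo = create_yolo_model((416, 416, 3), num_classes=80)
print(yolo.output_shape)  # (None, 26, 26, 255) = 3 anchors * (5 + 80)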
# YOLO loss function
def yolo_loss(y_true, y_pred, num_classes=80, num_anchors=3):
    """Simplified YOLO loss function"""

    # Reshape predictions to (batch, grid_h, grid_w, anchors, 5 + num_classes)
    grid_h, grid_w = tf.shape(y_pred)[1], tf.shape(y_pred)[2]
    y_pred = tf.reshape(y_pred, (-1, grid_h, grid_w, num_anchors, 5 + num_classes))

    # Split predictions
    pred_xy = tf.sigmoid(y_pred[..., :2])   # Center coordinates
    pred_wh = y_pred[..., 2:4]              # Width and height
    pred_conf = tf.sigmoid(y_pred[..., 4])  # Confidence
    pred_class = y_pred[..., 5:]            # Class probabilities

    # Split ground truth (same layout; true_conf doubles as the object mask)
    true_xy = y_true[..., :2]
    true_wh = y_true[..., 2:4]
    true_conf = y_true[..., 4]
    true_class = y_true[..., 5:]

    # Apply the object mask elementwise before summing, so localization and
    # classification terms only count for cells that contain an object.
    # (Full YOLO compares square roots of width/height; that is omitted here
    # because the raw predictions can be negative.)
    obj_mask = true_conf[..., tf.newaxis]

    # Calculate losses
    xy_loss = tf.reduce_sum(obj_mask * tf.square(true_xy - pred_xy))
    wh_loss = tf.reduce_sum(obj_mask * tf.square(true_wh - pred_wh))
    conf_loss = tf.reduce_sum(tf.square(true_conf - pred_conf))
    class_loss = tf.reduce_sum(obj_mask * tf.square(true_class - pred_class))

    total_loss = xy_loss + wh_loss + conf_loss + class_loss
    return total_loss
# Non-Maximum Suppression
def non_max_suppression(boxes, scores, max_outputs=50, iou_threshold=0.5):
    """Apply NMS to filter overlapping boxes"""

    selected_indices = tf.image.non_max_suppression(
        boxes, scores, max_outputs, iou_threshold
    )

    selected_boxes = tf.gather(boxes, selected_indices)
    selected_scores = tf.gather(scores, selected_indices)

    return selected_boxes, selected_scores, selected_indices
2. Using TensorFlow Object Detection API
# The full TensorFlow Object Detection API is installed from the
# tensorflow/models repository; the examples below only need TensorFlow Hub:
# !pip install tensorflow-hub
import tensorflow_hub as hub
def load_detector(model_url):
    """Load pre-trained object detection model"""
    detector = hub.load(model_url)
    return detector
def detect_objects(detector, image_path, min_score=0.3):
    """Detect objects in an image"""

    # Load and preprocess image
    image = tf.io.read_file(image_path)
    image = tf.image.decode_image(image, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = image[tf.newaxis, ...]

    # Run detection
    results = detector(image)

    # Filter by confidence score
    # Note: output keys and shapes vary between TF Hub detectors; adjust the
    # [0] batch indexing and key names below to match the model you load.
    scores = results['detection_scores'][0].numpy()
    boxes = results['detection_boxes'][0].numpy()
    classes = results['detection_class_entities'][0].numpy()

    # Filter detections
    valid_detections = scores >= min_score

    return {
        'boxes': boxes[valid_detections],
        'scores': scores[valid_detections],
        'classes': classes[valid_detections]
    }
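A hedged usage sketch follows. Output conventions differ between TF Hub detectors: the Open Images models (for example, https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1) expose readable 'detection_class_entities' labels but are TF1-format modules whose callable comes from .signatures['default'], while the TF2 detection models return numeric 'detection_classes'. Check the model page and adjust the indexing and key names in detect_objects to match; the handle and image path below are examples only.

# Example: Open Images Faster R-CNN detector from TF Hub (TF1-format module)
detector = hub.load(
    "https://tfhub.dev/google/faster_rcnn/openimages_v4/inception_resnet_v2/1"
).signatures['default']

results = detect_objects(detector, "my_image.jpg", min_score=0.5)  # placeholder path
print(f"Kept {len(results['scores'])} detections above the score threshold")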
# Real-time object detection
def real_time_detection(detector, camera_index=0):
    """Real-time object detection from webcam"""

    import cv2

    cap = cv2.VideoCapture(camera_index)

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Convert BGR to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Prepare for detection
        input_tensor = tf.convert_to_tensor(rgb_frame)
        input_tensor = input_tensor[tf.newaxis, ...]
        input_tensor = tf.cast(input_tensor, tf.float32) / 255.0

        # Detect objects
        detections = detector(input_tensor)

        # Draw bounding boxes
        frame = draw_detections(frame, detections)

        cv2.imshow('Object Detection', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()
def draw_detections(image, detections, min_score=0.3):
    """Draw bounding boxes and labels on image"""

    import cv2

    h, w, _ = image.shape
    scores = detections['detection_scores'][0].numpy()
    boxes = detections['detection_boxes'][0].numpy()
    classes = detections['detection_class_entities'][0].numpy()

    for i in range(len(scores)):
        if scores[i] >= min_score:
            # Convert normalized coordinates to pixel coordinates
            y1, x1, y2, x2 = boxes[i]
            x1, y1, x2, y2 = int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)

            # Draw bounding box
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)

            # Draw label
            label = f"{classes[i].decode('utf-8')}: {scores[i]:.2f}"
            cv2.putText(image, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    return image
Image Segmentation
1. U-Net for Semantic Segmentation
def conv_block(x, filters, kernel_size=3):
    """Convolutional block for U-Net"""
    x = layers.Conv2D(filters, kernel_size, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    x = layers.Conv2D(filters, kernel_size, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)

    return x
def create_unet(input_shape, num_classes):
    """U-Net architecture for segmentation"""

    inputs = keras.Input(shape=input_shape)

    # Encoder (downsampling)
    conv1 = conv_block(inputs, 64)
    pool1 = layers.MaxPooling2D()(conv1)

    conv2 = conv_block(pool1, 128)
    pool2 = layers.MaxPooling2D()(conv2)

    conv3 = conv_block(pool2, 256)
    pool3 = layers.MaxPooling2D()(conv3)

    conv4 = conv_block(pool3, 512)
    pool4 = layers.MaxPooling2D()(conv4)

    # Bottleneck
    conv5 = conv_block(pool4, 1024)

    # Decoder (upsampling)
    up6 = layers.Conv2DTranspose(512, 2, strides=2, padding='same')(conv5)
    up6 = layers.Concatenate()([up6, conv4])
    conv6 = conv_block(up6, 512)

    up7 = layers.Conv2DTranspose(256, 2, strides=2, padding='same')(conv6)
    up7 = layers.Concatenate()([up7, conv3])
    conv7 = conv_block(up7, 256)

    up8 = layers.Conv2DTranspose(128, 2, strides=2, padding='same')(conv7)
    up8 = layers.Concatenate()([up8, conv2])
    conv8 = conv_block(up8, 128)

    up9 = layers.Conv2DTranspose(64, 2, strides=2, padding='same')(conv8)
    up9 = layers.Concatenate()([up9, conv1])
    conv9 = conv_block(up9, 64)

    # Output layer
    outputs = layers.Conv2D(num_classes, 1, activation='softmax')(conv9)

    return keras.Model(inputs, outputs)
# Dice loss for segmentation
def dice_loss(y_true, y_pred, smooth=1e-6):
    """Dice loss function for segmentation"""

    y_true_f = tf.keras.backend.flatten(y_true)
    y_pred_f = tf.keras.backend.flatten(y_pred)

    intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
    dice = (2. * intersection + smooth) / (
        tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) + smooth
    )

    return 1 - dice
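Putting the two together, a short compile sketch: this assumes one-hot encoded masks (so dice_loss compares probabilities against 0/1 targets elementwise) and an input size divisible by 16 so the skip connections line up.

# Usage sketch: compile the U-Net with the Dice loss
unet = create_unet((128, 128, 3), num_classes=3)
unet.compile(optimizer='adam', loss=dice_loss, metrics=['accuracy'])
unet.summary()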
# IoU metric for segmentation
def iou_metric(y_true, y_pred, num_classes):
    """Intersection over Union metric"""

    ious = []
    for cls in range(num_classes):
        y_true_cls = tf.equal(y_true, cls)
        y_pred_cls = tf.equal(tf.argmax(y_pred, axis=-1), cls)

        intersection = tf.reduce_sum(tf.cast(y_true_cls & y_pred_cls, tf.float32))
        union = tf.reduce_sum(tf.cast(y_true_cls | y_pred_cls, tf.float32))

        iou = intersection / (union + 1e-10)
        ious.append(iou)

    return tf.reduce_mean(ious)
2. Instance Segmentation with Mask R-CNN
# Using TensorFlow Hub for Mask R-CNN
def load_mask_rcnn():
    """Load pre-trained Mask R-CNN model"""

    model_url = "https://tfhub.dev/tensorflow/mask_rcnn/inception_resnet_v2_1024x1024/1"
    model = hub.load(model_url)

    return model
def instance_segmentation(model, image_path):
    """Perform instance segmentation"""

    # Load and preprocess image
    # Note: the TF2 detection models on TF Hub generally expect uint8 inputs;
    # if the model rejects float32, skip the convert_image_dtype call and pass
    # the decoded uint8 tensor instead.
    image = tf.io.read_file(image_path)
    image = tf.image.decode_image(image, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.expand_dims(image, 0)

    # Run inference
    results = model(image)

    return {
        'detection_boxes': results['detection_boxes'][0].numpy(),
        'detection_classes': results['detection_classes'][0].numpy().astype(int),
        'detection_scores': results['detection_scores'][0].numpy(),
        'detection_masks': results['detection_masks'][0].numpy()
    }
def visualize_instance_segmentation(image, results, min_score=0.3):
    """Visualize instance segmentation results"""

    import matplotlib.pyplot as plt
    from matplotlib.patches import Rectangle

    fig, ax = plt.subplots(1, figsize=(12, 8))
    ax.imshow(image)

    boxes = results['detection_boxes']
    classes = results['detection_classes']
    scores = results['detection_scores']
    masks = results['detection_masks']

    colors = plt.cm.Set3(np.linspace(0, 1, len(boxes)))

    for i, (box, cls, score, mask) in enumerate(zip(boxes, classes, scores, masks)):
        if score >= min_score:
            # Draw bounding box (boxes are normalized [y1, x1, y2, x2])
            y1, x1, y2, x2 = box
            h, w = image.shape[:2]
            x1, y1, x2, y2 = x1 * w, y1 * h, x2 * w, y2 * h

            rect = Rectangle((x1, y1), x2 - x1, y2 - y1,
                             linewidth=2, edgecolor=colors[i], facecolor='none')
            ax.add_patch(rect)

            # Resize the mask to image resolution and binarize
            mask_resized = tf.image.resize(mask[..., None], [h, w])
            mask_resized = tf.cast(tf.squeeze(mask_resized) > 0.5, tf.float32).numpy()

            # Overlay a semi-transparent colored mask
            colored_mask = np.zeros((h, w, 4))
            colored_mask[..., :3] = colors[i][:3]
            colored_mask[..., 3] = mask_resized * 0.5

            ax.imshow(colored_mask)

            # Add label
            ax.text(x1, y1 - 10, f'Class {cls}: {score:.2f}',
                    bbox=dict(facecolor=colors[i], alpha=0.8))

    ax.axis('off')
    plt.title('Instance Segmentation Results')
    plt.show()
Style Transfer and GANs
1. Neural Style Transfer
def load_style_transfer_models():
    """Load pre-trained style transfer models"""

    # VGG19 for feature extraction
    vgg = keras.applications.VGG19(include_top=False, weights='imagenet')
    vgg.trainable = False

    # Layers for content and style representation
    content_layers = ['block5_conv2']
    style_layers = ['block1_conv1', 'block2_conv1', 'block3_conv1',
                    'block4_conv1', 'block5_conv1']

    return vgg, content_layers, style_layers

def extract_features(model, content_layers, style_layers):
    """Extract features for style transfer"""

    outputs = [model.get_layer(name).output for name in style_layers + content_layers]
    model = keras.Model([model.input], outputs)

    return model
def gram_matrix(input_tensor):
    """Calculate Gram matrix for style representation"""

    result = tf.linalg.einsum('bijc,bijd->bcd', input_tensor, input_tensor)
    input_shape = tf.shape(input_tensor)
    num_locations = tf.cast(input_shape[1] * input_shape[2], tf.float32)

    return result / num_locations
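The loss function and training step that follow index the extractor's output as dictionaries keyed by layer name ('style' and 'content'), whereas extract_features above returns a plain list of tensors. One way to bridge the gap, in the spirit of the official TensorFlow style-transfer tutorial, is a small wrapper model like the sketch below. The class name StyleContentExtractor is my own; if you use it, substitute it for extract_features in neural_style_transfer further down.

class StyleContentExtractor(keras.Model):
    """Returns {'style': {layer: gram}, 'content': {layer: features}} dicts."""

    def __init__(self, style_layers, content_layers):
        super().__init__()
        vgg = keras.applications.VGG19(include_top=False, weights='imagenet')
        vgg.trainable = False
        outputs = [vgg.get_layer(name).output for name in style_layers + content_layers]
        self.vgg = keras.Model([vgg.input], outputs)
        self.style_layers = style_layers
        self.content_layers = content_layers

    def call(self, inputs):
        # Inputs are expected in [0, 1]; VGG19 wants its own preprocessing
        inputs = inputs * 255.0
        preprocessed = keras.applications.vgg19.preprocess_input(inputs)
        outputs = self.vgg(preprocessed)

        style_outputs = outputs[:len(self.style_layers)]
        content_outputs = outputs[len(self.style_layers):]

        style_dict = {name: gram_matrix(out)
                      for name, out in zip(self.style_layers, style_outputs)}
        content_dict = {name: out
                        for name, out in zip(self.content_layers, content_outputs)}

        return {'style': style_dict, 'content': content_dict}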
def style_content_loss(outputs, style_targets, content_targets,
                       style_weight=1e-2, content_weight=1e4):
    """Calculate style and content loss"""

    style_outputs = outputs['style']
    content_outputs = outputs['content']

    # Style loss
    style_loss = tf.add_n([
        tf.reduce_mean((style_outputs[name] - style_targets[name]) ** 2)
        for name in style_outputs.keys()
    ])
    style_loss *= style_weight / len(style_outputs)

    # Content loss
    content_loss = tf.add_n([
        tf.reduce_mean((content_outputs[name] - content_targets[name]) ** 2)
        for name in content_outputs.keys()
    ])
    content_loss *= content_weight / len(content_outputs)

    total_loss = style_loss + content_loss
    return total_loss
@tf.function
def train_step(image, extractor, style_targets, content_targets, optimizer):
    """Single training step for style transfer"""

    with tf.GradientTape() as tape:
        outputs = extractor(image)
        loss = style_content_loss(outputs, style_targets, content_targets)

    grad = tape.gradient(loss, image)
    optimizer.apply_gradients([(grad, image)])
    image.assign(tf.clip_by_value(image, clip_value_min=0.0, clip_value_max=1.0))

    return loss
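neural_style_transfer below relies on a load_img helper that isn't defined in this section. A minimal version is sketched here as an assumed helper, following the common pattern of scaling the longest side and adding a batch dimension:

def load_img(path, max_dim=512):
    """Load an image as a float32 batch of shape (1, h, w, 3) in [0, 1]."""
    img = tf.io.read_file(path)
    img = tf.image.decode_image(img, channels=3, expand_animations=False)
    img = tf.image.convert_image_dtype(img, tf.float32)

    # Scale so the longest side is max_dim pixels
    shape = tf.cast(tf.shape(img)[:-1], tf.float32)
    scale = max_dim / tf.reduce_max(shape)
    new_shape = tf.cast(shape * scale, tf.int32)
    img = tf.image.resize(img, new_shape)

    return img[tf.newaxis, :]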
def neural_style_transfer(content_path, style_path, epochs=100):
    """Perform neural style transfer"""

    # Load and preprocess images
    content_image = load_img(content_path)
    style_image = load_img(style_path)

    # Initialize the optimization variable with the content image
    image = tf.Variable(content_image)

    # Set up the feature extraction model
    # (the dict-style indexing below assumes an extractor that returns
    # {'style': ..., 'content': ...} dictionaries, as sketched above)
    vgg, content_layers, style_layers = load_style_transfer_models()
    extractor = extract_features(vgg, content_layers, style_layers)

    # Extract target features
    style_targets = extractor(style_image)['style']
    content_targets = extractor(content_image)['content']

    # Optimization
    optimizer = tf.optimizers.Adam(learning_rate=0.02, beta_1=0.99, epsilon=1e-1)

    for epoch in range(epochs):
        loss = train_step(image, extractor, style_targets, content_targets, optimizer)

        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Loss: {loss}")

    return image
2. Generative Adversarial Networks (GANs)
def create_generator(latent_dim, img_shape):
    """Create generator network for GAN"""

    model = keras.Sequential([
        layers.Dense(128 * 7 * 7, input_dim=latent_dim),
        layers.Reshape((7, 7, 128)),
        layers.BatchNormalization(),
        layers.LeakyReLU(alpha=0.01),

        layers.Conv2DTranspose(128, 4, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(alpha=0.01),

        layers.Conv2DTranspose(128, 4, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(alpha=0.01),

        layers.Conv2D(1, 7, activation='tanh', padding='same')
    ])

    return model
def create_discriminator(img_shape):
    """Create discriminator network for GAN"""

    model = keras.Sequential([
        layers.Conv2D(64, 3, strides=2, padding='same', input_shape=img_shape),
        layers.LeakyReLU(alpha=0.01),
        layers.Dropout(0.25),

        layers.Conv2D(128, 3, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(alpha=0.01),
        layers.Dropout(0.25),

        layers.Conv2D(256, 3, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(alpha=0.01),
        layers.Dropout(0.25),

        layers.Flatten(),
        layers.Dense(1, activation='sigmoid')
    ])

    return model
class GAN(keras.Model):
    """Complete GAN implementation"""

    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn
        self.d_loss_metric = keras.metrics.Mean(name="d_loss")
        self.g_loss_metric = keras.metrics.Mean(name="g_loss")

    @property
    def metrics(self):
        return [self.d_loss_metric, self.g_loss_metric]

    def train_step(self, real_images):
        batch_size = tf.shape(real_images)[0]

        # Generate fake images
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        generated_images = self.generator(random_latent_vectors)

        # Combine real and fake images
        combined_images = tf.concat([generated_images, real_images], axis=0)
        labels = tf.concat([tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0)

        # Add noise to labels for better training
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)

        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(zip(grads, self.discriminator.trainable_weights))

        # Train generator
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        misleading_labels = tf.zeros((batch_size, 1))

        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)

        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        # Update metrics
        self.d_loss_metric.update_state(d_loss)
        self.g_loss_metric.update_state(g_loss)

        return {"d_loss": self.d_loss_metric.result(), "g_loss": self.g_loss_metric.result()}
Model Optimization and Deployment
1. Model Quantization
def quantize_model(model, representative_dataset):
    """Quantize model for mobile deployment"""

    converter = tf.lite.TFLiteConverter.from_keras_model(model)

    # Enable optimizations
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    # Set representative dataset for calibration
    converter.representative_dataset = representative_dataset

    # Enable integer quantization
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8

    quantized_model = converter.convert()

    return quantized_model
def representative_data_gen():
    """Generate representative data for quantization"""
    for _ in range(100):
        yield [np.random.random((1, 224, 224, 3)).astype(np.float32)]
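Putting the two together, a short usage sketch that quantizes the Keras classifier trained earlier and writes the result to disk (the model_int8.tflite filename is arbitrary):

# Usage sketch: quantize the trained model and save the flatbuffer
quantized = quantize_model(model, representative_data_gen)
with open('model_int8.tflite', 'wb') as f:
    f.write(quantized)
print(f"Quantized model size: {len(quantized) / 1024:.1f} KB")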
# Model pruning
def prune_model(model, target_sparsity=0.5):
    """Prune model to reduce size"""

    import tensorflow_model_optimization as tfmot

    # Define pruning parameters
    pruning_params = {
        'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(
            initial_sparsity=0.0,
            final_sparsity=target_sparsity,
            begin_step=0,
            end_step=1000
        )
    }

    # Apply pruning
    model_for_pruning = tfmot.sparsity.keras.prune_low_magnitude(model, **pruning_params)

    return model_for_pruning
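Pruning only takes effect during training, so the wrapped model has to be recompiled and fine-tuned with the UpdatePruningStep callback; strip_pruning then removes the pruning wrappers before export. A short sketch, assuming the model and train_ds from the transfer-learning section:

import tensorflow_model_optimization as tfmot

# Fine-tune the pruned model, then strip the pruning wrappers for export
pruned = prune_model(model, target_sparsity=0.5)
pruned.compile(optimizer='adam',
               loss='sparse_categorical_crossentropy',
               metrics=['accuracy'])
pruned.fit(train_ds, epochs=2,
           callbacks=[tfmot.sparsity.keras.UpdatePruningStep()])
final_model = tfmot.sparsity.keras.strip_pruning(pruned)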
# Knowledge distillation
class Distiller(keras.Model):
    """Knowledge distillation for model compression"""

    def __init__(self, student, teacher):
        super(Distiller, self).__init__()
        self.teacher = teacher
        self.student = student

    def compile(self, optimizer, metrics, student_loss_fn,
                distillation_loss_fn, alpha=0.1, temperature=3):
        super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def train_step(self, data):
        x, y = data

        # Forward pass of teacher
        teacher_predictions = self.teacher(x, training=False)

        with tf.GradientTape() as tape:
            # Forward pass of student
            student_predictions = self.student(x, training=True)

            # Compute losses
            student_loss = self.student_loss_fn(y, student_predictions)
            distillation_loss = self.distillation_loss_fn(
                tf.nn.softmax(teacher_predictions / self.temperature, axis=1),
                tf.nn.softmax(student_predictions / self.temperature, axis=1)
            )

            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

        # Compute gradients
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update metrics
        self.compiled_metrics.update_state(y, student_predictions)

        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss, "distillation_loss": distillation_loss})

        return results
2. Model Serving and Deployment
# TensorFlow Serving deployment
def create_serving_signature(model):
    """Create serving signature for TensorFlow Serving"""

    @tf.function
    def serve_fn(input_image):
        # Preprocess input
        processed_input = tf.cast(input_image, tf.float32) / 255.0

        # Run prediction
        predictions = model(processed_input)

        # Post-process output (the classifiers built above already end in a
        # softmax layer, so the outputs are probabilities; apply tf.nn.softmax
        # here only if your model returns raw logits)
        class_ids = tf.argmax(predictions, axis=-1)
        probabilities = predictions

        return {
            'class_ids': class_ids,
            'probabilities': probabilities
        }

    # Define input specification
    input_spec = tf.TensorSpec(shape=[None, 224, 224, 3], dtype=tf.uint8)

    # Create concrete function
    concrete_function = serve_fn.get_concrete_function(input_spec)

    return concrete_function
def export_for_serving(model, export_path):
    """Export model for TensorFlow Serving"""

    # Create serving signature
    serving_fn = create_serving_signature(model)

    # Save model with signature
    tf.saved_model.save(
        model,
        export_path,
        signatures={'serving_default': serving_fn}
    )

    print(f"Model exported to: {export_path}")
# Edge deployment with TensorFlow Lite
def deploy_to_edge(model, model_path):
    """Deploy model to edge devices"""

    # Convert to TensorFlow Lite
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]

    # Allow select TF ops that have no TFLite builtin equivalent (optional)
    converter.target_spec.supported_ops = [
        tf.lite.OpsSet.TFLITE_BUILTINS,
        tf.lite.OpsSet.SELECT_TF_OPS
    ]

    tflite_model = converter.convert()

    # Save model
    with open(model_path, 'wb') as f:
        f.write(tflite_model)

    return tflite_model
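On the device itself, the exported .tflite file is run with the TensorFlow Lite Interpreter. A minimal inference sketch (the model path and the random input are placeholders):

# Minimal on-device inference with the TensorFlow Lite Interpreter
interpreter = tf.lite.Interpreter(model_path='model.tflite')  # placeholder path
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

dummy_input = np.random.random(input_details[0]['shape']).astype(np.float32)
interpreter.set_tensor(input_details[0]['index'], dummy_input)
interpreter.invoke()

predictions = interpreter.get_tensor(output_details[0]['index'])
print("Predicted class:", np.argmax(predictions))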
# TensorFlow.js deployment
def export_for_web(model, export_path):
    """Export model for web deployment"""

    import tensorflowjs as tfjs

    tfjs.converters.save_keras_model(model, export_path)
    print(f"Model exported for web to: {export_path}")
# Cloud deployment with TensorFlow Extended (TFX)
def create_tfx_pipeline(model, data_path, serving_model_dir):
    """Create TFX pipeline for production deployment"""

    from tfx import v1 as tfx

    # Define pipeline components
    example_gen = tfx.components.CsvExampleGen(input_base=data_path)

    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs['examples']
    )

    schema_gen = tfx.components.SchemaGen(
        statistics=statistics_gen.outputs['statistics']
    )

    trainer = tfx.components.Trainer(
        module_file='trainer.py',
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        train_args=tfx.proto.TrainArgs(num_steps=1000),
        eval_args=tfx.proto.EvalArgs(num_steps=100)
    )

    pusher = tfx.components.Pusher(
        model=trainer.outputs['model'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=serving_model_dir
            )
        )
    )

    # Create and run pipeline
    pipeline = tfx.dsl.Pipeline(
        pipeline_name='computer_vision_pipeline',
        pipeline_root='pipeline_root',
        components=[example_gen, statistics_gen, schema_gen, trainer, pusher]
    )

    return pipeline
Performance Monitoring and MLOps
1. Model Performance Monitoring
# Data drift detection
def detect_data_drift(reference_data, current_data, threshold=0.1):
    """Detect data drift using statistical tests"""

    from scipy.stats import ks_2samp

    drift_scores = []

    for i in range(reference_data.shape[1]):
        # Kolmogorov-Smirnov test
        statistic, p_value = ks_2samp(
            reference_data[:, i],
            current_data[:, i]
        )

        drift_scores.append({
            'feature': i,
            'ks_statistic': statistic,
            'p_value': p_value,
            'drift_detected': p_value < threshold
        })

    return drift_scores
# Model performance tracking
class ModelMonitor:
    """Monitor model performance in production"""

    def __init__(self, model, reference_data):
        self.model = model
        self.reference_data = reference_data
        self.prediction_history = []
        self.performance_history = []

    def log_prediction(self, input_data, prediction, ground_truth=None):
        """Log model prediction"""

        log_entry = {
            'timestamp': tf.timestamp(),
            'input_shape': input_data.shape,
            'prediction': prediction,
            'confidence': tf.reduce_max(tf.nn.softmax(prediction))
        }

        if ground_truth is not None:
            log_entry['ground_truth'] = ground_truth
            log_entry['correct'] = tf.equal(
                tf.argmax(prediction),
                tf.argmax(ground_truth)
            )

        self.prediction_history.append(log_entry)

    def calculate_drift(self, current_batch):
        """Calculate data drift for current batch"""

        return detect_data_drift(self.reference_data, current_batch)

    def generate_report(self):
        """Generate performance report"""

        if not self.prediction_history:
            return "No predictions logged"

        total_predictions = len(self.prediction_history)
        correct_predictions = sum(1 for p in self.prediction_history if p.get('correct', False))

        accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0
        avg_confidence = np.mean([p['confidence'] for p in self.prediction_history])

        return {
            'total_predictions': total_predictions,
            'accuracy': accuracy,
            'average_confidence': avg_confidence,
            'low_confidence_predictions': sum(
                1 for p in self.prediction_history if p['confidence'] < 0.7
            )
        }
# A/B testing for model comparison
class ModelABTest:
    """A/B test framework for model comparison"""

    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results_a = []
        self.results_b = []

    def predict(self, input_data):
        """Route traffic between models"""

        if np.random.random() < self.traffic_split:
            prediction = self.model_a(input_data)
            self.results_a.append(prediction)
            return prediction, 'model_a'
        else:
            prediction = self.model_b(input_data)
            self.results_b.append(prediction)
            return prediction, 'model_b'

    def statistical_significance(self, metric_a, metric_b):
        """Test statistical significance of results"""

        from scipy.stats import ttest_ind

        t_stat, p_value = ttest_ind(metric_a, metric_b)

        return {
            't_statistic': t_stat,
            'p_value': p_value,
            'significant': p_value < 0.05,
            'winner': 'model_a' if np.mean(metric_a) > np.mean(metric_b) else 'model_b'
        }
Conclusion
Advanced computer vision with TensorFlow opens up endless possibilities for solving real-world problems. From transfer learning for quick prototyping to sophisticated object detection and segmentation systems, the techniques covered in this guide provide a solid foundation for building production-ready computer vision applications.
Key Takeaways:
- Transfer Learning is often the best starting point for most CV tasks
- Modern Architectures like EfficientNet provide excellent performance/efficiency trade-offs
- Object Detection and Segmentation enable more complex visual understanding
- Model Optimization is crucial for deployment to resource-constrained environments
- MLOps Practices ensure reliable operation in production
Next Steps:
- Experiment with different architectures on your specific datasets
- Explore domain-specific applications (medical imaging, satellite imagery, etc.)
- Implement real-time processing pipelines
- Study the latest research in computer vision
- Build end-to-end applications with proper monitoring
The computer vision field is rapidly evolving, with new architectures and techniques emerging regularly. The foundation you’ve built with TensorFlow will serve you well as you continue to explore this exciting domain.
Ready to apply these techniques? Check out our TensorFlow getting started guide for the fundamentals, then start building your own computer vision applications!