
Advanced Computer Vision with TensorFlow - Building Real-World Image Recognition Systems#

Computer vision has revolutionized industries from healthcare to autonomous vehicles. With TensorFlow, building sophisticated image recognition systems has become more accessible than ever. This comprehensive guide will take you beyond basic image classification to advanced computer vision techniques used in production systems.

What We’ll Build#

In this tutorial, you’ll learn to create:

  • Custom Image Classifiers using transfer learning
  • Object Detection Systems for real-time applications
  • Image Segmentation Models for pixel-level analysis
  • Style Transfer Networks for artistic applications
  • Production-Ready CV Pipelines with optimization and deployment

Prerequisites#

Before diving in, ensure you have:

  • TensorFlow 2.x installed (see our getting started guide)
  • Basic CNN knowledge from the previous tutorial
  • 8GB+ RAM (16GB recommended for large models)
  • GPU support (highly recommended for training)
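Before you start, a quick sanity check (a minimal sketch) confirms the TensorFlow version and whether a GPU is visible:

import tensorflow as tf

print("TensorFlow version:", tf.__version__)  # should report 2.x
gpus = tf.config.list_physical_devices('GPU')
print("GPUs available:", gpus if gpus else "none - training will run on the CPU")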

Advanced CNN Architectures#

1. Understanding Modern CNN Architectures#

Let’s explore the evolution of CNN architectures:

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import numpy as np

# ResNet block - solving the vanishing gradient problem
def residual_block(x, filters, kernel_size=3, stride=1, conv_shortcut=True):
    """A bottleneck residual block with a skip connection."""
    if conv_shortcut:
        shortcut = layers.Conv2D(4 * filters, 1, strides=stride)(x)
        shortcut = layers.BatchNormalization()(shortcut)
    else:
        shortcut = x
    x = layers.Conv2D(filters, 1, strides=stride)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.Conv2D(filters, kernel_size, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.Conv2D(4 * filters, 1)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Add()([shortcut, x])
    x = layers.Activation('relu')(x)
    return x

# Attention mechanism for CNNs
def attention_block(x, filters):
    """Squeeze-and-Excitation attention block."""
    # Squeeze: global average pooling to a per-channel descriptor
    gap = layers.GlobalAveragePooling2D()(x)
    # Excitation: bottleneck fully connected layers
    fc1 = layers.Dense(filters // 16, activation='relu')(gap)
    fc2 = layers.Dense(filters, activation='sigmoid')(fc1)
    # Reshape to (1, 1, C) and rescale the feature map channel-wise
    attention = layers.Reshape((1, 1, filters))(fc2)
    x = layers.Multiply()([x, attention])
    return x

# Modern CNN with attention
def create_advanced_cnn(input_shape, num_classes):
    inputs = keras.Input(shape=input_shape)
    # Initial convolution
    x = layers.Conv2D(64, 7, strides=2, padding='same')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.MaxPooling2D(3, strides=2, padding='same')(x)
    # Residual blocks with increasing filters
    for filters in [64, 128, 256, 512]:
        x = residual_block(x, filters)
        x = attention_block(x, filters * 4)
    # Classification head
    x = layers.GlobalAveragePooling2D()(x)
    x = layers.Dropout(0.5)(x)
    outputs = layers.Dense(num_classes, activation='softmax')(x)
    return keras.Model(inputs, outputs)

# Example usage
model = create_advanced_cnn((224, 224, 3), 1000)
print(f"Model parameters: {model.count_params():,}")

2. EfficientNet - Optimized Architecture#

# Using pre-trained EfficientNet
def create_efficientnet_model(num_classes, input_shape=(224, 224, 3)):
    base_model = keras.applications.EfficientNetB0(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape
    )
    # Freeze base model initially
    base_model.trainable = False
    model = keras.Sequential([
        base_model,
        layers.GlobalAveragePooling2D(),
        layers.BatchNormalization(),
        layers.Dropout(0.2),
        layers.Dense(num_classes, activation='softmax')
    ])
    return model, base_model

# Fine-tuning strategy
def setup_fine_tuning(model, base_model, learning_rate=1e-5):
    # Unfreeze the top layers of the base model
    base_model.trainable = True
    # Fine-tune from this layer onwards
    fine_tune_at = len(base_model.layers) - 20
    # Freeze all the layers before the `fine_tune_at` layer
    for layer in base_model.layers[:fine_tune_at]:
        layer.trainable = False
    # Recompile with a low learning rate for fine-tuning; sparse loss matches
    # the integer labels produced by image_dataset_from_directory below
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

Transfer Learning Mastery#

1. Custom Dataset Preparation#

def create_dataset_from_directory(data_dir, image_size=(224, 224), batch_size=32):
    """Create tf.data datasets from a directory structure (one folder per class)."""
    dataset = keras.utils.image_dataset_from_directory(
        data_dir,
        validation_split=0.2,
        subset="training",
        seed=123,
        image_size=image_size,
        batch_size=batch_size
    )
    val_dataset = keras.utils.image_dataset_from_directory(
        data_dir,
        validation_split=0.2,
        subset="validation",
        seed=123,
        image_size=image_size,
        batch_size=batch_size
    )
    return dataset, val_dataset

# Advanced data augmentation pipeline
def create_augmentation_pipeline():
    """Create a sophisticated data augmentation pipeline."""
    data_augmentation = keras.Sequential([
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.1),
        layers.RandomZoom(0.1),
        layers.RandomContrast(0.1),
        layers.RandomBrightness(0.1),
        # Custom color augmentations: tf.image color ops expect floats in
        # [0, 1], so scale down and back up around them
        layers.Lambda(lambda x: tf.image.random_hue(x / 255.0, 0.02) * 255.0),
        layers.Lambda(lambda x: tf.image.random_saturation(x / 255.0, 0.7, 1.3) * 255.0),
    ])
    return data_augmentation

# Preprocessing pipeline
def preprocess_dataset(dataset, augment=True):
    """Optimize a dataset for training."""
    AUTOTUNE = tf.data.AUTOTUNE
    # Cache and shuffle before augmenting so each epoch sees fresh augmentations
    dataset = dataset.cache()
    dataset = dataset.shuffle(1000)
    if augment:
        augmentation = create_augmentation_pipeline()
        dataset = dataset.map(
            lambda x, y: (augmentation(x, training=True), y),
            num_parallel_calls=AUTOTUNE
        )
    # The Keras EfficientNet models rescale inputs internally, so we only
    # cast to float32 and keep pixel values in [0, 255]
    dataset = dataset.map(
        lambda x, y: (tf.cast(x, tf.float32), y),
        num_parallel_calls=AUTOTUNE
    )
    dataset = dataset.prefetch(AUTOTUNE)
    return dataset

# Complete transfer learning pipeline
def train_custom_classifier(data_dir, num_classes, epochs=20):
    # Load and preprocess data
    train_ds, val_ds = create_dataset_from_directory(data_dir)
    train_ds = preprocess_dataset(train_ds, augment=True)
    val_ds = preprocess_dataset(val_ds, augment=False)
    # Create model
    model, base_model = create_efficientnet_model(num_classes)
    # Initial training (frozen base)
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    # Callbacks
    callbacks = [
        keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True),
        keras.callbacks.ReduceLROnPlateau(factor=0.2, patience=3),
        keras.callbacks.ModelCheckpoint('best_model.h5', save_best_only=True)
    ]
    # Train frozen model
    print("Training with frozen base model...")
    history1 = model.fit(
        train_ds,
        epochs=epochs // 2,
        validation_data=val_ds,
        callbacks=callbacks
    )
    # Fine-tuning
    print("Fine-tuning model...")
    model = setup_fine_tuning(model, base_model)
    history2 = model.fit(
        train_ds,
        epochs=epochs // 2,
        validation_data=val_ds,
        callbacks=callbacks
    )
    return model, history1, history2
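With these pieces in place, training on a custom image folder is a single call. A usage sketch (the path, class count, and file names are placeholders for your dataset):

model, history_frozen, history_finetuned = train_custom_classifier(
    data_dir='data/flowers',   # one subfolder per class
    num_classes=5,
    epochs=20
)
model.save('flower_classifier.h5')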

2. Advanced Transfer Learning Techniques#

# Multi-scale feature extraction
def create_multiscale_model(base_model, num_classes):
    """Extract features from multiple layers of an EfficientNet backbone."""
    # Intermediate layer outputs (EfficientNetB0 layer names)
    layer_names = [
        'block4a_expand_activation',  # 28x28
        'block6a_expand_activation',  # 14x14
        'top_activation'              # 7x7
    ]
    layers_outputs = [base_model.get_layer(name).output for name in layer_names]
    # Create feature extraction model
    feature_extractor = keras.Model(
        inputs=base_model.input,
        outputs=layers_outputs
    )
    # Multi-scale processing
    inputs = keras.Input(shape=(224, 224, 3))
    features = feature_extractor(inputs)
    # Process each scale
    processed_features = []
    for feature in features:
        # Global average pooling for each scale
        gap = layers.GlobalAveragePooling2D()(feature)
        dense = layers.Dense(256, activation='relu')(gap)
        processed_features.append(dense)
    # Concatenate multi-scale features
    combined = layers.Concatenate()(processed_features)
    combined = layers.Dropout(0.5)(combined)
    outputs = layers.Dense(num_classes, activation='softmax')(combined)
    return keras.Model(inputs, outputs)

# Domain adaptation techniques
def create_domain_adaptive_model(source_model, target_classes):
    """Adapt a model from a source domain to a target domain."""
    # Take the features just before the original classification layer
    base_features = source_model.layers[-2].output
    # Add a domain classifier (for adversarial training)
    domain_classifier = layers.Dense(1, activation='sigmoid', name='domain')(base_features)
    # Add a new task classifier
    task_classifier = layers.Dense(target_classes, activation='softmax', name='task')(base_features)
    # Create a multi-output model
    adapted_model = keras.Model(
        inputs=source_model.input,
        outputs=[task_classifier, domain_classifier]
    )
    return adapted_model
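Since the adapted model has two output heads, it needs one loss per head when compiled. A minimal compile sketch, assuming source_model is a previously trained classifier (the 0.1 weight on the domain head is an illustrative choice to keep it from dominating the task loss, not from the original):

adapted = create_domain_adaptive_model(source_model, target_classes=10)
adapted.compile(
    optimizer='adam',
    loss={'task': 'sparse_categorical_crossentropy',
          'domain': 'binary_crossentropy'},
    loss_weights={'task': 1.0, 'domain': 0.1},  # illustrative weighting
    metrics={'task': 'accuracy'}
)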

Object Detection with TensorFlow#

1. YOLO-style Object Detection#

# Custom YOLO implementation
def create_yolo_model(input_shape, num_classes, num_anchors=3):
    """Simplified YOLO architecture."""
    inputs = keras.Input(shape=input_shape)
    # Backbone (feature extractor)
    x = layers.Conv2D(32, 3, padding='same', activation='relu')(inputs)
    x = layers.MaxPooling2D()(x)
    x = layers.Conv2D(64, 3, padding='same', activation='relu')(x)
    x = layers.MaxPooling2D()(x)
    x = layers.Conv2D(128, 3, padding='same', activation='relu')(x)
    x = layers.MaxPooling2D()(x)
    x = layers.Conv2D(256, 3, padding='same', activation='relu')(x)
    x = layers.MaxPooling2D()(x)
    x = layers.Conv2D(512, 3, padding='same', activation='relu')(x)
    # Detection head
    # Output: (batch, grid_h, grid_w, anchors * (5 + num_classes))
    # 5 = x, y, w, h, confidence
    outputs = layers.Conv2D(
        num_anchors * (5 + num_classes),
        1,
        activation='linear'
    )(x)
    return keras.Model(inputs, outputs)

# YOLO loss function
def yolo_loss(y_true, y_pred, num_classes=80, num_anchors=3):
    """Simplified YOLO loss function."""
    # Reshape predictions to (batch, grid_h, grid_w, anchors, 5 + classes)
    grid_h, grid_w = tf.shape(y_pred)[1], tf.shape(y_pred)[2]
    y_pred = tf.reshape(y_pred, (-1, grid_h, grid_w, num_anchors, 5 + num_classes))
    # Split predictions
    pred_xy = tf.sigmoid(y_pred[..., :2])   # Center coordinates
    pred_wh = y_pred[..., 2:4]              # Width and height
    pred_conf = tf.sigmoid(y_pred[..., 4])  # Confidence
    pred_class = y_pred[..., 5:]            # Class scores
    # Split ground truth
    true_xy = y_true[..., :2]
    true_wh = y_true[..., 2:4]
    true_conf = y_true[..., 4]
    true_class = y_true[..., 5:]
    # Object mask: localization and class losses only apply where an object exists
    obj_mask = true_conf[..., tf.newaxis]
    xy_loss = tf.reduce_sum(obj_mask * tf.square(true_xy - pred_xy))
    # Clamp predicted w/h before the square root to avoid NaNs
    wh_loss = tf.reduce_sum(
        obj_mask * tf.square(tf.sqrt(true_wh) - tf.sqrt(tf.maximum(pred_wh, 1e-8)))
    )
    conf_loss = tf.reduce_sum(tf.square(true_conf - pred_conf))
    class_loss = tf.reduce_sum(obj_mask * tf.square(true_class - pred_class))
    total_loss = xy_loss + wh_loss + conf_loss + class_loss
    return total_loss

# Non-Maximum Suppression
def non_max_suppression(boxes, scores, max_outputs=50, iou_threshold=0.5):
    """Apply NMS to filter overlapping boxes."""
    selected_indices = tf.image.non_max_suppression(
        boxes, scores, max_outputs, iou_threshold
    )
    selected_boxes = tf.gather(boxes, selected_indices)
    selected_scores = tf.gather(scores, selected_indices)
    return selected_boxes, selected_scores, selected_indices
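To see what non_max_suppression does, here is a quick sketch with hand-made boxes in normalized [y1, x1, y2, x2] format (the coordinates and scores are illustrative):

boxes = tf.constant([
    [0.10, 0.10, 0.50, 0.50],   # box A
    [0.12, 0.11, 0.52, 0.49],   # near-duplicate of A with a lower score
    [0.60, 0.60, 0.90, 0.90],   # box B elsewhere in the image
])
scores = tf.constant([0.9, 0.75, 0.8])
kept_boxes, kept_scores, kept_idx = non_max_suppression(boxes, scores)
print(kept_idx.numpy())  # [0 2] - the near-duplicate of A is suppressed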

2. Using TensorFlow Object Detection API#

# Requires TensorFlow Hub: pip install tensorflow-hub
# (the webcam demo below also needs opencv-python)
import tensorflow_hub as hub

def load_detector(model_url):
    """Load a pre-trained object detection model from TensorFlow Hub."""
    detector = hub.load(model_url)
    return detector

def detect_objects(detector, image_path, min_score=0.3):
    """Detect objects in an image."""
    # Load and preprocess image
    image = tf.io.read_file(image_path)
    image = tf.image.decode_image(image, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = image[tf.newaxis, ...]
    # Run detection. Output keys vary by model; 'detection_class_entities'
    # is provided by the Open Images detectors on TF Hub.
    results = detector(image)
    # Filter by confidence score
    scores = results['detection_scores'][0].numpy()
    boxes = results['detection_boxes'][0].numpy()
    classes = results['detection_class_entities'][0].numpy()
    # Filter detections
    valid_detections = scores >= min_score
    return {
        'boxes': boxes[valid_detections],
        'scores': scores[valid_detections],
        'classes': classes[valid_detections]
    }

# Real-time object detection
def real_time_detection(detector, camera_index=0):
    """Real-time object detection from a webcam."""
    import cv2
    cap = cv2.VideoCapture(camera_index)
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Convert BGR (OpenCV) to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # Prepare for detection
        input_tensor = tf.convert_to_tensor(rgb_frame)
        input_tensor = input_tensor[tf.newaxis, ...]
        input_tensor = tf.cast(input_tensor, tf.float32) / 255.0
        # Detect objects
        detections = detector(input_tensor)
        # Draw bounding boxes
        frame = draw_detections(frame, detections)
        cv2.imshow('Object Detection', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

def draw_detections(image, detections, min_score=0.3):
    """Draw bounding boxes and labels on an image."""
    import cv2
    h, w, _ = image.shape
    scores = detections['detection_scores'][0].numpy()
    boxes = detections['detection_boxes'][0].numpy()
    classes = detections['detection_class_entities'][0].numpy()
    for i in range(len(scores)):
        if scores[i] >= min_score:
            # Convert normalized [y1, x1, y2, x2] to pixel coordinates
            y1, x1, y2, x2 = boxes[i]
            x1, y1, x2, y2 = int(x1 * w), int(y1 * h), int(x2 * w), int(y2 * h)
            # Draw bounding box
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
            # Draw label
            label = f"{classes[i].decode('utf-8')}: {scores[i]:.2f}"
            cv2.putText(image, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
    return image
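Tying the helpers together with one of the Open Images detectors on TensorFlow Hub. A sketch: the module URL is a public one, but output keys and batching conventions vary between Hub models, so you may need to call .signatures['default'] and adjust the indexing; the image path is a placeholder.

detector = load_detector("https://tfhub.dev/google/openimages_v4/ssd/mobilenet_v2/1")
results = detect_objects(detector, "street_scene.jpg", min_score=0.4)
for box, score, cls in zip(results['boxes'], results['scores'], results['classes']):
    print(f"{cls.decode('utf-8')}: {score:.2f} at {box}")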

Image Segmentation#

1. U-Net for Semantic Segmentation#

def conv_block(x, filters, kernel_size=3):
    """Convolutional block for U-Net."""
    x = layers.Conv2D(filters, kernel_size, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.Conv2D(filters, kernel_size, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    return x

def create_unet(input_shape, num_classes):
    """U-Net architecture for semantic segmentation."""
    inputs = keras.Input(shape=input_shape)
    # Encoder (downsampling)
    conv1 = conv_block(inputs, 64)
    pool1 = layers.MaxPooling2D()(conv1)
    conv2 = conv_block(pool1, 128)
    pool2 = layers.MaxPooling2D()(conv2)
    conv3 = conv_block(pool2, 256)
    pool3 = layers.MaxPooling2D()(conv3)
    conv4 = conv_block(pool3, 512)
    pool4 = layers.MaxPooling2D()(conv4)
    # Bottleneck
    conv5 = conv_block(pool4, 1024)
    # Decoder (upsampling with skip connections)
    up6 = layers.Conv2DTranspose(512, 2, strides=2, padding='same')(conv5)
    up6 = layers.Concatenate()([up6, conv4])
    conv6 = conv_block(up6, 512)
    up7 = layers.Conv2DTranspose(256, 2, strides=2, padding='same')(conv6)
    up7 = layers.Concatenate()([up7, conv3])
    conv7 = conv_block(up7, 256)
    up8 = layers.Conv2DTranspose(128, 2, strides=2, padding='same')(conv7)
    up8 = layers.Concatenate()([up8, conv2])
    conv8 = conv_block(up8, 128)
    up9 = layers.Conv2DTranspose(64, 2, strides=2, padding='same')(conv8)
    up9 = layers.Concatenate()([up9, conv1])
    conv9 = conv_block(up9, 64)
    # Output layer: per-pixel class probabilities
    outputs = layers.Conv2D(num_classes, 1, activation='softmax')(conv9)
    return keras.Model(inputs, outputs)

# Dice loss for segmentation
def dice_loss(y_true, y_pred, smooth=1e-6):
    """Dice loss function for segmentation."""
    y_true_f = tf.keras.backend.flatten(y_true)
    y_pred_f = tf.keras.backend.flatten(y_pred)
    intersection = tf.keras.backend.sum(y_true_f * y_pred_f)
    dice = (2. * intersection + smooth) / (
        tf.keras.backend.sum(y_true_f) + tf.keras.backend.sum(y_pred_f) + smooth
    )
    return 1 - dice

# IoU metric for segmentation
def iou_metric(y_true, y_pred, num_classes):
    """Mean Intersection over Union metric."""
    ious = []
    for cls in range(num_classes):
        y_true_cls = tf.equal(y_true, cls)
        y_pred_cls = tf.equal(tf.argmax(y_pred, axis=-1), cls)
        intersection = tf.reduce_sum(tf.cast(y_true_cls & y_pred_cls, tf.float32))
        union = tf.reduce_sum(tf.cast(y_true_cls | y_pred_cls, tf.float32))
        iou = intersection / (union + 1e-10)
        ious.append(iou)
    return tf.reduce_mean(ious)
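Putting the segmentation pieces together. A compile sketch: it assumes one-hot encoded masks, which is what dice_loss expects, and the class count is illustrative.

NUM_CLASSES = 3  # illustrative
unet = create_unet((256, 256, 3), NUM_CLASSES)

def combined_loss(y_true, y_pred):
    # Cross-entropy drives per-pixel classification; dice counters class imbalance
    ce = tf.reduce_mean(keras.losses.categorical_crossentropy(y_true, y_pred))
    return ce + dice_loss(y_true, y_pred)

unet.compile(optimizer='adam', loss=combined_loss, metrics=['accuracy'])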

2. Instance Segmentation with Mask R-CNN#

# Using TensorFlow Hub for Mask R-CNN
def load_mask_rcnn():
    """Load a pre-trained Mask R-CNN model."""
    model_url = "https://tfhub.dev/tensorflow/mask_rcnn/inception_resnet_v2_1024x1024/1"
    model = hub.load(model_url)
    return model

def instance_segmentation(model, image_path):
    """Perform instance segmentation."""
    # Load image; TF2 Object Detection API models expect a uint8 batch
    image = tf.io.read_file(image_path)
    image = tf.image.decode_image(image, channels=3)
    image = tf.expand_dims(image, 0)
    # Run inference
    results = model(image)
    return {
        'detection_boxes': results['detection_boxes'][0].numpy(),
        'detection_classes': results['detection_classes'][0].numpy().astype(int),
        'detection_scores': results['detection_scores'][0].numpy(),
        'detection_masks': results['detection_masks'][0].numpy()
    }

def visualize_instance_segmentation(image, results, min_score=0.3):
    """Visualize instance segmentation results."""
    from matplotlib.patches import Rectangle
    fig, ax = plt.subplots(1, figsize=(12, 8))
    ax.imshow(image)
    boxes = results['detection_boxes']
    classes = results['detection_classes']
    scores = results['detection_scores']
    masks = results['detection_masks']
    colors = plt.cm.Set3(np.linspace(0, 1, len(boxes)))
    for i, (box, cls, score, mask) in enumerate(zip(boxes, classes, scores, masks)):
        if score >= min_score:
            # Draw bounding box (boxes are normalized [y1, x1, y2, x2])
            y1, x1, y2, x2 = box
            h, w = image.shape[:2]
            x1, y1, x2, y2 = x1 * w, y1 * h, x2 * w, y2 * h
            rect = Rectangle((x1, y1), x2 - x1, y2 - y1,
                             linewidth=2, edgecolor=colors[i], facecolor='none')
            ax.add_patch(rect)
            # OD API masks are box-relative; resizing to the full image is a
            # quick approximation that works for visualization
            mask_resized = tf.image.resize(mask[..., None], [h, w])
            mask_resized = tf.squeeze(mask_resized) > 0.5
            colored_mask = np.zeros((h, w, 4))
            colored_mask[..., :3] = colors[i][:3]
            colored_mask[..., 3] = mask_resized.numpy() * 0.5
            ax.imshow(colored_mask)
            # Add label
            ax.text(x1, y1 - 10, f'Class {cls}: {score:.2f}',
                    bbox=dict(facecolor=colors[i], alpha=0.8))
    ax.axis('off')
    plt.title('Instance Segmentation Results')
    plt.show()

Style Transfer and GANs#

1. Neural Style Transfer#

# Helper used below (the original called load_img without defining it)
def load_img(path, max_dim=512):
    """Load an image as float32 in [0, 1], longest side scaled to max_dim."""
    img = tf.io.read_file(path)
    img = tf.image.decode_image(img, channels=3)
    img = tf.image.convert_image_dtype(img, tf.float32)
    shape = tf.cast(tf.shape(img)[:-1], tf.float32)
    scale = max_dim / tf.reduce_max(shape)
    new_shape = tf.cast(shape * scale, tf.int32)
    img = tf.image.resize(img, new_shape)
    return img[tf.newaxis, :]

def load_style_transfer_models():
    """Load a pre-trained VGG19 and the layers used for style/content."""
    # VGG19 for feature extraction
    vgg = keras.applications.VGG19(include_top=False, weights='imagenet')
    vgg.trainable = False
    # Layers for content and style representation
    content_layers = ['block5_conv2']
    style_layers = ['block1_conv1', 'block2_conv1', 'block3_conv1',
                    'block4_conv1', 'block5_conv1']
    return vgg, content_layers, style_layers

def gram_matrix(input_tensor):
    """Calculate the Gram matrix for style representation."""
    result = tf.linalg.einsum('bijc,bijd->bcd', input_tensor, input_tensor)
    input_shape = tf.shape(input_tensor)
    num_locations = tf.cast(input_shape[1] * input_shape[2], tf.float32)
    return result / num_locations

class StyleContentExtractor(keras.Model):
    """Return {'style': ..., 'content': ...} feature dicts for an image in [0, 1]."""
    def __init__(self, vgg, content_layers, style_layers):
        super().__init__()
        outputs = [vgg.get_layer(name).output
                   for name in style_layers + content_layers]
        self.vgg = keras.Model([vgg.input], outputs)
        self.vgg.trainable = False
        self.style_layers = style_layers
        self.content_layers = content_layers

    def call(self, inputs):
        # VGG19 expects caffe-style preprocessed inputs in [0, 255]
        preprocessed = keras.applications.vgg19.preprocess_input(inputs * 255.0)
        outputs = self.vgg(preprocessed)
        style_outputs = outputs[:len(self.style_layers)]
        content_outputs = outputs[len(self.style_layers):]
        style_dict = {name: gram_matrix(out)
                      for name, out in zip(self.style_layers, style_outputs)}
        content_dict = {name: out
                        for name, out in zip(self.content_layers, content_outputs)}
        return {'style': style_dict, 'content': content_dict}

def style_content_loss(outputs, style_targets, content_targets,
                       style_weight=1e-2, content_weight=1e4):
    """Calculate the combined style and content loss."""
    style_outputs = outputs['style']
    content_outputs = outputs['content']
    # Style loss
    style_loss = tf.add_n([tf.reduce_mean((style_outputs[name] - style_targets[name]) ** 2)
                           for name in style_outputs.keys()])
    style_loss *= style_weight / len(style_outputs)
    # Content loss
    content_loss = tf.add_n([tf.reduce_mean((content_outputs[name] - content_targets[name]) ** 2)
                             for name in content_outputs.keys()])
    content_loss *= content_weight / len(content_outputs)
    total_loss = style_loss + content_loss
    return total_loss

@tf.function
def train_step(image, extractor, style_targets, content_targets, optimizer):
    """Single optimization step for style transfer."""
    with tf.GradientTape() as tape:
        outputs = extractor(image)
        loss = style_content_loss(outputs, style_targets, content_targets)
    grad = tape.gradient(loss, image)
    optimizer.apply_gradients([(grad, image)])
    image.assign(tf.clip_by_value(image, clip_value_min=0.0, clip_value_max=1.0))
    return loss

def neural_style_transfer(content_path, style_path, epochs=100):
    """Perform neural style transfer."""
    # Load and preprocess images
    content_image = load_img(content_path)
    style_image = load_img(style_path)
    # Initialize the optimization variable with the content image
    image = tf.Variable(content_image)
    # Set up the feature extraction model
    vgg, content_layers, style_layers = load_style_transfer_models()
    extractor = StyleContentExtractor(vgg, content_layers, style_layers)
    # Extract target features
    style_targets = extractor(style_image)['style']
    content_targets = extractor(content_image)['content']
    # Optimization
    optimizer = tf.optimizers.Adam(learning_rate=0.02, beta_1=0.99, epsilon=1e-1)
    for epoch in range(epochs):
        loss = train_step(image, extractor, style_targets, content_targets, optimizer)
        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Loss: {float(loss):.4f}")
    return image
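To keep the result, convert the optimized variable back to an 8-bit image. A small sketch using PIL (the file names are placeholders):

import PIL.Image

stylized = neural_style_transfer('content.jpg', 'style.jpg', epochs=100)
array = (stylized.numpy()[0] * 255).astype(np.uint8)
PIL.Image.fromarray(array).save('stylized.png')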

2. Generative Adversarial Networks (GANs)#

def create_generator(latent_dim, img_shape):
    """Create the generator network for a GAN (28x28 grayscale output)."""
    model = keras.Sequential([
        layers.Dense(128 * 7 * 7, input_dim=latent_dim),
        layers.Reshape((7, 7, 128)),
        layers.BatchNormalization(),
        layers.LeakyReLU(alpha=0.01),
        layers.Conv2DTranspose(128, 4, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(alpha=0.01),
        layers.Conv2DTranspose(128, 4, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(alpha=0.01),
        layers.Conv2D(1, 7, activation='tanh', padding='same')
    ])
    return model

def create_discriminator(img_shape):
    """Create the discriminator network for a GAN."""
    model = keras.Sequential([
        layers.Conv2D(64, 3, strides=2, padding='same', input_shape=img_shape),
        layers.LeakyReLU(alpha=0.01),
        layers.Dropout(0.25),
        layers.Conv2D(128, 3, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(alpha=0.01),
        layers.Dropout(0.25),
        layers.Conv2D(256, 3, strides=2, padding='same'),
        layers.BatchNormalization(),
        layers.LeakyReLU(alpha=0.01),
        layers.Dropout(0.25),
        layers.Flatten(),
        layers.Dense(1, activation='sigmoid')
    ])
    return model

class GAN(keras.Model):
    """Complete GAN implementation."""
    def __init__(self, discriminator, generator, latent_dim):
        super().__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super().compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn
        self.d_loss_metric = keras.metrics.Mean(name="d_loss")
        self.g_loss_metric = keras.metrics.Mean(name="g_loss")

    @property
    def metrics(self):
        return [self.d_loss_metric, self.g_loss_metric]

    def train_step(self, real_images):
        batch_size = tf.shape(real_images)[0]
        # Generate fake images
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        generated_images = self.generator(random_latent_vectors)
        # Combine real and fake images (fake labeled 1, real labeled 0)
        combined_images = tf.concat([generated_images, real_images], axis=0)
        labels = tf.concat([tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0)
        # Add noise to the labels for better training stability
        labels += 0.05 * tf.random.uniform(tf.shape(labels))
        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(zip(grads, self.discriminator.trainable_weights))
        # Train the generator (it tries to make the discriminator output "real", i.e. 0)
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
        misleading_labels = tf.zeros((batch_size, 1))
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
        # Update metrics
        self.d_loss_metric.update_state(d_loss)
        self.g_loss_metric.update_state(g_loss)
        return {"d_loss": self.d_loss_metric.result(), "g_loss": self.g_loss_metric.result()}

Model Optimization and Deployment#

1. Model Quantization#

def quantize_model(model, representative_dataset):
    """Quantize a model for mobile deployment."""
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    # Enable optimizations
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    # Set representative dataset for calibration
    converter.representative_dataset = representative_dataset
    # Enable full integer quantization
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8
    quantized_model = converter.convert()
    return quantized_model

def representative_data_gen():
    """Generate representative data for quantization calibration."""
    for _ in range(100):
        yield [np.random.random((1, 224, 224, 3)).astype(np.float32)]

# Model pruning
def prune_model(model, target_sparsity=0.5):
    """Prune a model to reduce its size."""
    import tensorflow_model_optimization as tfmot
    # Define pruning parameters
    pruning_params = {
        'pruning_schedule': tfmot.sparsity.keras.PolynomialDecay(
            initial_sparsity=0.0,
            final_sparsity=target_sparsity,
            begin_step=0,
            end_step=1000
        )
    }
    # Apply pruning
    model_for_pruning = tfmot.sparsity.keras.prune_low_magnitude(model, **pruning_params)
    return model_for_pruning

# Knowledge distillation
class Distiller(keras.Model):
    """Knowledge distillation for model compression."""
    def __init__(self, student, teacher):
        super().__init__()
        self.teacher = teacher
        self.student = student

    def compile(self, optimizer, metrics, student_loss_fn, distillation_loss_fn,
                alpha=0.1, temperature=3):
        super().compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def train_step(self, data):
        x, y = data
        # Forward pass of the teacher (frozen)
        teacher_predictions = self.teacher(x, training=False)
        with tf.GradientTape() as tape:
            # Forward pass of the student
            student_predictions = self.student(x, training=True)
            # Hard-label loss plus temperature-softened distillation loss
            student_loss = self.student_loss_fn(y, student_predictions)
            distillation_loss = self.distillation_loss_fn(
                tf.nn.softmax(teacher_predictions / self.temperature, axis=1),
                tf.nn.softmax(student_predictions / self.temperature, axis=1)
            )
            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss
        # Compute gradients
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics
        self.compiled_metrics.update_state(y, student_predictions)
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss, "distillation_loss": distillation_loss})
        return results
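Once converted, the quantized bytes run directly in the TFLite interpreter. A quick sketch, reusing representative_data_gen for calibration and random int8 input just to exercise the model:

tflite_bytes = quantize_model(model, representative_data_gen)

interpreter = tf.lite.Interpreter(model_content=tflite_bytes)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()[0]
output_details = interpreter.get_output_details()[0]

# With full integer quantization the input tensor is int8
sample = np.random.randint(-128, 128, size=input_details['shape'], dtype=np.int8)
interpreter.set_tensor(input_details['index'], sample)
interpreter.invoke()
prediction = interpreter.get_tensor(output_details['index'])
print(prediction.shape)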

2. Model Serving and Deployment#

# TensorFlow Serving deployment
def create_serving_signature(model):
    """Create a serving signature for TensorFlow Serving."""
    @tf.function
    def serve_fn(input_image):
        # Preprocess input
        processed_input = tf.cast(input_image, tf.float32) / 255.0
        # Run prediction (the models in this guide already end in a softmax
        # layer, so the outputs are class probabilities)
        probabilities = model(processed_input)
        # Post-process output
        class_ids = tf.argmax(probabilities, axis=-1)
        return {
            'class_ids': class_ids,
            'probabilities': probabilities
        }
    # Define input specification
    input_spec = tf.TensorSpec(shape=[None, 224, 224, 3], dtype=tf.uint8)
    # Create concrete function
    concrete_function = serve_fn.get_concrete_function(input_spec)
    return concrete_function

def export_for_serving(model, export_path):
    """Export a model for TensorFlow Serving."""
    # Create serving signature
    serving_fn = create_serving_signature(model)
    # Save model with signature
    tf.saved_model.save(
        model,
        export_path,
        signatures={'serving_default': serving_fn}
    )
    print(f"Model exported to: {export_path}")

# Edge deployment with TensorFlow Lite
def deploy_to_edge(model, model_path):
    """Deploy a model to edge devices."""
    # Convert to TensorFlow Lite
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    # Allow falling back to select TF ops for unsupported operations
    converter.target_spec.supported_ops = [
        tf.lite.OpsSet.TFLITE_BUILTINS,
        tf.lite.OpsSet.SELECT_TF_OPS
    ]
    tflite_model = converter.convert()
    # Save model
    with open(model_path, 'wb') as f:
        f.write(tflite_model)
    return tflite_model

# TensorFlow.js deployment
def export_for_web(model, export_path):
    """Export a model for web deployment."""
    import tensorflowjs as tfjs
    tfjs.converters.save_keras_model(model, export_path)
    print(f"Model exported for web to: {export_path}")

# Cloud deployment with TensorFlow Extended (TFX)
def create_tfx_pipeline(model, data_path, serving_model_dir):
    """Create a TFX pipeline for production deployment."""
    from tfx import v1 as tfx
    # Define pipeline components
    example_gen = tfx.components.CsvExampleGen(input_base=data_path)
    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs['examples']
    )
    schema_gen = tfx.components.SchemaGen(
        statistics=statistics_gen.outputs['statistics']
    )
    trainer = tfx.components.Trainer(
        module_file='trainer.py',
        examples=example_gen.outputs['examples'],
        schema=schema_gen.outputs['schema'],
        train_args=tfx.proto.TrainArgs(num_steps=1000),
        eval_args=tfx.proto.EvalArgs(num_steps=100)
    )
    pusher = tfx.components.Pusher(
        model=trainer.outputs['model'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=serving_model_dir
            )
        )
    )
    # Create the pipeline
    pipeline = tfx.dsl.Pipeline(
        pipeline_name='computer_vision_pipeline',
        pipeline_root='pipeline_root',
        components=[example_gen, statistics_gen, schema_gen, trainer, pusher]
    )
    return pipeline
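Once a model is exported and served, TensorFlow Serving exposes a REST endpoint on port 8501 by default. A sketch of querying it (assumes the server was started with --model_name=cv_model; the name is a placeholder):

import json
import requests

def query_serving(image_batch, url="http://localhost:8501/v1/models/cv_model:predict"):
    """Send a batch of images to a TensorFlow Serving REST endpoint."""
    payload = json.dumps({"instances": image_batch.tolist()})
    response = requests.post(url, data=payload)
    response.raise_for_status()
    return response.json()["predictions"]

# Example: one random 224x224 RGB image
batch = np.random.randint(0, 256, size=(1, 224, 224, 3), dtype=np.uint8)
predictions = query_serving(batch)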

Performance Monitoring and MLOps#

1. Model Performance Monitoring#

# Data drift detection
def detect_data_drift(reference_data, current_data, threshold=0.1):
    """Detect data drift using a per-feature Kolmogorov-Smirnov test."""
    from scipy.stats import ks_2samp
    drift_scores = []
    for i in range(reference_data.shape[1]):
        # Two-sample Kolmogorov-Smirnov test
        statistic, p_value = ks_2samp(
            reference_data[:, i],
            current_data[:, i]
        )
        drift_scores.append({
            'feature': i,
            'ks_statistic': statistic,
            'p_value': p_value,
            'drift_detected': p_value < threshold
        })
    return drift_scores

# Model performance tracking
class ModelMonitor:
    """Monitor model performance in production."""
    def __init__(self, model, reference_data):
        self.model = model
        self.reference_data = reference_data
        self.prediction_history = []
        self.performance_history = []

    def log_prediction(self, input_data, prediction, ground_truth=None):
        """Log a model prediction (assumes `prediction` is a probability vector)."""
        log_entry = {
            'timestamp': tf.timestamp(),
            'input_shape': input_data.shape,
            'prediction': prediction,
            'confidence': float(tf.reduce_max(prediction))
        }
        if ground_truth is not None:
            log_entry['ground_truth'] = ground_truth
            log_entry['correct'] = bool(tf.equal(
                tf.argmax(prediction),
                tf.argmax(ground_truth)
            ))
        self.prediction_history.append(log_entry)

    def calculate_drift(self, current_batch):
        """Calculate data drift for the current batch."""
        return detect_data_drift(self.reference_data, current_batch)

    def generate_report(self):
        """Generate a performance report."""
        if not self.prediction_history:
            return "No predictions logged"
        total_predictions = len(self.prediction_history)
        correct_predictions = sum(1 for p in self.prediction_history
                                  if p.get('correct', False))
        accuracy = correct_predictions / total_predictions
        avg_confidence = np.mean([p['confidence'] for p in self.prediction_history])
        return {
            'total_predictions': total_predictions,
            'accuracy': accuracy,
            'average_confidence': avg_confidence,
            'low_confidence_predictions': sum(1 for p in self.prediction_history
                                              if p['confidence'] < 0.7)
        }

# A/B testing for model comparison
class ModelABTest:
    """A/B test framework for model comparison."""
    def __init__(self, model_a, model_b, traffic_split=0.5):
        self.model_a = model_a
        self.model_b = model_b
        self.traffic_split = traffic_split
        self.results_a = []
        self.results_b = []

    def predict(self, input_data):
        """Route traffic between the two models."""
        if np.random.random() < self.traffic_split:
            prediction = self.model_a(input_data)
            self.results_a.append(prediction)
            return prediction, 'model_a'
        else:
            prediction = self.model_b(input_data)
            self.results_b.append(prediction)
            return prediction, 'model_b'

    def statistical_significance(self, metric_a, metric_b):
        """Test the statistical significance of the results."""
        from scipy.stats import ttest_ind
        t_stat, p_value = ttest_ind(metric_a, metric_b)
        return {
            't_statistic': t_stat,
            'p_value': p_value,
            'significant': p_value < 0.05,
            'winner': 'model_a' if np.mean(metric_a) > np.mean(metric_b) else 'model_b'
        }
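A short usage sketch for the A/B harness, assuming model_a and model_b are compiled models and input_batch is a preprocessed batch (the per-request correctness lists are illustrative; in production you would collect them alongside ground truth):

ab_test = ModelABTest(model_a, model_b, traffic_split=0.5)

# Route an incoming request
prediction, which_model = ab_test.predict(input_batch)

# After collecting per-request correctness for each arm:
accuracy_a = [1, 0, 1, 1, 1, 0, 1, 1]  # illustrative
accuracy_b = [1, 1, 1, 1, 0, 1, 1, 1]  # illustrative
print(ab_test.statistical_significance(accuracy_a, accuracy_b))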

Conclusion#

Advanced computer vision with TensorFlow opens up endless possibilities for solving real-world problems. From transfer learning for quick prototyping to sophisticated object detection and segmentation systems, the techniques covered in this guide provide a solid foundation for building production-ready computer vision applications.

Key Takeaways:#

  1. Transfer Learning is often the best starting point for most CV tasks
  2. Modern Architectures like EfficientNet provide excellent performance/efficiency trade-offs
  3. Object Detection and Segmentation enable more complex visual understanding
  4. Model Optimization is crucial for deployment to resource-constrained environments
  5. MLOps Practices ensure reliable operation in production

Next Steps:#

  • Experiment with different architectures on your specific datasets
  • Explore domain-specific applications (medical imaging, satellite imagery, etc.)
  • Implement real-time processing pipelines
  • Study the latest research in computer vision
  • Build end-to-end applications with proper monitoring

The computer vision field is rapidly evolving, with new architectures and techniques emerging regularly. The foundation you’ve built with TensorFlow will serve you well as you continue to explore this exciting domain.


Ready to apply these techniques? Check out our TensorFlow getting started guide for the fundamentals, then start building your own computer vision applications!

Author: Antonio Roth · Published: 2025-08-28 · License: CC BY-NC-SA 4.0