TensorFlow 2.x: Complete Step-by-Step Guide
Table of Contents
- Introduction & Setup
- Tensors Basics
- Keras API Fundamentals
- Building Neural Networks
- Training & Evaluation
- CNN Example
- RNN/LSTM Example
- Custom Training Loops
- TensorFlow Advanced Features
- Deployment & Production
- Interview Questions
1. Introduction & Setup {#introduction}
What is TensorFlow?
TensorFlow is Google's open-source machine learning framework that supports:
- Eager execution (immediate operation evaluation)
- Graph execution (optimized computation graphs)
- Distributed training
- Production deployment across platforms
Installation
# CPU only
pip install tensorflow
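# GPU support (the separate tensorflow-gpu package is deprecated; on Linux with TF >= 2.14 the CUDA libraries come with the and-cuda extra)
pip install "tensorflow[and-cuda]"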
# Verify installation
python -c "import tensorflow as tf; print(tf.__version__)"
Basic Imports
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models
import numpy as np
import matplotlib.pyplot as plt
# Check GPU availability
# Prints the list of GPU devices TensorFlow can see, then the installed version string.
print("GPU Available:", tf.config.list_physical_devices('GPU'))
print("TensorFlow Version:", tf.__version__)
2. Tensors Basics {#tensors}
Creating Tensors
# Tensor creation examples; every name below is bound at module level.
# From Python lists (1-D with 5 elements, 2-D with 3 rows x 2 columns)
tensor_1d = tf.constant([1, 2, 3, 4, 5])
tensor_2d = tf.constant([[1, 2], [3, 4], [5, 6]])
# From NumPy arrays (the 2x3 array is wrapped as a tensor)
numpy_array = np.array([[1, 2, 3], [4, 5, 6]])
tensor_from_numpy = tf.constant(numpy_array)
# Special tensors: the list argument is the requested shape
zeros = tf.zeros([3, 4])
ones = tf.ones([2, 3])
eye = tf.eye(3) # Identity matrix
# Random tensors: first argument is the shape, keywords set the distribution parameters
normal = tf.random.normal([3, 4], mean=0, stddev=1)
uniform = tf.random.uniform([2, 3], minval=0, maxval=1)
# With specific dtype (same values, explicit element type)
float_tensor = tf.constant([1, 2, 3], dtype=tf.float32)
int_tensor = tf.constant([1, 2, 3], dtype=tf.int32)
Tensor Operations
# Basic arithmetic on two 2x2 float32 tensors
a = tf.constant([[1, 2], [3, 4]], dtype=tf.float32)
b = tf.constant([[5, 6], [7, 8]], dtype=tf.float32)
add = tf.add(a, b) # or a + b
subtract = tf.subtract(a, b) # or a - b
multiply = tf.multiply(a, b) # Element-wise, or a * b
matmul = tf.matmul(a, b) # Matrix multiplication, or a @ b
# Reshaping: the 2x3 tensor keeps its 6 elements, only the shape changes
x = tf.constant([[1, 2, 3], [4, 5, 6]])
reshaped = tf.reshape(x, [3, 2])
flattened = tf.reshape(x, [-1]) # Flatten to 1D
# Aggregation operations on a 2x3 float32 tensor
tensor = tf.constant([[1, 2, 3], [4, 5, 6]], dtype=tf.float32)
# Without `axis` the reduction covers every element
sum_all = tf.reduce_sum(tensor)
# axis=0 collapses the row dimension, leaving one sum per column
sum_axis0 = tf.reduce_sum(tensor, axis=0)
mean = tf.reduce_mean(tensor)
max_val = tf.reduce_max(tensor)
Indexing and Slicing
# 3x3 example tensor used by every indexing example below (rebinds `tensor`)
tensor = tf.constant([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
# Basic indexing (indices are zero-based)
first_row = tensor[0]
element = tensor[1, 2] # Element at row 1, col 2
# Slicing
first_two_rows = tensor[:2]
last_two_cols = tensor[:, -2:]
# Rows from index 1 onward, first two columns
sub_matrix = tensor[1:, :2]
# Gathering specific indices
indices = tf.constant([0, 2])
gathered = tf.gather(tensor, indices) # Rows 0 and 2
Variable Tensors
# Variables are mutable tensors
var = tf.Variable([[1, 2], [3, 4]], dtype=tf.float32)
# Modify variables in place: overwrite, then add and subtract the same matrix of ones
var.assign([[5, 6], [7, 8]])
var.assign_add([[1, 1], [1, 1]])
var.assign_sub([[1, 1], [1, 1]])
# Variables are typically used for model parameters
# Here: a 784x10 randomly initialised weight matrix and a zero bias of length 10
weights = tf.Variable(tf.random.normal([784, 10]))
bias = tf.Variable(tf.zeros([10]))
3. Keras API Fundamentals {#keras-api}
Sequential Model
# Simple sequential model: 784 inputs -> 128 -> dropout -> 64 -> 10-way softmax
model = keras.Sequential([
layers.Dense(128, activation='relu', input_shape=(784,)),
layers.Dropout(0.2),
layers.Dense(64, activation='relu'),
layers.Dense(10, activation='softmax')
])
# Alternative way: the same stack built layer by layer.
# This rebinds `model`, replacing the instance created above.
model = keras.Sequential()
model.add(layers.Dense(128, activation='relu', input_shape=(784,)))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(10, activation='softmax'))
# Model summary
model.summary()
Functional API
# More flexible model building: layers are called on tensors to wire the graph
inputs = keras.Input(shape=(784,))
x = layers.Dense(128, activation='relu')(inputs)
x = layers.Dropout(0.2)(x)
x = layers.Dense(64, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)
model = keras.Model(inputs=inputs, outputs=outputs)
# Multi-input/output model (rebinds `x` and `model` from the example above)
input_a = keras.Input(shape=(32,), name='input_a')
input_b = keras.Input(shape=(32,), name='input_b')
# Each input gets its own Dense(16) branch before the two are concatenated
x = layers.Dense(16)(input_a)
y = layers.Dense(16)(input_b)
combined = layers.concatenate([x, y])
z = layers.Dense(8)(combined)
# output_1 is computed from z; output_2 reads the concatenated features directly
output_1 = layers.Dense(1, name='output_1')(z)
output_2 = layers.Dense(1, name='output_2')(combined)
model = keras.Model(
inputs=[input_a, input_b],
outputs=[output_1, output_2]
)
Model Subclassing
class MyModel(keras.Model):
    """Dense classifier (128 -> dropout -> 64 -> softmax) written as a Model subclass.

    Args:
        num_classes: Number of units in the final softmax layer.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        self.dense1 = layers.Dense(128, activation='relu')
        self.dropout = layers.Dropout(0.2)
        self.dense2 = layers.Dense(64, activation='relu')
        self.classifier = layers.Dense(num_classes, activation='softmax')

    def call(self, inputs, training=False):
        """Run the forward pass.

        Args:
            inputs: Batch of input features.
            training: Whether the call happens during training; forwarded to
                the dropout layer so it is only active while training.

        Returns:
            The softmax output of the classifier layer.
        """
        x = self.dense1(inputs)
        # Forward the flag instead of branching in Python: the layer itself
        # decides whether to drop units, which also works when `training`
        # is a tensor inside a traced function.
        x = self.dropout(x, training=training)
        x = self.dense2(x)
        return self.classifier(x)
# Create model instance
model = MyModel(num_classes=10)
# Build model by calling it once
# A single all-zero sample with 784 features is passed so the layers see an input before summary() runs
model(tf.zeros([1, 784]))
model.summary()
4. Building Neural Networks {#neural-networks}
Common Layers
# Catalogue of frequently used layer constructors; nothing is connected to a model here.
# Dense (Fully Connected)
dense = layers.Dense(units=64, activation='relu',
kernel_initializer='glorot_uniform',
bias_initializer='zeros')
# Convolutional layers
conv2d = layers.Conv2D(filters=32, kernel_size=(3, 3),
activation='relu', padding='same')
conv1d = layers.Conv1D(filters=32, kernel_size=3, activation='relu')
# Pooling layers
maxpool = layers.MaxPooling2D(pool_size=(2, 2))
avgpool = layers.AveragePooling2D(pool_size=(2, 2))
globalpool = layers.GlobalAveragePooling2D()
# Recurrent layers
lstm = layers.LSTM(units=128, return_sequences=True)
gru = layers.GRU(units=64)
# Bidirectional wraps another recurrent layer (here an LSTM with 64 units)
bidirectional = layers.Bidirectional(layers.LSTM(64))
# Normalization layers
batchnorm = layers.BatchNormalization()
layernorm = layers.LayerNormalization()
# Regularization layers
dropout = layers.Dropout(rate=0.5)
# Regularizer objects (not layers); both are created with factor 0.01
l1_reg = keras.regularizers.L1(0.01)
l2_reg = keras.regularizers.L2(0.01)
Custom Layers
class CustomDense(layers.Layer):
    """Minimal fully connected layer computing ``inputs @ kernel + bias``.

    Args:
        units: Size of the output (last) dimension.
        **kwargs: Standard layer options (e.g. ``name``, ``dtype``) forwarded
            to ``layers.Layer``.
    """

    def __init__(self, units=32, **kwargs):
        super().__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        """Create the kernel and bias once the input feature size is known."""
        self.w = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer='random_normal',
            trainable=True,
            name='kernel',
        )
        self.b = self.add_weight(
            shape=(self.units,),
            initializer='zeros',
            trainable=True,
            name='bias',
        )

    def call(self, inputs):
        """Apply the affine transform to ``inputs``."""
        return tf.matmul(inputs, self.w) + self.b

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({'units': self.units})
        return config
# Use custom layer
# Two CustomDense layers (64 and 10 units) with a ReLU in between; rebinds `model`
model = keras.Sequential([
CustomDense(64),
layers.ReLU(),
CustomDense(10)
])
5. Training & Evaluation {#training}
Compiling Models
# Three alternative compile configurations, one per task type.
# NOTE: each compile() call replaces the previous configuration, so if all three
# run in sequence only the last (binary classification) setup stays in effect.
# Classification
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# Regression
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=0.001),
loss='mse',
metrics=['mae']
)
# Binary classification
model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy', keras.metrics.Precision(), keras.metrics.Recall()]
)
Training
# Load MNIST dataset
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
# Flatten each image to a 784-element vector and scale pixel values by 1/255
x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
x_test = x_test.reshape(-1, 784).astype('float32') / 255.0
# Simple training: 10% of the training data is held out for validation
history = model.fit(
x_train, y_train,
batch_size=32,
epochs=10,
validation_split=0.1,
verbose=1
)
# With validation data
# NOTE: this passes the test split as validation data and rebinds `history`
history = model.fit(
x_train, y_train,
batch_size=32,
epochs=10,
validation_data=(x_test, y_test)
)
# Plot training history: one subplot per metric, train and validation curves together
plt.figure(figsize=(12, 4))
panels = [
    ('loss', 'val_loss', 'Loss'),
    ('accuracy', 'val_accuracy', 'Accuracy'),
]
for position, (train_key, val_key, panel_title) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(history.history[train_key], label='train')
    plt.plot(history.history[val_key], label='validation')
    plt.title(panel_title)
    plt.legend()
plt.show()
Callbacks
# Early stopping
# Watches val_loss with a patience of 5 epochs and restores the best weights when it stops
early_stop = keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=5,
restore_best_weights=True
)
# Model checkpoint
# Writes to 'best_model.h5' only when val_accuracy improves (mode='max': higher is better)
checkpoint = keras.callbacks.ModelCheckpoint(
'best_model.h5',
monitor='val_accuracy',
save_best_only=True,
mode='max'
)
# Learning rate scheduler
def scheduler(epoch, lr):
    """Hold the learning rate for the first 10 epochs, then decay it exponentially.

    Args:
        epoch: Zero-based epoch index.
        lr: Learning rate in effect before this epoch.

    Returns:
        ``lr`` unchanged while ``epoch < 10``; afterwards ``lr * exp(-0.1)``
        as a plain Python float.
    """
    # Local import keeps this snippet self-contained.
    import math

    if epoch < 10:
        return lr
    # Use math.exp rather than tf.math.exp: the result must be a float, not a
    # tf.Tensor, for schedulers that validate the return type.
    return float(lr * math.exp(-0.1))
# Wrap the schedule function in a callback so it can be passed to fit()
lr_scheduler = keras.callbacks.LearningRateScheduler(scheduler)
# TensorBoard
# Logs are written under ./logs; histogram_freq=1 requests histograms every epoch
tensorboard = keras.callbacks.TensorBoard(
log_dir='./logs',
histogram_freq=1
)
# Custom callback
class CustomCallback(keras.callbacks.Callback):
    """Stop training as soon as the training accuracy exceeds 95%."""

    def on_epoch_end(self, epoch, logs=None):
        """Check the epoch's metrics and request a stop when the threshold is passed.

        Args:
            epoch: Index of the epoch that just finished.
            logs: Metric dictionary for the epoch; may be None or lack 'accuracy'.
        """
        # `logs` defaults to None and the metric may not be tracked; guard both
        # cases instead of failing on `.get` or on a None comparison.
        accuracy = (logs or {}).get('accuracy')
        if accuracy is not None and accuracy > 0.95:
            print(f"\nReached 95% accuracy at epoch {epoch}")
            self.model.stop_training = True
# Training with callbacks
# Only the four callbacks listed below are active; CustomCallback is not passed here.
history = model.fit(
x_train, y_train,
epochs=50,
batch_size=32,
validation_split=0.1,
callbacks=[early_stop, checkpoint, lr_scheduler, tensorboard]
)
Data Pipeline with tf.data
# Build the input pipeline one stage at a time: cache -> shuffle -> batch -> prefetch
BATCH_SIZE = 32
AUTOTUNE = tf.data.AUTOTUNE
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.cache()
train_dataset = train_dataset.shuffle(buffer_size=1000)
train_dataset = train_dataset.batch(BATCH_SIZE)
train_dataset = train_dataset.prefetch(AUTOTUNE)
# Data augmentation: random horizontal flip, rotation and zoom
data_augmentation = keras.Sequential([
    layers.RandomFlip("horizontal"),
    layers.RandomRotation(0.1),
    layers.RandomZoom(0.1),
])


def preprocess(image, label):
    """Scale an image to float32 in [0, 1] and apply random augmentation.

    Args:
        image: Image tensor with raw pixel values.
        label: Target associated with the image; passed through unchanged.

    Returns:
        Tuple ``(augmented_image, label)``.
    """
    image = tf.cast(image, tf.float32) / 255.0
    # Random* layers only transform their input in training mode, and
    # Dataset.map runs outside fit(), so the flag must be passed explicitly.
    image = data_augmentation(image, training=True)
    return image, label


# NOTE(review): train_dataset as built earlier in this guide holds flattened
# vectors already divided by 255; the augmentation layers presumably expect
# image-shaped (height, width, channels) input with raw pixel values -- confirm
# the dataset being mapped.
augmented_dataset = train_dataset.map(preprocess, num_parallel_calls=AUTOTUNE)
6. CNN Example {#cnn}
Building a CNN
def create_cnn_model():
    """Assemble the two-block CNN classifier for 28x28x1 inputs.

    Each block is Conv-BN-Conv-BN-MaxPool-Dropout(0.25), first with 32 and then
    with 64 filters, followed by a Dense(128) head and a 10-way softmax.

    Returns:
        The uncompiled ``keras.Sequential`` model.
    """
    net = keras.Sequential()
    # Only the very first layer carries the input shape.
    first_layer_kwargs = {'input_shape': (28, 28, 1)}
    for filters in (32, 64):
        net.add(layers.Conv2D(filters, (3, 3), activation='relu',
                              **first_layer_kwargs))
        first_layer_kwargs = {}
        net.add(layers.BatchNormalization())
        net.add(layers.Conv2D(filters, (3, 3), activation='relu'))
        net.add(layers.BatchNormalization())
        net.add(layers.MaxPooling2D((2, 2)))
        net.add(layers.Dropout(0.25))

    # Classification head
    net.add(layers.Flatten())
    net.add(layers.Dense(128, activation='relu'))
    net.add(layers.BatchNormalization())
    net.add(layers.Dropout(0.5))
    net.add(layers.Dense(10, activation='softmax'))
    return net
# Create and compile model
cnn_model = create_cnn_model()
cnn_model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
# Prepare data
# Reshape to (N, 28, 28, 1) to match the model's input_shape of (28, 28, 1)
x_train_cnn = x_train.reshape(-1, 28, 28, 1)
x_test_cnn = x_test.reshape(-1, 28, 28, 1)
# Train
# NOTE: the test split is used as validation data here
history = cnn_model.fit(
x_train_cnn, y_train,
batch_size=128,
epochs=10,
validation_data=(x_test_cnn, y_test)
)
Advanced CNN with Residual Connections
def residual_block(x, filters, kernel_size=3, stride=1):
    """Apply two Conv-BN stages and add the (possibly projected) input back.

    Args:
        x: Input feature map.
        filters: Number of output channels of both convolutions.
        kernel_size: Kernel size of both convolutions.
        stride: Stride of the first convolution (and of the projection).

    Returns:
        ReLU of the sum of the main path and the skip path.
    """
    identity = x

    # Main path: conv -> BN -> ReLU -> conv -> BN
    out = layers.Conv2D(filters, kernel_size, strides=stride,
                        padding='same')(x)
    out = layers.BatchNormalization()(out)
    out = layers.ReLU()(out)
    out = layers.Conv2D(filters, kernel_size, padding='same')(out)
    out = layers.BatchNormalization()(out)

    # The skip path needs a 1x1 projection whenever the main path changed the
    # spatial size (stride) or the channel count.
    needs_projection = stride != 1 or identity.shape[-1] != filters
    if needs_projection:
        identity = layers.Conv2D(filters, 1, strides=stride)(identity)
        identity = layers.BatchNormalization()(identity)

    merged = layers.Add()([out, identity])
    return layers.ReLU()(merged)
# Build ResNet-like model
inputs = keras.Input(shape=(32, 32, 3))
# Stem: strided 7x7 convolution, BN, ReLU, then strided max pooling
x = layers.Conv2D(64, 7, strides=2, padding='same')(inputs)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = layers.MaxPooling2D(3, strides=2, padding='same')(x)
# Residual stages as (filters, stride) pairs; a stride of 2 opens a new stage
stage_specs = [(64, 1), (64, 1), (128, 2), (128, 1), (256, 2), (256, 1)]
for stage_filters, stage_stride in stage_specs:
    x = residual_block(x, stage_filters, stride=stage_stride)
# Classification head
x = layers.GlobalAveragePooling2D()(x)
outputs = layers.Dense(10, activation='softmax')(x)
resnet_model = keras.Model(inputs, outputs)
7. RNN/LSTM Example {#rnn}
Simple LSTM for Text Classification
# Text preprocessing
# max_features: vocabulary size used by the embedding and by the IMDB loader
# maxlen: length every sequence is padded/truncated to
max_features = 10000
maxlen = 200
# Build LSTM model: embedding -> LSTM(64) -> single sigmoid unit
# NOTE(review): `input_length` is reported as deprecated in recent Keras releases -- confirm for your version
lstm_model = keras.Sequential([
layers.Embedding(max_features, 128, input_length=maxlen),
layers.LSTM(64, dropout=0.5, recurrent_dropout=0.5),
layers.Dense(1, activation='sigmoid')
])
lstm_model.compile(
optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy']
)
# Example with IMDB dataset
# This rebinds x_train / y_train / x_test / y_test from the earlier MNIST examples
(x_train, y_train), (x_test, y_test) = keras.datasets.imdb.load_data(
num_words=max_features
)
# Pad sequences
x_train = keras.preprocessing.sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = keras.preprocessing.sequence.pad_sequences(x_test, maxlen=maxlen)
# Train
lstm_model.fit(
x_train, y_train,
batch_size=32,
epochs=5,
validation_data=(x_test, y_test)
)
Bidirectional LSTM with Attention
class Attention(layers.Layer):
    """Pool a sequence into one vector using learned attention weights."""

    def __init__(self):
        super().__init__()

    def build(self, input_shape):
        """Create a square projection matrix and a bias sized by the feature axis."""
        feature_dim = input_shape[-1]
        self.W = self.add_weight(
            shape=(feature_dim, feature_dim),
            initializer='random_normal',
            trainable=True,
        )
        self.b = self.add_weight(
            shape=(feature_dim,),
            initializer='zeros',
            trainable=True,
        )

    def call(self, x):
        """Return the attention-weighted sum of ``x`` over axis 1.

        ``x`` is laid out as (batch, time, features).
        """
        projected = tf.nn.tanh(tf.matmul(x, self.W) + self.b)
        # One score per time step: (batch, time)
        scores = tf.reduce_sum(projected, axis=-1)
        # Normalise the scores over the last axis and restore a trailing axis
        # so the weights broadcast against the features.
        weights = tf.expand_dims(tf.nn.softmax(scores), -1)
        return tf.reduce_sum(x * weights, axis=1)
# Build model with attention
inputs = keras.Input(shape=(maxlen,))
x = layers.Embedding(max_features, 128)(inputs)
# return_sequences=True keeps one output per time step, which the Attention layer pools
x = layers.Bidirectional(layers.LSTM(64, return_sequences=True))(x)
x = Attention()(x)
x = layers.Dense(64, activation='relu')(x)
x = layers.Dropout(0.5)(x)
outputs = layers.Dense(1, activation='sigmoid')(x)
attention_model = keras.Model(inputs, outputs)
8. Custom Training Loops {#custom-training}
GradientTape for Custom Training
# Create simple model
# The final Dense(10) has no activation, so the model outputs raw logits
model = keras.Sequential([
layers.Dense(128, activation='relu'),
layers.Dense(10)
])
# Loss and optimizer
# from_logits=True matches the activation-free output layer above
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = keras.optimizers.Adam()
# Metrics
# Running aggregates that the training step updates and the loop reads per epoch
train_loss = keras.metrics.Mean(name='train_loss')
train_accuracy = keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
@tf.function
def train_step(x, y):
    """Run one optimisation step on a batch and update the running metrics.

    Args:
        x: Batch of inputs.
        y: Batch of integer class labels.
    """
    params = model.trainable_variables
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        batch_loss = loss_fn(y, logits)
    grads = tape.gradient(batch_loss, params)
    optimizer.apply_gradients(zip(grads, params))
    # Fold this batch into the epoch-level aggregates.
    train_loss(batch_loss)
    train_accuracy(y, logits)
# Training loop: one pass over train_dataset per epoch, metrics reported per epoch
EPOCHS = 5
for epoch in range(EPOCHS):
    # Reset metrics so each epoch reports only its own batches.
    # reset_state() is the current method name; reset_states() is the
    # deprecated spelling.
    train_loss.reset_state()
    train_accuracy.reset_state()
    for x_batch, y_batch in train_dataset:
        train_step(x_batch, y_batch)
    print(f'Epoch {epoch + 1}, '
          f'Loss: {train_loss.result():.4f}, '
          f'Accuracy: {train_accuracy.result():.4f}')
Custom Loss Functions
# Custom MSE with penalty
def custom_mse_with_penalty(y_true, y_pred, penalty_weight=0.1):
    """Mean squared error plus a weighted mean-absolute-prediction penalty.

    Args:
        y_true: Target values.
        y_pred: Predicted values.
        penalty_weight: Multiplier applied to the mean absolute prediction.

    Returns:
        Scalar ``mean((y_true - y_pred)^2) + penalty_weight * mean(|y_pred|)``.
    """
    residual = y_true - y_pred
    squared_error = tf.reduce_mean(tf.square(residual))
    magnitude_penalty = penalty_weight * tf.reduce_mean(tf.abs(y_pred))
    return squared_error + magnitude_penalty
# Focal loss for imbalanced classification
def focal_loss(gamma=2., alpha=0.25):
    """Build a binary focal-loss function.

    Args:
        gamma: Exponent of the ``(1 - p_t)`` modulating factor.
        alpha: Weight of the positive class; negatives get ``1 - alpha``.

    Returns:
        A function ``(y_true, y_pred) -> scalar loss``.
    """
    def focal_loss_fixed(y_true, y_pred):
        """Compute the mean focal loss for probabilities ``y_pred``."""
        y_pred = tf.convert_to_tensor(y_pred)
        # Labels often arrive as integers; bring them to the prediction dtype
        # so ones_like(...) * alpha and tf.where see matching float types.
        y_true = tf.cast(y_true, y_pred.dtype)
        epsilon = tf.keras.backend.epsilon()
        # Keep probabilities away from 0 and 1 so the log stays finite.
        y_pred = tf.clip_by_value(y_pred, epsilon, 1. - epsilon)
        is_positive = tf.equal(y_true, 1)
        # p_t: probability assigned to the true class
        p_t = tf.where(is_positive, y_pred, 1 - y_pred)
        alpha_factor = tf.ones_like(y_true) * alpha
        alpha_t = tf.where(is_positive, alpha_factor, 1 - alpha_factor)
        cross_entropy = -tf.math.log(p_t)
        weight = alpha_t * tf.pow((1 - p_t), gamma)
        loss = weight * cross_entropy
        return tf.reduce_mean(loss)
    return focal_loss_fixed
9. TensorFlow Advanced Features {#advanced}
Mixed Precision Training
# Enable mixed precision
# NOTE: set_global_policy changes the policy process-wide, so layers created later in the same session are affected too
policy = tf.keras.mixed_precision.Policy('mixed_float16')
tf.keras.mixed_precision.set_global_policy(policy)
# Build model with mixed precision
inputs = keras.Input(shape=(784,))
x = layers.Dense(128, activation='relu')(inputs)
x = layers.Dense(64, activation='relu')(x)
# Output layer should use float32
# (the explicit dtype overrides the global policy for this layer only)
outputs = layers.Dense(10, activation='softmax', dtype='float32')(x)
model = keras.Model(inputs, outputs)
# Use loss scaling
# The Adam optimizer is wrapped; `optimizer` now refers to the wrapper
optimizer = keras.optimizers.Adam()
optimizer = tf.keras.mixed_precision.LossScaleOptimizer(optimizer)
Distributed Training
# Multi-GPU training with MirroredStrategy
strategy = tf.distribute.MirroredStrategy()

# Build and compile inside the strategy scope.
# NOTE(review): create_model() is not defined in this guide -- presumably a
# user-supplied model factory; confirm.
with strategy.scope():
    model = create_model()
    compile_options = {
        'optimizer': 'adam',
        'loss': 'sparse_categorical_crossentropy',
        'metrics': ['accuracy'],
    }
    model.compile(**compile_options)

# Adjust batch size for number of replicas (64 per replica).
# NOTE(review): this value is assigned but not applied to train_dataset in
# this snippet.
BATCH_SIZE = 64 * strategy.num_replicas_in_sync

# Train normally
model.fit(train_dataset, epochs=10)
TensorFlow Functions and Graph Optimization
# tf.function decorator for graph optimization
@tf.function
def train_step(x, y):
    """Perform one gradient update on a batch and return its loss.

    Args:
        x: Batch of inputs.
        y: Batch of targets.

    Returns:
        The loss computed for this batch.
    """
    weights = model.trainable_variables
    with tf.GradientTape() as tape:
        batch_loss = loss_fn(y, model(x, training=True))
    grads = tape.gradient(batch_loss, weights)
    optimizer.apply_gradients(zip(grads, weights))
    return batch_loss
# Trace function execution
@tf.function
def traced_function(x):
    """Evaluate the polynomial ``x*x + 2*x + 1`` on ``x``."""
    return x * x + 2 * x + 1


# View generated graph: trace for a scalar float input and print the GraphDef
concrete_fn = traced_function.get_concrete_function(tf.constant(5.0))
print(concrete_fn.graph.as_graph_def())
10. Deployment & Production {#deployment}
Model Saving and Loading
# Save entire model (two alternatives: a single .h5 file, or a directory)
model.save('my_model.h5') # Keras H5 format
model.save('my_model') # SavedModel format (recommended)
# Load model (reads the 'my_model' directory written above)
loaded_model = keras.models.load_model('my_model')
# Save only weights
model.save_weights('my_weights.h5')
# Load weights
# The weights are loaded back into the same `model` instance
model.load_weights('my_weights.h5')
# Export to SavedModel with signatures
class ExportModel(tf.Module):
    """Wrap a model in a ``tf.Module`` exposing an explicit serving signature.

    Args:
        model: Callable model invoked by ``predict`` and ``serve``.
    """

    def __init__(self, model):
        # Initialise tf.Module first so its name and name scope are set up
        # before attributes are attached.
        super().__init__()
        self.model = model

    @tf.function
    def predict(self, x):
        """Return the wrapped model's output for ``x``."""
        return self.model(x)

    @tf.function(input_signature=[tf.TensorSpec(shape=[None, 784], dtype=tf.float32)])
    def serve(self, x):
        """Serving entry point: float32 batches of 784 features in, dict out."""
        return {'predictions': self.predict(x)}


export_model = ExportModel(model)
tf.saved_model.save(export_model, 'exported_model',
                    signatures={'serving_default': export_model.serve})
TensorFlow Lite Conversion
# Convert to TFLite
converter = tf.lite.TFLiteConverter.from_saved_model('my_model')
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
# Save TFLite model
with open('model.tflite', 'wb') as f:
    f.write(tflite_model)
# Quantization for smaller model size
converter.optimizations = [tf.lite.Optimize.DEFAULT]
# NOTE(review): representative_dataset_gen is not defined in this guide --
# presumably a user-supplied generator of sample inputs; confirm before running.
converter.representative_dataset = lambda: representative_dataset_gen()
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
converter.inference_input_type = tf.uint8
converter.inference_output_type = tf.uint8
# The options above only take effect on a new conversion; without this call
# the quantized configuration is never used.
tflite_quant_model = converter.convert()
TensorFlow Serving
# Model versioning structure
# models/
# my_model/
# 1/
# saved_model.pb
# variables/
# 2/
# saved_model.pb
# variables/
# Start TensorFlow Serving (Docker); MODEL_NAME must match the name used in the request URL
# docker run -p 8501:8501 --mount type=bind,source=/path/to/models/my_model,target=/models/my_model -e MODEL_NAME=my_model tensorflow/serving
# Client code: POST a JSON predict request to the TensorFlow Serving REST endpoint
import requests
import json
# NOTE(review): the example instance has 4 features; the feature count must
# match the input signature of the model being served -- confirm.
data = json.dumps({
    "signature_name": "serving_default",
    "instances": [[1.0, 2.0, 3.0, 4.0]]
})
headers = {"content-type": "application/json"}
response = requests.post(
    'http://localhost:8501/v1/models/my_model:predict',
    data=data,
    headers=headers,
    timeout=10,  # do not hang forever if the server is unreachable
)
# Surface HTTP errors here rather than as a KeyError on 'predictions' below.
response.raise_for_status()
predictions = response.json()['predictions']
11. Interview Questions {#interview-questions}
Basic Questions
Q1: What's the difference between tf.constant and tf.Variable?
# tf.constant: Immutable
constant = tf.constant([1, 2, 3])
# constant.assign([4, 5, 6]) # This will error
# tf.Variable: Mutable
variable = tf.Variable([1, 2, 3])
# assign() overwrites the variable's value in place
variable.assign([4, 5, 6]) # This works
Q2: Explain eager execution vs graph execution
# Eager execution (default in TF 2.x): operations run as soon as they are called
x = tf.constant(5)
y = tf.constant(3)
z = x + y  # Executes immediately
print(z.numpy())  # 8


# Graph execution
@tf.function  # Creates computation graph
def add(a, b):
    """Return the sum of ``a`` and ``b``, traced into a graph."""
    return tf.add(a, b)


result = add(x, y)  # Optimized execution
Q3: What are the different ways to create a model in Keras?
# 1. Sequential API: a plain stack of layers
model = keras.Sequential([layers.Dense(10)])

# 2. Functional API: layers are wired by calling them on tensors
inputs = keras.Input(shape=(10,))
outputs = layers.Dense(10)(inputs)
model = keras.Model(inputs=inputs, outputs=outputs)


# 3. Model Subclassing: layers are attributes, the forward pass lives in call()
class MyModel(keras.Model):
    """Single Dense(10) layer wrapped in a Model subclass."""

    def __init__(self):
        super(MyModel, self).__init__()
        self.dense = layers.Dense(10)

    def call(self, inputs):
        """Apply the dense layer to ``inputs``."""
        projected = self.dense(inputs)
        return projected
Intermediate Questions
Q4: Implement custom accuracy metric
class CustomAccuracy(keras.metrics.Metric):
    """Streaming accuracy for integer labels against per-class scores.

    Keeps a running (weighted) count of correct predictions and a running
    (weighted) count of samples; ``result`` is their ratio.
    """

    def __init__(self, name='custom_accuracy', **kwargs):
        super().__init__(name=name, **kwargs)
        self.correct = self.add_weight(name='correct', initializer='zeros')
        self.total = self.add_weight(name='total', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
        """Accumulate the matches of one batch.

        Args:
            y_true: Integer class labels.
            y_pred: Per-class scores; the predicted class is the argmax over
                the last axis.
            sample_weight: Optional per-sample weights applied to both the
                numerator and the denominator.
        """
        y_pred = tf.argmax(y_pred, axis=-1)
        # Labels may carry a trailing unit axis, e.g. (batch, 1); align them
        # with the argmax result so tf.equal compares element by element.
        y_true = tf.cast(tf.reshape(y_true, tf.shape(y_pred)), tf.int64)
        matches = tf.cast(tf.equal(y_true, y_pred), tf.float32)
        weights = tf.ones_like(matches)
        if sample_weight is not None:
            weights = weights * tf.cast(sample_weight, tf.float32)
        self.correct.assign_add(tf.reduce_sum(matches * weights))
        # The denominator must use the same weights as the numerator,
        # otherwise weighted results are not a proper average.
        self.total.assign_add(tf.reduce_sum(weights))

    def result(self):
        """Return correct / total, or 0 when nothing has been accumulated."""
        return tf.math.divide_no_nan(self.correct, self.total)

    def reset_state(self):
        """Clear both accumulators."""
        self.correct.assign(0.0)
        self.total.assign(0.0)

    def reset_states(self):
        """Backward-compatible alias for ``reset_state``."""
        self.reset_state()
Q5: Explain and implement gradient clipping
# Method 1: Using optimizer parameter (the optimizer clips each gradient's norm to 1.0)
optimizer = keras.optimizers.Adam(learning_rate=0.001, clipnorm=1.0)


# Method 2: Manual clipping in custom training
@tf.function
def train_step_with_clipping(x, y):
    """Run one training step with every gradient clipped to L2 norm 1.0.

    Args:
        x: Batch of inputs.
        y: Batch of targets.

    Returns:
        The loss computed for this batch.
    """
    with tf.GradientTape() as tape:
        predictions = model(x, training=True)
        loss = loss_fn(y, predictions)
    variables = model.trainable_variables
    gradients = tape.gradient(loss, variables)
    # Variables that do not contribute to the loss yield a None gradient,
    # which tf.clip_by_norm cannot handle; skip those pairs.
    clipped_pairs = [
        (tf.clip_by_norm(grad, 1.0), var)
        for grad, var in zip(gradients, variables)
        if grad is not None
    ]
    optimizer.apply_gradients(clipped_pairs)
    return loss
Q6: How to handle imbalanced datasets?
# Method 1: Class weights
class_weights = {0: 1.0, 1: 10.0} # Higher weight for minority class
model.fit(x_train, y_train, class_weight=class_weights)
# Method 2: Oversampling
# Split the dataset by label (1 -> positive, 0 -> negative)
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
positive = train_dataset.filter(lambda x, y: tf.equal(y, 1))
negative = train_dataset.filter(lambda x, y: tf.equal(y, 0))
# Oversample minority class
# The positive subset is repeated 10 times, appended to the negatives, then shuffled
positive = positive.repeat(10)
dataset = positive.concatenate(negative).shuffle(10000)
# Method 3: Custom loss with sample weights
def weighted_loss(y_true, y_pred):
    """Binary cross-entropy with positive samples weighted 10x.

    Args:
        y_true: Binary labels (0 or 1).
        y_pred: Predicted probabilities.

    Returns:
        Weighted cross-entropy averaged over the last axis.
    """
    y_pred = tf.convert_to_tensor(y_pred)
    y_true = tf.cast(y_true, y_pred.dtype)
    epsilon = tf.keras.backend.epsilon()
    # Keep probabilities away from 0 and 1 so the logs stay finite.
    y_pred = tf.clip_by_value(y_pred, epsilon, 1.0 - epsilon)
    element_loss = -(y_true * tf.math.log(y_pred)
                     + (1.0 - y_true) * tf.math.log(1.0 - y_pred))
    weights = tf.cast(tf.where(tf.equal(y_true, 1.0), 10.0, 1.0), y_pred.dtype)
    # Weight element-wise BEFORE reducing: multiplying an already reduced
    # (batch,) loss by (batch, 1) weights would broadcast to (batch, batch).
    return tf.reduce_mean(element_loss * weights, axis=-1)
Advanced Questions
Q7: Implement attention mechanism from scratch
class MultiHeadAttention(layers.Layer):
    """Multi-head scaled dot-product attention.

    Args:
        d_model: Size of the projected query/key/value vectors and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        # Fail at construction instead of with an opaque reshape error at call
        # time: split_heads reshapes d_model into num_heads * depth.
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // num_heads
        self.wq = layers.Dense(d_model)
        self.wk = layers.Dense(d_model)
        self.wv = layers.Dense(d_model)
        self.dense = layers.Dense(d_model)

    def split_heads(self, x, batch_size):
        """Reshape (batch, seq, d_model) to (batch, num_heads, seq, depth)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, query, key, value, mask=None):
        """Attend from ``query`` over ``key``/``value``.

        Args:
            query: Query sequence.
            key: Key sequence.
            value: Value sequence.
            mask: Optional mask; entries equal to 1 receive a large negative
                logit before the softmax.

        Returns:
            Tensor of shape (batch, query_len, d_model).
        """
        batch_size = tf.shape(query)[0]
        # Linear transformations
        q = self.wq(query)
        k = self.wk(key)
        v = self.wv(value)
        # Split heads
        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)
        # Scaled dot-product attention
        matmul_qk = tf.matmul(q, k, transpose_b=True)
        # Cast the scale to the logits' dtype (not a fixed float32) so the
        # division also works when the layer computes in float16.
        dk = tf.cast(tf.shape(k)[-1], matmul_qk.dtype)
        scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
        if mask is not None:
            scaled_attention_logits += (mask * -1e9)
        attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
        output = tf.matmul(attention_weights, v)
        # Concat heads: back to (batch, seq, d_model)
        output = tf.transpose(output, perm=[0, 2, 1, 3])
        output = tf.reshape(output, (batch_size, -1, self.d_model))
        # Final linear layer
        output = self.dense(output)
        return output
Q8: Implement custom training with multiple optimizers
# Model with generator and discriminator
# NOTE(review): create_generator, create_discriminator, generator_loss,
# discriminator_loss and noise_dim are not defined in this guide -- presumably
# supplied by the reader; confirm.
generator = create_generator()
discriminator = create_discriminator()
# Separate optimizers so each network is updated only from its own loss
gen_optimizer = keras.optimizers.Adam(learning_rate=0.0002)
disc_optimizer = keras.optimizers.Adam(learning_rate=0.0002)


@tf.function
def train_step(real_images):
    """Run one GAN update of both the generator and the discriminator.

    Args:
        real_images: Batch of real samples.

    Returns:
        Tuple ``(gen_loss, disc_loss)`` for this batch.
    """
    # Draw as many noise vectors as there are real images, so a short final
    # batch is handled without depending on the global BATCH_SIZE.
    batch_size = tf.shape(real_images)[0]
    noise = tf.random.normal([batch_size, noise_dim])
    # Two tapes: each network's gradients come from its own loss.
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(noise, training=True)
        real_output = discriminator(real_images, training=True)
        fake_output = discriminator(generated_images, training=True)
        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)
    gen_gradients = gen_tape.gradient(gen_loss, generator.trainable_variables)
    disc_gradients = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    gen_optimizer.apply_gradients(zip(gen_gradients, generator.trainable_variables))
    disc_optimizer.apply_gradients(zip(disc_gradients, discriminator.trainable_variables))
    return gen_loss, disc_loss
Q9: Custom layer with multiple inputs/outputs
class CustomSplitLayer(layers.Layer):
    """Feed one input through several Dense heads and return all their outputs.

    Args:
        units_list: Output size of each Dense head, in order.
    """

    def __init__(self, units_list):
        super().__init__()
        heads = []
        for units in units_list:
            heads.append(layers.Dense(units))
        self.dense_layers = heads

    def call(self, inputs):
        """Return a list with one output tensor per Dense head."""
        results = []
        for head in self.dense_layers:
            results.append(head(inputs))
        return results
class CustomMergeLayer(layers.Layer):
    """Concatenate a list of tensors and project the result with a Dense layer.

    Args:
        units: Output size of the Dense projection.
    """

    def __init__(self, units):
        super().__init__()
        self.concat = layers.Concatenate()
        self.dense = layers.Dense(units)

    def call(self, inputs):
        """Merge the list ``inputs`` into a single projected tensor."""
        return self.dense(self.concat(inputs))
# Usage
# Three heads of 64, 32 and 16 units, merged and projected to 128 units
split_layer = CustomSplitLayer([64, 32, 16])
merge_layer = CustomMergeLayer(128)
# In functional API
inputs = keras.Input(shape=(100,))
# `outputs` is the list returned by the split layer; it is passed on as the merge layer's input
outputs = split_layer(inputs)
merged = merge_layer(outputs)
model = keras.Model(inputs, merged)
Q10: Memory optimization techniques
# 1. Gradient accumulation for large batch sizes
accumulation_steps = 4
# One buffer per trainable variable. The buffers are bookkeeping state, not
# model parameters, so they are created non-trainable.
accumulated_gradients = [tf.Variable(tf.zeros_like(var), trainable=False)
                         for var in model.trainable_variables]


@tf.function
def train_step_accumulated(x, y, step):
    """Accumulate gradients for one micro-batch and apply them periodically.

    Args:
        x: Micro-batch of inputs.
        y: Micro-batch of targets.
        step: Step counter; gradients are applied whenever
            ``step % accumulation_steps == 0``.
    """
    with tf.GradientTape() as tape:
        predictions = model(x, training=True)
        # Scale the loss so the summed gradients average over the micro-batches.
        loss = loss_fn(y, predictions) / accumulation_steps
    gradients = tape.gradient(loss, model.trainable_variables)
    # Accumulate gradients; variables unused by the loss yield None and are skipped.
    for buffer, grad in zip(accumulated_gradients, gradients):
        if grad is not None:
            buffer.assign_add(grad)
    # Apply gradients every accumulation_steps.
    # NOTE(review): with a zero-based counter this fires on the very first
    # step -- presumably callers pass a one-based step; confirm.
    if step % accumulation_steps == 0:
        optimizer.apply_gradients(zip(accumulated_gradients,
                                      model.trainable_variables))
        # Reset accumulated gradients
        for buffer in accumulated_gradients:
            buffer.assign(tf.zeros_like(buffer))
# 2. Mixed precision training (shown earlier)
# 2. Mixed precision training (shown earlier)
# 3. Gradient checkpointing
@tf.custom_gradient
def checkpointed_dense(x, w, b):
    """Affine map ``x @ w + b`` with a hand-written backward pass.

    Args:
        x: Input matrix.
        w: Weight matrix.
        b: Bias vector.

    Returns:
        The output ``y`` and the function computing gradients for (x, w, b).
    """
    def grad(dy):
        # Gradients of y = x @ w + b, computed from the captured x and w.
        # NOTE(review): nothing from the forward pass is recomputed here.
        grad_x = tf.matmul(dy, tf.transpose(w))
        grad_w = tf.matmul(tf.transpose(x), dy)
        grad_b = tf.reduce_sum(dy, axis=0)
        return grad_x, grad_w, grad_b

    y = tf.matmul(x, w) + b
    return y, grad
Performance & Debugging Tips
1. Profile your model:
# Enable profiler (trace data is written under 'logdir')
tf.profiler.experimental.start('logdir')
try:
    # Your training code
    for epoch in range(5):
        model.fit(x_train, y_train, epochs=1)
finally:
    # Always stop the profiler, even if training raises, so it is not left running.
    tf.profiler.experimental.stop()
2. Debug tensor shapes:
# Print tensor info in graph mode
# Create the layer once, outside the traced function: building a new Dense
# layer inside tf.function would create new variables on every trace.
debug_dense = layers.Dense(64)


@tf.function
def debug_shapes(x):
    """Print the shape of ``x`` before and after a Dense(64) layer.

    Args:
        x: Input tensor.

    Returns:
        The output of the dense layer.
    """
    # tf.print runs inside the graph, so it reports runtime shapes on every call.
    tf.print("Input shape:", tf.shape(x))
    x = debug_dense(x)
    tf.print("After dense:", tf.shape(x))
    return x
3. Memory management:
# Clear session
tf.keras.backend.clear_session()
# Limit GPU memory growth
# Uses the same non-experimental device listing as the setup section of this guide.
gpus = tf.config.list_physical_devices('GPU')
# An empty list simply skips the loop, so no separate emptiness check is needed.
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
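This guide covers TensorFlow 2.x from basics to advanced topics. Like PyTorch, TensorFlow 2.x runs eagerly by default; what sets it apart is the integrated Keras high-level API, graph compilation with tf.function, and a mature production deployment toolchain (SavedModel, TensorFlow Serving, TensorFlow Lite). Practice these examples and make sure you understand the underlying concepts before your interviews!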
Comments
Post a Comment