Create efficient data pipelines with tf.data
Build efficient, scalable data pipelines using TensorFlow's tf.data API for optimal training performance. This skill covers dataset creation from arrays, generators, and files, plus advanced transformations like batching, shuffling, caching, prefetching, and data augmentation. Use when you need to load large datasets, implement real-time augmentation, optimize GPU/TPU utilization, or parse TFRecord/CSV/image files for model training.
import tensorflow as tf
import numpy as np
# Create dataset from numpy arrays
x_train = np.random.rand(1000, 28, 28, 1)
y_train = np.random.randint(0, 10, 1000)
# Method 1: from_tensor_slices
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# Apply transformations
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)
# Iterate through dataset
for batch_x, batch_y in dataset.take(2):
print(f"Batch shape: {batch_x.shape}, Labels shape: {batch_y.shape}")
def data_generator():
"""Generator function for custom data loading."""
for i in range(1000):
# Simulate loading data from disk or API
x = np.random.rand(28, 28, 1).astype(np.float32)
y = np.random.randint(0, 10)
yield x, y
# Create dataset from generator
dataset = tf.data.Dataset.from_generator(
    data_generator,
    output_signature=(
        tf.TensorSpec(shape=(28, 28, 1), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )
)
dataset = dataset.batch(32).prefetch(tf.data.AUTOTUNE)
# Create simple range dataset
dataset = tf.data.Dataset.range(1000)
# Use with custom mapping
dataset = dataset.map(lambda x: (tf.random.normal([28, 28, 1]), x % 10))
dataset = dataset.batch(32)
def normalize(image, label):
"""Normalize pixel values."""
image = tf.cast(image, tf.float32) / 255.0
return image, label
# Apply normalization
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
def augment(image, label):
"""Apply random augmentations."""
image = tf.image.random_flip_left_right(image)
image = tf.image.random_brightness(image, 0.2)
image = tf.image.random_contrast(image, 0.8, 1.2)
return image, label
def normalize(image, label):
"""Normalize pixel values."""
image = tf.cast(image, tf.float32) / 255.0
return image, label
# Build complete pipeline
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()  # Cache after normalization
    .shuffle(1000)
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
def resize_image(image, label):
"""Resize images to target size."""
image = tf.image.resize(image, [224, 224])
return image, label
# tfa.image.rotate comes from the TensorFlow Addons package
import tensorflow_addons as tfa

def apply_random_rotation(image, label):
    """Apply random rotation augmentation."""
    angle = tf.random.uniform([], -0.2, 0.2)
    image = tfa.image.rotate(image, angle)
    return image, label
# Chain multiple transformations
dataset = (
    tf.data.Dataset.from_tensor_slices((images, labels))
    .map(resize_image, num_parallel_calls=tf.data.AUTOTUNE)
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(10000)
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    .map(apply_random_rotation, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)
# Batch size
BATCH_SIZE = 64
# Buffer size to shuffle the dataset
# (tf.data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
# Variable batch sizes based on sequence length
def batch_by_sequence_length(dataset, batch_size, max_length):
"""Batch sequences by length for efficient padding."""
def key_func(x, y):
# Bucket by length
return tf.cast(tf.size(x) / max_length * 10, tf.int64)
def reduce_func(key, dataset):
return dataset.batch(batch_size)
return dataset.group_by_window(
key_func=key_func,
reduce_func=reduce_func,
window_size=batch_size
)
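# Usage sketch with a hypothetical dataset of variable-length integer sequences
# (the toy sequences below are purely illustrative). tf.data also ships a
# built-in alternative, Dataset.bucket_by_sequence_length.
sequences = [[1, 2, 3], [4, 5], [6, 7, 8, 9], [1]]
seq_labels = [0, 1, 0, 1]
def seq_gen():
    for seq, label in zip(sequences, seq_labels):
        yield seq, label
seq_dataset = tf.data.Dataset.from_generator(
    seq_gen,
    output_signature=(
        tf.TensorSpec(shape=(None,), dtype=tf.int32),
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )
)
bucketed = batch_by_sequence_length(seq_dataset, batch_size=2, max_length=10)
for x_batch, y_batch in bucketed:
    print(x_batch.shape, y_batch.shape)  # sequences padded per batch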
def create_stratified_dataset(features, labels, num_classes, batch_size):
    """Create dataset with balanced class sampling."""
    # Separate by class
    datasets = []
    for class_id in range(num_classes):
        mask = labels == class_id
        class_dataset = tf.data.Dataset.from_tensor_slices(
            (features[mask], labels[mask])
        )
        datasets.append(class_dataset)
    # Sample equally from each class (repeat() each class dataset first if
    # minority classes should never run out)
    balanced_dataset = tf.data.Dataset.sample_from_datasets(
        datasets,
        weights=[1.0 / num_classes] * num_classes
    )
    return balanced_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
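# Usage sketch with made-up, imbalanced toy data (shapes and class counts are
# purely illustrative): 900 samples of class 0, 100 of class 1.
toy_features = np.random.rand(1000, 8).astype(np.float32)
toy_labels = np.concatenate([np.zeros(900, dtype=np.int64), np.ones(100, dtype=np.int64)])
balanced = create_stratified_dataset(toy_features, toy_labels, num_classes=2, batch_size=32)
for _, y_batch in balanced.take(1):
    print(np.bincount(y_batch.numpy(), minlength=2))  # roughly equal counts per batch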
# Cache in memory (for small datasets)
dataset = dataset.cache()
# Cache to disk (for larger datasets)
dataset = dataset.cache('/tmp/dataset_cache')
# Optimal caching placement
dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(expensive_preprocessing, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()  # Cache after expensive operations
    .shuffle(buffer_size)
    .map(cheap_augmentation, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)
# Automatic prefetching
dataset = dataset.prefetch(tf.data.AUTOTUNE)
# Manual prefetch buffer size
dataset = dataset.prefetch(buffer_size=2)
# Complete optimized pipeline
optimized_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(10000)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)
# Use num_parallel_calls for CPU-bound operations
dataset = dataset.map(
    preprocessing_function,
    num_parallel_calls=tf.data.AUTOTUNE
)
# Interleave for parallel file reading
def make_dataset_from_file(filename):
    return tf.data.TextLineDataset(filename)
filenames = tf.data.Dataset.list_files('/path/to/data/*.csv')
dataset = filenames.interleave(
    make_dataset_from_file,
    cycle_length=4,
    num_parallel_calls=tf.data.AUTOTUNE
)
# Use take() and skip() for train/val split without loading all data
total_size = 10000
train_size = int(0.8 * total_size)
full_dataset = tf.data.Dataset.from_tensor_slices((x, y))
train_dataset = (
    full_dataset
    .take(train_size)
    .shuffle(1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
val_dataset = (
    full_dataset
    .skip(train_size)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
# Basic iteration
for i in tf.data.Dataset.range(3):
    tf.print('iteration:', i)
# With dataset iterator
for i in iter(tf.data.Dataset.range(3)):
    tf.print('iteration:', i)
# Distribute dataset across devices
for i in tf.distribute.OneDeviceStrategy('/cpu:0').experimental_distribute_dataset(
        tf.data.Dataset.range(3)):
    tf.print('iteration:', i)
# Multi-GPU distribution
strategy = tf.distribute.MirroredStrategy()
distributed_dataset = strategy.experimental_distribute_dataset(dataset)
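# Sketch of a per-replica training step with MirroredStrategy. It assumes the
# model, optimizer, loss_fn, and the train_step function shown later in this
# document were all created under strategy.scope().
@tf.function
def distributed_train_step(dist_inputs):
    # dist_inputs is the (images, labels) element of the distributed dataset
    per_replica_losses = strategy.run(train_step, args=dist_inputs)
    return strategy.reduce(tf.distribute.ReduceOp.MEAN, per_replica_losses, axis=None)

for dist_inputs in distributed_dataset:
    loss = distributed_train_step(dist_inputs)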
# Execute training loop over dataset
for images, labels in train_ds:
    if optimizer.iterations > TRAIN_STEPS:
        break
    train_step(images, labels)
def f(args):
    embeddings, index = args
    # embeddings [vocab_size, embedding_dim]
    # index []
    # desired result: [embedding_dim]
    return tf.gather(params=embeddings, indices=index)

@tf.function
def f_auto_vectorized(embeddings, indices):
    # embeddings [num_heads, vocab_size, embedding_dim]
    # indices [num_heads]
    # desired result: [num_heads, embedding_dim]
    return tf.vectorized_map(f, [embeddings, indices])
concrete_vectorized = f_auto_vectorized.get_concrete_function(
    tf.TensorSpec(shape=[None, 100, 16], dtype=tf.float32),
    tf.TensorSpec(shape=[None], dtype=tf.int32))
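# Calling the traced function with made-up inputs that match the TensorSpecs
# above: 4 heads, a vocabulary of 100, embedding dimension 16.
head_embeddings = tf.random.normal([4, 100, 16])
head_indices = tf.constant([3, 17, 42, 99], dtype=tf.int32)
result = concrete_vectorized(head_embeddings, head_indices)
print(result.shape)  # (4, 16)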
# Use dataset with model
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
model.fit(train_dataset, epochs=1)
# Create separate train and validation datasets
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
val_dataset = (
    tf.data.Dataset.from_tensor_slices((x_val, y_val))
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
# Train with validation
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10
)
@tf.function
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_fn(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss
# Training loop with dataset
for epoch in range(epochs):
    for images, labels in train_dataset:
        loss = train_step(images, labels)
    print(f'Epoch {epoch}, Loss: {loss.numpy():.4f}')
# Reading TFRecord files
def parse_tfrecord(example_proto):
    feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    image = tf.io.decode_raw(parsed['image'], tf.float32)
    image = tf.reshape(image, [28, 28, 1])
    label = parsed['label']
    return image, label
# Load TFRecord dataset
tfrecord_dataset = (
    tf.data.TFRecordDataset(['data_shard_1.tfrecord', 'data_shard_2.tfrecord'])
    .map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(10000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
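# For reference, a minimal sketch of writing matching records. The filename and
# the raw-float32 encoding are assumptions chosen to mirror parse_tfrecord above.
def serialize_example(image, label):
    feature = {
        'image': tf.train.Feature(
            bytes_list=tf.train.BytesList(value=[image.astype(np.float32).tobytes()])),
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[int(label)])),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()

with tf.io.TFRecordWriter('data_shard_1.tfrecord') as writer:
    for image, label in zip(x_train, y_train):
        writer.write(serialize_example(image, label))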
# Load CSV dataset
def parse_csv(line):
    columns = tf.io.decode_csv(line, record_defaults=[0.0] * 785)
    label = tf.cast(columns[0], tf.int32)
    features = tf.stack(columns[1:])
    features = tf.reshape(features, [28, 28, 1])
    return features, label
csv_dataset = (
    tf.data.TextLineDataset(['data.csv'])
    .skip(1)  # Skip header
    .map(parse_csv, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
def load_and_preprocess_image(path, label):
"""Load image from file and preprocess."""
image = tf.io.read_file(path)
image = tf.image.decode_jpeg(image, channels=3)
image = tf.image.resize(image, [224, 224])
image = tf.cast(image, tf.float32) / 255.0
return image, label
# Create dataset from image paths
image_paths = ['/path/to/image1.jpg', '/path/to/image2.jpg', ...]
labels = [0, 1, ...]
image_dataset = (
    tf.data.Dataset.from_tensor_slices((image_paths, labels))
    .map(load_and_preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
# Generate TensorFlow dataset with batching
def gen_dataset(
    batch_size=1,
    is_training=False,
    shuffle=False,
    input_pipeline_context=None,
    preprocess=None,
    drop_remainder=True,
    total_steps=None
):
    """Generate dataset with specified configuration."""
    # Assumes `features` and `labels` are already available in the enclosing scope
    dataset = tf.data.Dataset.from_tensor_slices((features, labels))
    if shuffle:
        dataset = dataset.shuffle(buffer_size=10000)
    if preprocess:
        dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size, drop_remainder=drop_remainder)
    if is_training:
        dataset = dataset.repeat()
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    if total_steps:
        dataset = dataset.take(total_steps)
    return dataset
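# Example call (the argument values are arbitrary); `normalize` is the mapping
# function defined earlier in this document.
train_ds = gen_dataset(
    batch_size=64,
    is_training=True,
    shuffle=True,
    preprocess=normalize,
    total_steps=1000
)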
Use the tensorflow-data-pipelines skill when you need to load large datasets efficiently, implement real-time data augmentation, optimize GPU/TPU utilization during training, or parse TFRecord, CSV, and image files into model-ready input pipelines.