Master Python fundamentals, OOP, data structures, async programming, and production-grade scripting for data engineering
Production-grade Python for data engineering: write type-safe, memory-efficient code with generators, async I/O, and modern tooling. Trigger when building data pipelines, ETL systems, or optimizing Python performance.
/plugin marketplace add pluginagentmarketplace/custom-plugin-data-engineer
/plugin install data-engineer-development-assistant@pluginagentmarketplace-data-engineer

This skill inherits all available tools. When active, it can use any tool Claude has access to.
assets/config.yaml
assets/schema.json
references/GUIDE.md
references/PATTERNS.md
scripts/validate.py

Production-grade Python development for building scalable data pipelines, ETL systems, and data-intensive applications.
# Modern Python 3.12+ data engineering setup
from dataclasses import dataclass
from collections.abc import Generator, Iterator
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
@dataclass
class DataRecord:
"""Type-safe data container with validation."""
id: int
value: float
category: str
def __post_init__(self):
if self.value < 0:
raise ValueError(f"Value must be non-negative, got {self.value}")
def process_records(records: Iterator[dict]) -> Generator[DataRecord, None, None]:
"""Memory-efficient generator for processing large datasets."""
for idx, record in enumerate(records):
try:
yield DataRecord(
id=record['id'],
value=float(record['value']),
category=record.get('category', 'unknown')
)
except (KeyError, ValueError) as e:
logger.warning(f"Skipping invalid record {idx}: {e}")
continue
# Usage
if __name__ == "__main__":
sample_data = [{"id": 1, "value": "100.5", "category": "A"}]
for record in process_records(iter(sample_data)):
logger.info(f"Processed: {record}")
from typing import TypedDict, NotRequired, Literal
from dataclasses import dataclass, field
from datetime import datetime, timezone
# TypedDict for JSON-like structures
class PipelineConfig(TypedDict):
source: str
destination: str
batch_size: int
retry_count: NotRequired[int]
mode: Literal["batch", "streaming"]
# Dataclass for domain objects
@dataclass(frozen=True, slots=True)
class ETLJob:
"""Immutable, memory-efficient job definition."""
job_id: str
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
config: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return {"job_id": self.job_id, "created_at": self.created_at.isoformat()}
from collections.abc import Callable, Generator, Iterable
import csv
from pathlib import Path
def read_csv_chunks(
file_path: Path,
chunk_size: int = 10000
) -> Generator[list[dict], None, None]:
"""
Memory-efficient CSV reader using generators.
Processes files of any size without loading into memory.
"""
with open(file_path, 'r', newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
chunk = []
for row in reader:
chunk.append(row)
if len(chunk) >= chunk_size:
yield chunk
chunk = []
if chunk: # Don't forget the last chunk
yield chunk
def transform_pipeline(
records: Iterable[dict],
    transformers: list[Callable[[dict], dict | None]]
) -> Generator[dict, None, None]:
"""Composable transformation pipeline."""
for record in records:
result = record
for transform in transformers:
result = transform(result)
if result is None:
break
if result is not None:
yield result
import asyncio
import aiohttp
from typing import AsyncGenerator
import logging
logger = logging.getLogger(__name__)
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3,
backoff_factor: float = 2.0
) -> dict | None:
"""
Fetch URL with exponential backoff retry logic.
Production pattern for API data ingestion.
"""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
resp.raise_for_status()
return await resp.json()
        except aiohttp.ClientError as e:
            if attempt == max_retries - 1:
                break
            wait_time = backoff_factor ** attempt
            logger.warning(f"Attempt {attempt+1} failed for {url}: {e}. Retrying in {wait_time}s")
            await asyncio.sleep(wait_time)
logger.error(f"All retries exhausted for {url}")
return None
async def fetch_all_pages(
base_url: str,
page_count: int,
concurrency_limit: int = 10
) -> AsyncGenerator[dict, None]:
"""Concurrent API fetching with rate limiting."""
semaphore = asyncio.Semaphore(concurrency_limit)
async def bounded_fetch(session: aiohttp.ClientSession, url: str):
async with semaphore:
return await fetch_with_retry(session, url)
async with aiohttp.ClientSession() as session:
tasks = [bounded_fetch(session, f"{base_url}?page={i}") for i in range(page_count)]
for result in asyncio.as_completed(tasks):
data = await result
if data:
yield data
import functools
import time
import logging
from typing import TypeVar, Callable, ParamSpec
P = ParamSpec('P')
R = TypeVar('R')
def with_retry(
max_attempts: int = 3,
    exceptions: tuple[type[Exception], ...] = (Exception,),
backoff_factor: float = 2.0
) -> Callable[[Callable[P, R]], Callable[P, R]]:
"""
Decorator for automatic retry with exponential backoff.
Use for flaky operations (network, database connections).
"""
def decorator(func: Callable[P, R]) -> Callable[P, R]:
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt == max_attempts - 1:
                        break
                    wait_time = backoff_factor ** attempt
                    logging.warning(
                        f"{func.__name__} attempt {attempt+1} failed: {e}. "
                        f"Retrying in {wait_time}s"
                    )
                    time.sleep(wait_time)
raise last_exception
return wrapper
return decorator
def log_execution_time(func: Callable[P, R]) -> Callable[P, R]:
"""Decorator for performance monitoring."""
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
start = time.perf_counter()
try:
result = func(*args, **kwargs)
duration = time.perf_counter() - start
logging.info(f"{func.__name__} completed in {duration:.3f}s")
return result
except Exception as e:
duration = time.perf_counter() - start
logging.error(f"{func.__name__} failed after {duration:.3f}s: {e}")
raise
return wrapper
| Tool | Purpose | Version (2025) |
|---|---|---|
| Python | Core language | 3.12+ |
| uv | Package manager (replaces pip) | 0.4+ |
| Ruff | Linter + formatter (replaces Black, flake8) | 0.5+ |
| mypy | Static type checking | 1.11+ |
| pytest | Testing framework | 8.0+ |
| pydantic | Data validation | 2.5+ |
| polars | DataFrame operations (faster than pandas; see sketch below) | 0.20+ |
| httpx | Modern HTTP client | 0.27+ |
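The polars row is worth a closer look for data engineering workloads. Below is a minimal lazy-evaluation sketch, assuming polars is installed; the events.csv file and its value/category columns are hypothetical placeholders, not assets shipped with this skill.

# Polars lazy-evaluation sketch (illustrative only; file and column names are placeholders)
import polars as pl

lazy_frame = (
    pl.scan_csv("events.csv")                         # lazy scan: nothing is read yet
    .filter(pl.col("value") >= 0)                     # pushed down into the scan at collect()
    .group_by("category")
    .agg(pl.col("value").sum().alias("total_value"))
)
print(lazy_frame.collect())                           # query plan is optimized and executed here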
Week 1: Core syntax, data types, control flow
Week 2: Functions, modules, file I/O
Week 3: OOP (classes, inheritance, composition)
Week 4: Generators, iterators, decorators
Week 5: Type hints, dataclasses, protocols (see the Protocol sketch after this list)
Week 6: Error handling, logging, testing basics
Week 7: Async/await, concurrent programming
Week 8: Memory optimization, profiling
Week 9: Package structure, dependency management
Week 10: CI/CD integration, linting, formatting
Week 11: Performance optimization patterns
Week 12: Production deployment patterns
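As a small illustration for Week 5, here is a minimal sketch of structural typing with typing.Protocol. The DataReader and InMemoryReader names are hypothetical, but the same idea underlies the DataReader/Transformer/DataWriter composition example further down.

# typing.Protocol sketch for Week 5 (names are illustrative, not part of this skill's assets)
from collections.abc import Iterator
from typing import Protocol

class DataReader(Protocol):
    """Anything with a matching read_batches() method satisfies this protocol."""
    def read_batches(self) -> Iterator[list[dict]]: ...

class InMemoryReader:
    """Never subclasses DataReader, yet type-checks as one (structural typing)."""
    def __init__(self, rows: list[dict], batch_size: int = 2):
        self.rows = rows
        self.batch_size = batch_size

    def read_batches(self) -> Iterator[list[dict]]:
        for i in range(0, len(self.rows), self.batch_size):
            yield self.rows[i:i + self.batch_size]

def count_rows(reader: DataReader) -> int:
    return sum(len(batch) for batch in reader.read_batches())

print(count_rows(InMemoryReader([{"id": 1}, {"id": 2}, {"id": 3}])))  # 3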
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache
class Settings(BaseSettings):
"""Type-safe configuration with environment variable support."""
database_url: str
api_key: str
batch_size: int = 1000
debug: bool = False
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
@lru_cache
def get_settings() -> Settings:
"""Cached settings singleton."""
return Settings()
from contextlib import contextmanager
from typing import Generator
import psycopg2
from psycopg2 import pool
class DatabasePool:
"""Thread-safe connection pool for PostgreSQL."""
def __init__(self, dsn: str, min_conn: int = 2, max_conn: int = 10):
self._pool = pool.ThreadedConnectionPool(min_conn, max_conn, dsn)
@contextmanager
def get_connection(self) -> Generator:
conn = self._pool.getconn()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._pool.putconn(conn)
def close(self):
self._pool.closeall()
| Issue | Symptoms | Root Cause | Fix |
|---|---|---|---|
| Memory Error | MemoryError, process killed | Loading full dataset into memory | Use generators, chunked processing |
| Import Error | ModuleNotFoundError | Virtual env not activated, missing dep | uv pip install, check sys.path |
| Type Error | TypeError: unhashable type | Using mutable as dict key | Convert to tuple or use dataclass |
| Async Deadlock | Program hangs | Blocking call in async code | Use asyncio.to_thread() for blocking ops |
| GIL Bottleneck | CPU-bound parallelism slow | Python GIL limits threads | Use multiprocessing or ProcessPoolExecutor (see sketch below) |
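A minimal sketch of the fixes in the last two rows, using only the standard library; blocking_io and cpu_heavy are hypothetical stand-ins for real workloads.

# Sketch for the last two rows: offloading blocking and CPU-bound work
# (blocking_io and cpu_heavy are placeholders, not part of this skill's assets)
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def blocking_io(path: str) -> int:
    time.sleep(0.1)                         # stands in for a blocking read
    return len(path)

def cpu_heavy(n: int) -> int:
    return sum(i * i for i in range(n))     # stands in for CPU-bound work

async def main() -> None:
    # Blocking call in async code: hand it to a thread instead of stalling the event loop.
    size = await asyncio.to_thread(blocking_io, "data.csv")
    # CPU-bound work: side-step the GIL with separate processes.
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        total = await loop.run_in_executor(pool, cpu_heavy, 1_000_000)
    print(size, total)

if __name__ == "__main__":
    asyncio.run(main())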
# 1. Check Python version
python --version # Should be 3.12+
# 2. Verify virtual environment
which python # Should point to venv
# 3. Check installed packages
uv pip list | grep <package>
# 4. Run with verbose logging
python -m mymodule -v 2>&1 | tee debug.log
# 5. Profile memory usage
python -m memory_profiler script.py
# 6. Profile CPU
python -m cProfile -s cumtime script.py
# Structured logging for easier debugging
import structlog
logger = structlog.get_logger()
def process_batch(batch_id: str, records: list):
logger.info("batch_started", batch_id=batch_id, record_count=len(records))
try:
# processing...
logger.info("batch_completed", batch_id=batch_id, success=True)
except Exception as e:
logger.error("batch_failed", batch_id=batch_id, error=str(e), exc_info=True)
raise
import pytest
from unittest.mock import Mock, patch
from your_module import process_records, DataRecord, function_using_api
class TestProcessRecords:
"""Unit tests following AAA pattern (Arrange-Act-Assert)."""
def test_valid_records_processed(self):
# Arrange
input_data = [{"id": 1, "value": "10.5", "category": "A"}]
# Act
result = list(process_records(iter(input_data)))
# Assert
assert len(result) == 1
assert result[0].id == 1
assert result[0].value == 10.5
def test_invalid_records_skipped(self):
# Arrange
input_data = [{"id": 1}] # Missing 'value'
# Act
result = list(process_records(iter(input_data)))
# Assert
assert len(result) == 0
def test_negative_value_raises_error(self):
# Arrange & Act & Assert
with pytest.raises(ValueError, match="non-negative"):
DataRecord(id=1, value=-5.0, category="A")
@patch('your_module.external_api_call')
def test_with_mocked_dependency(self, mock_api):
# Arrange
mock_api.return_value = {"status": "ok"}
# Act
result = function_using_api()
# Assert
mock_api.assert_called_once()
assert result["status"] == "ok"
# ✅ DO: Use type hints everywhere
def calculate_metrics(data: list[float]) -> dict[str, float]: ...
# ✅ DO: Prefer composition over inheritance
@dataclass
class Pipeline:
reader: DataReader
transformer: Transformer
writer: DataWriter
# ✅ DO: Use context managers for resources
with open_connection() as conn:
process(conn)
# ❌ DON'T: Use bare except
try: ...
except: pass # Never do this
# ❌ DON'T: Mutate function arguments
def process(items: list) -> list:
    items.append("new")        # Avoid this: it changes the caller's list in place
    return items
# ✅ Instead, build and return a new list: return [*items, "new"]
# ✅ Use generators for large data
def process_large_file(path):
with open(path) as f:
for line in f: # Memory efficient
yield transform(line)
# ✅ Use set/dict for O(1) lookups
valid_ids = set(load_valid_ids()) # Not list
if item_id in valid_ids: ...
# ✅ Use local variables in hot loops
def hot_loop(items):
local_func = expensive_lookup # Cache reference
for item in items:
local_func(item)
After mastering Python programming:
sql-databases - Query and manage relational data
etl-tools - Build data pipelines with Airflow
big-data - Scale with Spark and distributed systems
machine-learning - Apply ML with scikit-learn

Skill Certification Checklist: