Implements Parquet patterns in Python: partitioning, predicate pushdown, schema evolution, efficient reads/writes with PyArrow and pandas for large datasets.
npx claudepluginhub majesticlabs-dev/majestic-marketplace --plugin majestic-data
Patterns for efficient columnar data storage with Parquet.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# Write with compression
df.to_parquet('data.parquet', compression='snappy', index=False)
# Common compression options:
# - snappy: Fast, good compression (default)
# - gzip: Slower, better compression
# - zstd: Best balance of speed/compression
# - None: No compression (fastest writes)
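To see the tradeoff in practice, here is a minimal sketch (reusing the `df` from above; codec availability depends on your PyArrow build) that writes the same frame with each codec and compares file sizes:
import os
for codec in ['snappy', 'gzip', 'zstd', None]:
    path = f"data_{codec or 'uncompressed'}.parquet"
    df.to_parquet(path, compression=codec, index=False)
    print(f"{codec or 'none'}: {os.path.getsize(path):,} bytes")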
# Read entire file
df = pd.read_parquet('data.parquet')
# Read specific columns only (column pruning / projection pushdown)
df = pd.read_parquet('data.parquet', columns=['id', 'name', 'value'])
# Read as PyArrow Table (more memory efficient)
table = pq.read_table('data.parquet')
# Convert to pandas when needed
df = table.to_pandas()
# Filter while reading (row group filtering)
table = pq.read_table(
    'data.parquet',
    filters=[
        ('date', '>=', '2024-01-01'),
        ('status', '=', 'active')
    ]
)
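Recent PyArrow versions also accept a `pyarrow.compute` expression for `filters`, which reads more naturally for compound conditions; a sketch, assuming the `date` column is a date type:
import datetime
import pyarrow.compute as pc
table = pq.read_table(
    'data.parquet',
    filters=(pc.field('date') >= datetime.date(2024, 1, 1)) & (pc.field('status') == 'active')
)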
# Read in batches for huge files
parquet_file = pq.ParquetFile('huge.parquet')
for batch in parquet_file.iter_batches(batch_size=100_000):
    df_batch = batch.to_pandas()
    process(df_batch)  # process() is a placeholder for your own per-batch logic
# Write partitioned by columns
df.to_parquet(
    'data/',
    partition_cols=['year', 'month'],
    compression='snappy'
)
# Creates: data/year=2024/month=01/part-0.parquet
# Read partitioned dataset
df = pd.read_parquet('data/') # Reads all partitions
# Read specific partitions only
df = pd.read_parquet('data/year=2024/')
# With PyArrow dataset API (more control)
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet', partitioning='hive')
# Filter on partition columns (very fast)
table = dataset.to_table(
    filter=(ds.field('year') == 2024) & (ds.field('month') >= 6)
)
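The write side has an equivalent beyond `DataFrame.to_parquet`; a sketch using `pyarrow.parquet.write_to_dataset`, assuming `table` carries `year` and `month` columns:
pq.write_to_dataset(
    table,
    root_path='partitioned_data/',
    partition_cols=['year', 'month']
)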
# Explicit schema for consistency
schema = pa.schema([
    ('id', pa.int64()),
    ('name', pa.string()),
    ('value', pa.float64()),
    ('date', pa.date32()),
    ('tags', pa.list_(pa.string())),
    ('metadata', pa.map_(pa.string(), pa.string())),
])
# Write with schema
table = pa.Table.from_pandas(df, schema=schema)
pq.write_table(table, 'data.parquet')
# Read and validate schema
file_schema = pq.read_schema('data.parquet')
assert file_schema.equals(schema), "Schema mismatch!"
def merge_schemas(old_schema: pa.Schema, new_schema: pa.Schema) -> pa.Schema:
    """Create unified schema from old and new."""
    fields = {f.name: f for f in old_schema}
    for field in new_schema:
        if field.name not in fields:
            fields[field.name] = field
        elif fields[field.name].type != field.type:
            # Handle type conflicts (e.g., promote int to float)
            fields[field.name] = pa.field(
                field.name,
                promote_type(fields[field.name].type, field.type)
            )
    return pa.schema(list(fields.values()))
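# merge_schemas relies on a promote_type helper that is not defined here;
# a minimal, non-exhaustive sketch that widens common numeric types and
# falls back to string for anything else:
def promote_type(a: pa.DataType, b: pa.DataType) -> pa.DataType:
    widening_order = [pa.int32(), pa.int64(), pa.float32(), pa.float64()]
    if a in widening_order and b in widening_order:
        return max(a, b, key=widening_order.index)
    return pa.string()  # fallback for incompatible types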
def append_with_schema_evolution(
    existing_path: str,
    new_df: pd.DataFrame,
    output_path: str
) -> None:
    """Append data with automatic schema evolution."""
    existing = pq.read_table(existing_path)
    new_table = pa.Table.from_pandas(new_df)
    # Unify schemas (union of all fields; same-named fields must share a type)
    unified_schema = pa.unify_schemas([existing.schema, new_table.schema])

    def conform(table: pa.Table) -> pa.Table:
        # Add any missing columns as nulls, then reorder/cast to the unified
        # schema (Table.cast alone fails when the column sets differ)
        for field in unified_schema:
            if field.name not in table.schema.names:
                table = table.append_column(field, pa.nulls(len(table), type=field.type))
        return table.select(unified_schema.names).cast(unified_schema)

    # Concatenate and write
    combined = pa.concat_tables([conform(existing), conform(new_table)])
    pq.write_table(combined, output_path)
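A hypothetical usage, appending a frame that introduces a new `score` column to an existing file (paths and columns are illustrative):
new_df = pd.DataFrame({'id': [4, 5], 'name': ['d', 'e'], 'score': [0.1, 0.2]})
append_with_schema_evolution('data.parquet', new_df, 'data_evolved.parquet')
# Rows from the original file get null for 'score'; the output carries the unified schema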
# Control row group size (affects read performance)
pq.write_table(
    table,
    'data.parquet',
    row_group_size=100_000,  # Rows per group
    compression='snappy'
)
# Read metadata to see row groups
parquet_file = pq.ParquetFile('data.parquet')
print(f"Num row groups: {parquet_file.metadata.num_row_groups}")
print(f"Num rows: {parquet_file.metadata.num_rows}")
# Read specific row groups
table = parquet_file.read_row_groups([0, 1]) # First two groups
# Add custom metadata
custom_metadata = {
    b'created_by': b'etl_pipeline',
    b'version': b'1.0',
    b'source': b'api_export'
}
schema = table.schema.with_metadata(custom_metadata)
table = table.cast(schema)
pq.write_table(table, 'data.parquet')
# Read metadata
parquet_file = pq.ParquetFile('data.parquet')
print(parquet_file.schema.metadata)
# Column statistics (min/max for filtering)
metadata = parquet_file.metadata
for i in range(metadata.num_row_groups):
    rg = metadata.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        if col.statistics:
            print(f"{col.path_in_schema}: min={col.statistics.min}, max={col.statistics.max}")
from deltalake import DeltaTable, write_deltalake
# Write as Delta table (versioned parquet)
write_deltalake('delta_table/', df, mode='overwrite')
# Append data
write_deltalake('delta_table/', new_df, mode='append')
# Read Delta table
dt = DeltaTable('delta_table/')
df = dt.to_pandas()
# Time travel: load an earlier version of the table
df_old = DeltaTable('delta_table/', version=0).to_pandas()
# Compact small files
dt.optimize.compact()
# Vacuum old versions
dt.vacuum(retention_hours=168, dry_run=False)  # Keep 7 days; dry_run=False actually deletes files
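Before time traveling or vacuuming, it can help to see what versions exist; a small sketch using the table's version number and commit history:
print(dt.version())  # current version number
for commit in dt.history():  # list of commit-info dicts
    print(commit)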
# 1. Use appropriate column types
schema = pa.schema([
    ('category', pa.dictionary(pa.int8(), pa.string())),  # For repeated strings
    ('count', pa.int32()),  # Not int64 if values fit
])
# 2. Sort data before writing (improves predicate pushdown)
df = df.sort_values(['date', 'category'])
df.to_parquet('sorted.parquet')
# 3. Use column selection
df = pd.read_parquet('data.parquet', columns=['needed', 'columns'])
# 4. Use filters for row pruning
df = pd.read_parquet('data/', filters=[('status', '==', 'active')])
# 5. Parallel reads
import pyarrow.dataset as ds
dataset = ds.dataset('data/', format='parquet')
table = dataset.to_table(use_threads=True)
def csv_to_parquet(
    csv_path: str,
    parquet_path: str,
    chunk_size: int = 100_000
) -> None:
    """Convert large CSV to Parquet efficiently."""
    writer = None
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        table = pa.Table.from_pandas(chunk)
        if writer is None:
            # Open the writer lazily, using the first chunk's inferred schema
            writer = pq.ParquetWriter(parquet_path, table.schema)
        writer.write_table(table)
    if writer is not None:
        writer.close()
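Example usage (the path is illustrative). One caveat with chunked conversion: pandas infers each chunk's dtypes independently, so a column whose type drifts between chunks (e.g. integers early, nulls later) can make write_table fail on a schema mismatch; passing explicit dtype= to read_csv avoids this.
csv_to_parquet('events.csv', 'events.parquet', chunk_size=250_000)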
def json_to_parquet(json_path: str, parquet_path: str) -> None:
    """Convert JSON lines to Parquet."""
    df = pd.read_json(json_path, lines=True)
    df.to_parquet(parquet_path, compression='snappy')