Detect temporal patterns and forecast future values using ARIMA, SARIMA, Prophet, and LSTM models. Covers trend analysis, seasonality detection, anomaly detection, and forecasting methods. Use when analyzing time-based data for demand forecasting, temporal predictions, or pattern analysis.
/plugin marketplace add pluginagentmarketplace/custom-plugin-ai-data-scientist
/plugin install ai-data-scientist-plugin@pluginagentmarketplace-ai-data-scientist

This skill inherits all available tools. When active, it can use any tool Claude has access to.
assets/config.yaml
assets/schema.json
references/GUIDE.md
references/PATTERNS.md
scripts/validate.py

Analyze temporal data patterns and build accurate forecasting models.
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# Load time series data
df = pd.read_csv('sales.csv', parse_dates=['date'], index_col='date')
# Ensure proper datetime index
df.index = pd.DatetimeIndex(df.index, freq='D')
# Basic exploration
print(df.head())
print(df.describe())
# Visualize
df['value'].plot(figsize=(12, 4), title='Time Series')
plt.show()
from statsmodels.tsa.seasonal import seasonal_decompose
# Additive decomposition
decomposition = seasonal_decompose(df['value'], model='additive', period=7)
fig, axes = plt.subplots(4, 1, figsize=(12, 10))
decomposition.observed.plot(ax=axes[0], title='Original')
decomposition.trend.plot(ax=axes[1], title='Trend')
decomposition.seasonal.plot(ax=axes[2], title='Seasonality')
decomposition.resid.plot(ax=axes[3], title='Residual')
plt.tight_layout()
plt.show()
from statsmodels.tsa.stattools import adfuller, kpss
def check_stationarity(series):
    """Test for stationarity using ADF and KPSS tests."""
    # ADF test (H0: series has a unit root, i.e., non-stationary)
    adf_result = adfuller(series, autolag='AIC')
    adf_pvalue = adf_result[1]
    # KPSS test (H0: series is stationary)
    kpss_result = kpss(series, regression='c')
    kpss_pvalue = kpss_result[1]
    print("Stationarity Test Results:")
    print(f"ADF Statistic: {adf_result[0]:.4f}")
    print(f"ADF p-value: {adf_pvalue:.4f}")
    print(f"KPSS Statistic: {kpss_result[0]:.4f}")
    print(f"KPSS p-value: {kpss_pvalue:.4f}")
    if adf_pvalue < 0.05 and kpss_pvalue > 0.05:
        print("Conclusion: Series is STATIONARY")
        return True
    elif adf_pvalue > 0.05 and kpss_pvalue < 0.05:
        print("Conclusion: Series is NON-STATIONARY")
        return False
    else:
        print("Conclusion: Inconclusive - consider differencing")
        return None
is_stationary = check_stationarity(df['value'])
def make_stationary(series, max_diff=2):
    """Apply differencing until the series tests stationary."""
    diff_series = series.copy()
    d = 0
    while d < max_diff:
        if check_stationarity(diff_series.dropna()):
            break
        diff_series = diff_series.diff()
        d += 1
    return diff_series.dropna(), d
stationary_series, d_order = make_stationary(df['value'])
print(f"Differencing order: {d_order}")
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
fig, axes = plt.subplots(1, 2, figsize=(14, 4))
plot_acf(stationary_series, lags=40, ax=axes[0])
axes[0].set_title('Autocorrelation (ACF)')
plot_pacf(stationary_series, lags=40, ax=axes[1])
axes[1].set_title('Partial Autocorrelation (PACF)')
plt.tight_layout()
plt.show()
# Reading ACF/PACF:
# - PACF cuts off after lag p -> AR(p)
# - ACF cuts off after lag q -> MA(q)
# - Both decay -> ARMA(p, q)
from pmdarima import auto_arima
# Automatic order selection
auto_model = auto_arima(
    df['value'],
    start_p=0, max_p=5,
    start_q=0, max_q=5,
    d=None,               # Auto-detect differencing order
    seasonal=True,
    m=7,                  # Weekly seasonality
    start_P=0, max_P=2,
    start_Q=0, max_Q=2,
    D=None,
    trace=True,
    error_action='ignore',
    suppress_warnings=True,
    stepwise=True,
    information_criterion='aic'
)
print(auto_model.summary())
print(f"Best order: {auto_model.order}")
print(f"Seasonal order: {auto_model.seasonal_order}")
from statsmodels.tsa.statespace.sarimax import SARIMAX
# Train/test split
train = df['value'][:-30]
test = df['value'][-30:]
# Fit SARIMA model
model = SARIMAX(
    train,
    order=(1, 1, 1),              # (p, d, q)
    seasonal_order=(1, 1, 1, 7),  # (P, D, Q, m)
    enforce_stationarity=False,
    enforce_invertibility=False
)
results = model.fit(disp=False)
print(results.summary())
# Diagnostics
results.plot_diagnostics(figsize=(12, 8))
plt.tight_layout()
plt.show()
# Forecast
forecast = results.get_forecast(steps=30)
forecast_mean = forecast.predicted_mean
forecast_ci = forecast.conf_int()
# Plot
plt.figure(figsize=(12, 5))
plt.plot(train.index, train, label='Training')
plt.plot(test.index, test, label='Actual')
plt.plot(test.index, forecast_mean, label='Forecast', color='red')
plt.fill_between(
    test.index,
    forecast_ci.iloc[:, 0],
    forecast_ci.iloc[:, 1],
    alpha=0.3
)
plt.legend()
plt.title('SARIMA Forecast')
plt.show()
from prophet import Prophet
# Prepare data (Prophet requires 'ds' and 'y' columns)
prophet_df = df.reset_index()
prophet_df.columns = ['ds', 'y']
# Initialize and fit
model = Prophet(
    yearly_seasonality=True,
    weekly_seasonality=True,
    daily_seasonality=False,
    seasonality_mode='multiplicative',
    changepoint_prior_scale=0.05,
    seasonality_prior_scale=10
)
# Add custom seasonality
model.add_seasonality(name='monthly', period=30.5, fourier_order=5)
# Add holidays (optional)
model.add_country_holidays(country_name='US')
model.fit(prophet_df)
# Create future dataframe
future = model.make_future_dataframe(periods=30)
# Predict
forecast = model.predict(future)
# Visualize
fig1 = model.plot(forecast)
plt.title('Prophet Forecast')
plt.show()
# Components
fig2 = model.plot_components(forecast)
plt.show()
# Cross-validation
from prophet.diagnostics import cross_validation, performance_metrics
df_cv = cross_validation(
    model,
    initial='365 days',
    period='30 days',
    horizon='30 days'
)
df_metrics = performance_metrics(df_cv)
print(df_metrics[['horizon', 'mape', 'rmse', 'mae']])
from statsmodels.tsa.holtwinters import ExponentialSmoothing
# Simple Exponential Smoothing
from statsmodels.tsa.api import SimpleExpSmoothing
# Note: parameters passed to fit() are held fixed;
# optimized=True only estimates whatever is left free
ses_model = SimpleExpSmoothing(train).fit(
    smoothing_level=0.3,
    optimized=True
)
# Holt-Winters (Triple Exponential Smoothing)
hw_model = ExponentialSmoothing(
    train,
    trend='add',        # 'add', 'mul', or None
    seasonal='add',     # 'add', 'mul', or None
    seasonal_periods=7
).fit(
    smoothing_level=0.3,
    smoothing_trend=0.1,
    smoothing_seasonal=0.1,
    optimized=True
)
# Forecast
hw_forecast = hw_model.forecast(steps=30)
# Compare
plt.figure(figsize=(12, 5))
plt.plot(train, label='Training')
plt.plot(test, label='Test')
plt.plot(test.index, hw_forecast, label='Holt-Winters', color='red')
plt.legend()
plt.show()
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
class LSTMForecaster(nn.Module):
    """LSTM for time series forecasting."""
    def __init__(self, input_size=1, hidden_size=64, num_layers=2, output_size=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=0.2
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Initial hidden/cell states on the same device as the input
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size, device=x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size, device=x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])  # predict from the last time step
        return out
def create_sequences(data, seq_length):
    """Create sliding-window sequences for LSTM training."""
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i+seq_length])
        y.append(data[i+seq_length])
    return np.array(X), np.array(y)
# Prepare data
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(df['value'].values.reshape(-1, 1))
seq_length = 30
X, y = create_sequences(scaled_data, seq_length)
# Convert to tensors
X_tensor = torch.FloatTensor(X)
y_tensor = torch.FloatTensor(y)
# Train/test split
train_size = int(0.8 * len(X))
X_train, X_test = X_tensor[:train_size], X_tensor[train_size:]
y_train, y_test = y_tensor[:train_size], y_tensor[train_size:]
# Training
model = LSTMForecaster()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    output = model(X_train)
    loss = criterion(output, y_train)
    loss.backward()
    optimizer.step()
    if epoch % 10 == 0:
        print(f'Epoch {epoch}, Loss: {loss.item():.4f}')
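After training, a brief evaluation sketch: run the held-out split through the model and map predictions back to the original scale with the fitted scaler.
model.eval()
with torch.no_grad():
    test_pred = model(X_test)
# Undo the MinMax scaling to report error in original units
test_pred_orig = scaler.inverse_transform(test_pred.numpy())
y_test_orig = scaler.inverse_transform(y_test.numpy())
rmse_lstm = np.sqrt(np.mean((test_pred_orig - y_test_orig) ** 2))
print(f'LSTM test RMSE: {rmse_lstm:.4f}')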
class TimeSeriesTransformer(nn.Module):
    """Transformer encoder for time series forecasting."""
    def __init__(self, input_size=1, d_model=64, nhead=4,
                 num_layers=2, output_size=1, seq_length=30):
        super().__init__()
        self.embedding = nn.Linear(input_size, d_model)
        # Learned positional encoding (one vector per time step)
        self.pos_encoding = nn.Parameter(torch.randn(1, seq_length, d_model))
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=256,
            dropout=0.1,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.fc = nn.Linear(d_model, output_size)

    def forward(self, x):
        x = self.embedding(x) + self.pos_encoding
        x = self.transformer(x)
        x = self.fc(x[:, -1, :])  # predict from the final position
        return x
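A quick usage sketch: the transformer consumes the same (batch, seq_length, features) tensors prepared for the LSTM above, so it can be trained with the identical loop.
# Shape check with a small batch from the data prepared earlier
tf_model = TimeSeriesTransformer(seq_length=seq_length)
with torch.no_grad():
    print(tf_model(X_train[:4]).shape)  # torch.Size([4, 1])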
from sklearn.ensemble import IsolationForest
from scipy import stats
def detect_anomalies(series, method='zscore', threshold=3):
    """Detect anomalies using various methods."""
    if method == 'zscore':
        z_scores = np.abs(stats.zscore(series))
        anomalies = z_scores > threshold
    elif method == 'iqr':
        Q1 = series.quantile(0.25)
        Q3 = series.quantile(0.75)
        IQR = Q3 - Q1
        anomalies = (series < Q1 - 1.5 * IQR) | (series > Q3 + 1.5 * IQR)
    elif method == 'isolation_forest':
        model = IsolationForest(contamination=0.05, random_state=42)
        predictions = model.fit_predict(series.values.reshape(-1, 1))
        anomalies = predictions == -1
    elif method == 'rolling':
        rolling_mean = series.rolling(window=7).mean()
        rolling_std = series.rolling(window=7).std()
        upper = rolling_mean + threshold * rolling_std
        lower = rolling_mean - threshold * rolling_std
        anomalies = (series > upper) | (series < lower)
    else:
        raise ValueError(f"Unknown method: {method}")
    return anomalies
# Detect and visualize
anomalies = detect_anomalies(df['value'], method='zscore')
plt.figure(figsize=(12, 5))
plt.plot(df.index, df['value'], label='Value')
plt.scatter(df.index[anomalies], df['value'][anomalies],
            color='red', label='Anomalies')
plt.legend()
plt.title('Anomaly Detection')
plt.show()
def create_time_features(df):
    """Create time-based features from a DatetimeIndex."""
    df = df.copy()
    # Calendar features
    df['hour'] = df.index.hour
    df['dayofweek'] = df.index.dayofweek
    df['dayofmonth'] = df.index.day
    df['month'] = df.index.month
    df['quarter'] = df.index.quarter
    df['year'] = df.index.year
    df['dayofyear'] = df.index.dayofyear
    df['weekofyear'] = df.index.isocalendar().week.astype(int)
    # Cyclical encoding (for periodic features)
    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
    df['dayofweek_sin'] = np.sin(2 * np.pi * df['dayofweek'] / 7)
    df['dayofweek_cos'] = np.cos(2 * np.pi * df['dayofweek'] / 7)
    # Boolean features
    df['is_weekend'] = df['dayofweek'].isin([5, 6]).astype(int)
    df['is_month_start'] = df.index.is_month_start.astype(int)
    df['is_month_end'] = df.index.is_month_end.astype(int)
    return df
def create_lag_features(df, target_col, lags):
    """Create lag features."""
    df = df.copy()
    for lag in lags:
        df[f'{target_col}_lag_{lag}'] = df[target_col].shift(lag)
    return df
def create_rolling_features(df, target_col, windows):
    """Create rolling window features."""
    df = df.copy()
    for window in windows:
        df[f'{target_col}_rolling_mean_{window}'] = df[target_col].rolling(window).mean()
        df[f'{target_col}_rolling_std_{window}'] = df[target_col].rolling(window).std()
        df[f'{target_col}_rolling_min_{window}'] = df[target_col].rolling(window).min()
        df[f'{target_col}_rolling_max_{window}'] = df[target_col].rolling(window).max()
    return df
# Apply feature engineering
df_features = create_time_features(df)
df_features = create_lag_features(df_features, 'value', [1, 7, 14, 30])
df_features = create_rolling_features(df_features, 'value', [7, 14, 30])
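One way to use these features is with a standard tabular regressor; a minimal sketch (the RandomForestRegressor choice is illustrative, and the chronological split matters more than the model):
from sklearn.ensemble import RandomForestRegressor

ml_df = df_features.dropna()  # lag/rolling features leave leading NaNs
feature_cols = [c for c in ml_df.columns if c != 'value']
split = int(0.8 * len(ml_df))  # chronological split - never shuffle time series
rf = RandomForestRegressor(n_estimators=200, random_state=42)
rf.fit(ml_df[feature_cols].iloc[:split], ml_df['value'].iloc[:split])
print(rf.score(ml_df[feature_cols].iloc[split:], ml_df['value'].iloc[split:]))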
from sklearn.metrics import mean_absolute_error, mean_squared_error
def evaluate_forecast(actual, predicted):
    """Calculate forecasting metrics."""
    mae = mean_absolute_error(actual, predicted)
    mse = mean_squared_error(actual, predicted)
    rmse = np.sqrt(mse)
    mape = np.mean(np.abs((actual - predicted) / actual)) * 100
    # Symmetric MAPE (handles zeros in the actuals better)
    smape = np.mean(2 * np.abs(actual - predicted) /
                    (np.abs(actual) + np.abs(predicted))) * 100
    print(f"MAE: {mae:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAPE: {mape:.2f}%")
    print(f"sMAPE: {smape:.2f}%")
    return {'mae': mae, 'rmse': rmse, 'mape': mape, 'smape': smape}
metrics = evaluate_forecast(test, forecast_mean)
Issue: Non-stationary series
Solutions (first two sketched below):
- Apply differencing (1st or 2nd order)
- Log transformation for variance stabilization
- Detrending (subtract moving average)
- Seasonal differencing for seasonal data
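A minimal sketch of the first two remedies, assuming strictly non-negative values for the log transform:
log_series = np.log1p(df['value'])           # stabilize variance
diff_series = log_series.diff().dropna()     # 1st-order differencing
seasonal_diff = log_series.diff(7).dropna()  # seasonal differencing (weekly)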
Issue: Overfitting on training data
Solutions:
- Use proper cross-validation (TimeSeriesSplit, sketched below)
- Reduce model complexity
- Regularization
- More training data
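Sketch of time-aware cross-validation: each fold trains on the past and validates on the block that follows, never the reverse.
from sklearn.model_selection import TimeSeriesSplit

tscv = TimeSeriesSplit(n_splits=5)
for fold, (train_idx, val_idx) in enumerate(tscv.split(df['value'])):
    print(f'Fold {fold}: train={len(train_idx)}, validate={len(val_idx)}')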
Issue: Poor forecast accuracy
Solutions:
- Check for data quality issues
- Try different models
- Include external regressors
- Ensemble multiple models (sketched below)
- Feature engineering
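A minimal ensemble sketch, blending the SARIMA and Holt-Winters forecasts produced earlier with equal weights:
ensemble_forecast = (np.asarray(forecast_mean) + np.asarray(hw_forecast)) / 2
evaluate_forecast(test, ensemble_forecast)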