Monitors whale transactions across multiple blockchains with real-time market impact analysis and multi-channel alerting.
/plugin marketplace add jeremylongshore/claude-code-plugins-plus
/plugin install whale-alert-monitor@claude-code-plugins-plus
Real-time monitoring system for tracking large cryptocurrency transactions ("whales") across multiple blockchains. Detects significant wallet movements, analyzes market impact, and provides automated alerts through multiple channels (Slack, Discord, Telegram, email).
Supports: Ethereum, Bitcoin, Binance Smart Chain, Polygon, Arbitrum, Optimism, Avalanche, Solana
Use /monitor-whales when you need to:
DON'T use this command for:
Use /analyze-chain instead
Use /track-wallet instead
Use /generate-signal instead
Use /calculate-tax instead
Why Multiple Blockchain Support?
Whales operate across multiple chains. Cross-chain tracking provides a complete picture of institutional movements and arbitrage activity.
Why Real-Time vs Batch Processing?
Why Threshold-Based Filtering?
Why Multi-Channel Alerting?
Why Market Impact Scoring?
Before running this command, ensure you have:
Blockchain Node Access (choose one per chain):
Price Data API:
Alert Delivery Services (at least one):
Database (for historical tracking):
Infrastructure:
Create config/chains.json:
{
"ethereum": {
"rpc_url": "https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
"websocket_url": "wss://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
"whale_threshold_usd": 1000000,
"native_symbol": "ETH",
"block_time": 12
},
"bsc": {
"rpc_url": "https://bsc-dataseed.binance.org/",
"websocket_url": "wss://bsc-ws-node.nariox.org:443",
"whale_threshold_usd": 500000,
"native_symbol": "BNB",
"block_time": 3
},
"solana": {
"rpc_url": "https://solana-mainnet.g.alchemy.com/v2/YOUR_KEY",
"websocket_url": "wss://solana-mainnet.g.alchemy.com/v2/YOUR_KEY",
"whale_threshold_usd": 250000,
"native_symbol": "SOL",
"block_time": 0.4
}
}
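A minimal sketch of how this file could be validated at startup, assuming the keys shown above are all required (validate_chain_config is an illustrative helper, not part of the plugin):
import json

REQUIRED_KEYS = {"rpc_url", "websocket_url", "whale_threshold_usd", "native_symbol", "block_time"}

def validate_chain_config(path: str = "config/chains.json") -> dict:
    """Load chains.json and fail fast if any chain entry is missing a required key."""
    with open(path) as f:
        chains = json.load(f)
    for chain, cfg in chains.items():
        missing = REQUIRED_KEYS - set(cfg)
        if missing:
            raise ValueError(f"{chain}: missing keys {sorted(missing)}")
    return chains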
Create config/alerts.json:
{
"channels": {
"slack": {
"enabled": true,
"webhook_url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
"channel": "#whale-alerts",
"min_severity": 50
},
"discord": {
"enabled": true,
"webhook_url": "https://discord.com/api/webhooks/YOUR_ID/YOUR_TOKEN",
"min_severity": 60
},
"telegram": {
"enabled": true,
"bot_token": "YOUR_BOT_TOKEN",
"chat_id": "YOUR_CHAT_ID",
"min_severity": 70
},
"email": {
"enabled": false,
"sendgrid_api_key": "YOUR_KEY",
"from_email": "alerts@yourdomain.com",
"to_emails": ["trader@yourdomain.com"],
"min_severity": 80
}
},
"rate_limiting": {
"max_alerts_per_hour": 50,
"cooldown_same_wallet": 300
}
}
-- PostgreSQL schema for whale tracking
CREATE TABLE whale_transactions (
id SERIAL PRIMARY KEY,
tx_hash VARCHAR(66) UNIQUE NOT NULL,
chain VARCHAR(20) NOT NULL,
block_number BIGINT NOT NULL,
timestamp TIMESTAMP NOT NULL,
from_address VARCHAR(42) NOT NULL,
to_address VARCHAR(42) NOT NULL,
value_native NUMERIC(36, 18) NOT NULL,
value_usd NUMERIC(18, 2) NOT NULL,
token_symbol VARCHAR(20),
token_address VARCHAR(42),
transaction_type VARCHAR(20),
severity_score INTEGER,
market_impact_score INTEGER,
is_exchange_related BOOLEAN DEFAULT FALSE,
exchange_name VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- PostgreSQL has no inline INDEX clause inside CREATE TABLE; create indexes separately
CREATE INDEX idx_timestamp ON whale_transactions (timestamp DESC);
CREATE INDEX idx_chain ON whale_transactions (chain);
CREATE INDEX idx_from_address ON whale_transactions (from_address);
CREATE INDEX idx_to_address ON whale_transactions (to_address);
CREATE INDEX idx_severity ON whale_transactions (severity_score DESC);
CREATE TABLE whale_wallets (
id SERIAL PRIMARY KEY,
address VARCHAR(42) UNIQUE NOT NULL,
chain VARCHAR(20) NOT NULL,
label VARCHAR(100),
wallet_type VARCHAR(50),
first_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_transaction TIMESTAMP,
total_transactions INTEGER DEFAULT 0,
total_volume_usd NUMERIC(18, 2) DEFAULT 0,
is_monitored BOOLEAN DEFAULT TRUE
);
CREATE TABLE exchange_wallets (
id SERIAL PRIMARY KEY,
address VARCHAR(42) NOT NULL,
chain VARCHAR(20) NOT NULL,
exchange_name VARCHAR(50) NOT NULL,
wallet_type VARCHAR(20),
is_active BOOLEAN DEFAULT TRUE, -- referenced by the wallet loader's WHERE clause
added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(address, chain)
);
-- TimescaleDB hypertable for time-series optimization
SELECT create_hypertable('whale_transactions', 'timestamp');
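If TimescaleDB is used, retention and compression policies matching the YAML configuration further below (90-day retention, compression after 7 days) can be applied. A hedged sketch, assuming TimescaleDB 2.x and psycopg2; the connection parameters are placeholders. Note that converting whale_transactions to a hypertable requires unique constraints (such as tx_hash) to include the timestamp column, so the schema above may need adjustment before this runs.
import psycopg2

# Assumed connection parameters; adjust to your environment.
conn = psycopg2.connect(dbname="whales", user="monitor", password="secret", host="localhost")
with conn.cursor() as cur:
    # Enable compression on the hypertable, segmenting by chain for better ratios.
    cur.execute("ALTER TABLE whale_transactions SET (timescaledb.compress, timescaledb.compress_segmentby = 'chain')")
    # Compress chunks older than 7 days and drop data older than 90 days.
    cur.execute("SELECT add_compression_policy('whale_transactions', INTERVAL '7 days')")
    cur.execute("SELECT add_retention_policy('whale_transactions', INTERVAL '90 days')")
conn.commit()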
Execute the monitoring script:
# Start real-time monitoring (all chains)
python3 whale_monitor.py --chains ethereum,bsc,solana \
--threshold 1000000 \
--min-severity 50 \
--alerts slack,discord,telegram
# Monitor specific wallets
python3 whale_monitor.py --watch-addresses wallets.txt \
--threshold 100000
# Export alerts to webhook
python3 whale_monitor.py --webhook-url https://your-api.com/webhook \
--format json
Create a Grafana dashboard with Prometheus metrics:
# prometheus.yml
scrape_configs:
- job_name: 'whale_monitor'
static_configs:
- targets: ['localhost:9090']
metrics_path: '/metrics'
scrape_interval: 10s
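The /metrics endpoint referenced above is assumed to be served by the monitor process itself; a minimal sketch using the prometheus_client library (the metric names are illustrative, not a fixed contract):
from prometheus_client import Counter, Gauge, start_http_server

# Counters and gauges the scrape config above would collect.
WHALE_TX_TOTAL = Counter("whale_transactions_total", "Whale transactions detected", ["chain"])
ALERTS_SENT_TOTAL = Counter("whale_alerts_sent_total", "Alerts delivered", ["channel"])
LAST_BLOCK_SEEN = Gauge("whale_monitor_last_block", "Last processed block number", ["chain"])

start_http_server(9090)  # Exposes /metrics on port 9090

# Inside the processing loop, for example:
# WHALE_TX_TOTAL.labels(chain="ethereum").inc()
# LAST_BLOCK_SEEN.labels(chain="ethereum").set(block_number)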
The command generates 5 output files:
whale_alerts_YYYYMMDD_HHMMSS.json
Real-time transaction alerts:
{
"alert_id": "alert_1634567890_eth_0xabc123",
"timestamp": "2025-10-11T14:23:45Z",
"chain": "ethereum",
"transaction": {
"hash": "0xabc123...",
"block_number": 18234567,
"from_address": "0x1234...5678",
"from_label": "Unknown Whale #42",
"to_address": "0xabcd...ef00",
"to_label": "Binance Hot Wallet 8",
"value_native": "5000.0",
"value_usd": 9250000.00,
"token": {
"symbol": "ETH",
"address": null,
"decimals": 18
}
},
"classification": {
"transaction_type": "exchange_deposit",
"is_exchange_related": true,
"exchange_name": "Binance",
"direction": "to_exchange"
},
"analysis": {
"severity_score": 85,
"market_impact_score": 72,
"impact_category": "HIGH",
"reasoning": "Large ETH deposit to Binance suggests potential sell pressure. Volume represents 0.003% of daily ETH trading volume.",
"historical_context": "This whale has deposited to exchanges 3 times in past 30 days, all followed by 2-5% price drops within 24h."
},
"market_context": {
"current_price_usd": 1850.00,
"24h_volume_usd": 15000000000,
"transaction_vs_volume_pct": 0.062
}
}
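Downstream consumers can filter these alerts on the fields shown above. A small sketch, assuming the alert has been read from an exported JSON file; the field names follow the example, the filename and thresholds are illustrative:
import json

def is_actionable(alert: dict, min_severity: int = 70) -> bool:
    """Flag high-severity deposits to exchanges as potential sell-pressure events."""
    analysis = alert["analysis"]
    classification = alert["classification"]
    return (
        analysis["severity_score"] >= min_severity
        and classification["is_exchange_related"]
        and classification["direction"] == "to_exchange"
    )

with open("whale_alerts_20251011_142345.json") as f:
    alert = json.load(f)
if is_actionable(alert):
    print(alert["alert_id"], alert["transaction"]["value_usd"])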
whale_summary_YYYYMMDD.csv
Daily summary statistics:
date,chain,total_transactions,total_volume_usd,avg_transaction_usd,max_transaction_usd,unique_wallets,exchange_deposits_usd,exchange_withdrawals_usd,net_exchange_flow_usd
2025-10-11,ethereum,127,456789000.00,3598315.75,25000000.00,89,234500000.00,187600000.00,46900000.00
2025-10-11,bsc,213,89234000.00,419029.58,8500000.00,156,45600000.00,38900000.00,6700000.00
2025-10-11,solana,89,34567000.00,388382.02,5200000.00,67,18900000.00,14200000.00,4700000.00
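A short sketch of how the daily summary could be aggregated with pandas to reproduce the net exchange flow column (column names follow the CSV header above; the filename is illustrative):
import pandas as pd

df = pd.read_csv("whale_summary_20251011.csv")
# Net flow equals deposits minus withdrawals for each chain.
df["computed_net_flow_usd"] = df["exchange_deposits_usd"] - df["exchange_withdrawals_usd"]
print(df[["chain", "total_volume_usd", "computed_net_flow_usd"]].sort_values("computed_net_flow_usd", ascending=False))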
exchange_flows_YYYYMMDD.json
Net exchange flows (critical market indicator):
{
"date": "2025-10-11",
"chains": {
"ethereum": {
"exchanges": {
"binance": {
"deposits": {"count": 34, "volume_usd": 89234000.00},
"withdrawals": {"count": 28, "volume_usd": 67800000.00},
"net_flow_usd": 21434000.00,
"sentiment": "bearish"
},
"coinbase": {
"deposits": {"count": 23, "volume_usd": 45600000.00},
"withdrawals": {"count": 31, "volume_usd": 56700000.00},
"net_flow_usd": -11100000.00,
"sentiment": "bullish"
}
},
"total_net_flow_usd": 46900000.00,
"sentiment": "bearish"
}
}
}
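The sentiment labels above follow a simple convention: net inflow to exchanges (deposits exceeding withdrawals) is read as bearish, net outflow as bullish. A minimal sketch of that derivation (classify_flow is an illustrative helper):
def classify_flow(deposits_usd: float, withdrawals_usd: float) -> dict:
    """Derive net exchange flow and a coarse sentiment label."""
    net_flow = deposits_usd - withdrawals_usd
    return {
        "net_flow_usd": net_flow,
        # Coins moving onto exchanges are typically positioned for sale.
        "sentiment": "bearish" if net_flow > 0 else "bullish",
    }

print(classify_flow(89234000.00, 67800000.00))   # Binance example above: bearish
print(classify_flow(45600000.00, 56700000.00))   # Coinbase example above: bullish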
whale_wallets_discovered.json
Newly identified whale wallets:
{
"discovered_at": "2025-10-11T14:23:45Z",
"new_whales": [
{
"address": "0x1234...5678",
"chain": "ethereum",
"first_seen_tx": "0xabc123...",
"initial_balance_usd": 12500000.00,
"transaction_count_24h": 5,
"classification": "private_whale",
"confidence": 0.87
}
]
}
metrics_dashboard.html
Interactive HTML dashboard with charts and real-time updates.
#!/usr/bin/env python3
"""
Production-grade whale transaction monitor with real-time alerting.
Supports Ethereum, BSC, Polygon, Solana with multi-chain coordination.
"""
import asyncio
import json
import logging
import time
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, asdict
import aiohttp
import psycopg2
from psycopg2.extras import execute_batch
from web3 import Web3
from web3.providers.websocket import WebsocketProvider
import requests
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class WhaleTransaction:
"""Represents a whale transaction with all metadata."""
tx_hash: str
chain: str
block_number: int
timestamp: datetime
from_address: str
to_address: str
value_native: Decimal
value_usd: Decimal
token_symbol: Optional[str] = None
token_address: Optional[str] = None
transaction_type: Optional[str] = None
severity_score: int = 0
market_impact_score: int = 0
is_exchange_related: bool = False
exchange_name: Optional[str] = None
def to_dict(self) -> Dict:
"""Convert to dictionary for JSON serialization."""
d = asdict(self)
d['timestamp'] = self.timestamp.isoformat()
d['value_native'] = str(self.value_native)
d['value_usd'] = float(self.value_usd)
return d
class PriceOracle:
"""Fetch real-time cryptocurrency prices."""
def __init__(self, api_key: str):
self.api_key = api_key
self.cache: Dict[str, Dict] = {}
self.cache_ttl = 30 # seconds
async def get_price_usd(self, symbol: str, chain: str) -> float:
"""Get current USD price with caching."""
cache_key = f"{chain}:{symbol}"
if cache_key in self.cache:
cached = self.cache[cache_key]
if time.time() - cached['timestamp'] < self.cache_ttl:
return cached['price']
# Fetch from CoinGecko API
async with aiohttp.ClientSession() as session:
url = f"https://api.coingecko.com/api/v3/simple/price"
params = {
'ids': self._get_coingecko_id(symbol, chain),
'vs_currencies': 'usd'
}
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
coin_id = self._get_coingecko_id(symbol, chain)
price = data[coin_id]['usd']
self.cache[cache_key] = {
'price': price,
'timestamp': time.time()
}
return price
else:
logger.error(f"Failed to fetch price for {symbol}: {response.status}")
return 0.0
def _get_coingecko_id(self, symbol: str, chain: str) -> str:
"""Map symbol and chain to CoinGecko ID."""
mapping = {
'ethereum:ETH': 'ethereum',
'bsc:BNB': 'binancecoin',
'polygon:MATIC': 'matic-network',
'solana:SOL': 'solana',
}
return mapping.get(f"{chain}:{symbol}", symbol.lower())
class ExchangeWalletDatabase:
"""Known exchange wallet addresses database."""
def __init__(self, db_config: Dict):
self.conn = psycopg2.connect(**db_config)
self.exchange_wallets: Dict[str, Dict] = {}
self._load_exchange_wallets()
def _load_exchange_wallets(self):
"""Load known exchange wallets from database."""
with self.conn.cursor() as cur:
cur.execute("""
SELECT address, chain, exchange_name, wallet_type
FROM exchange_wallets
WHERE is_active = TRUE
""")
for row in cur.fetchall():
address, chain, exchange_name, wallet_type = row
key = f"{chain}:{address.lower()}"
self.exchange_wallets[key] = {
'exchange_name': exchange_name,
'wallet_type': wallet_type
}
logger.info(f"Loaded {len(self.exchange_wallets)} exchange wallets")
def identify_exchange(self, address: str, chain: str) -> Optional[Dict]:
"""Check if address belongs to a known exchange."""
key = f"{chain}:{address.lower()}"
return self.exchange_wallets.get(key)
class MarketImpactAnalyzer:
"""Analyze potential market impact of whale transactions."""
def __init__(self, price_oracle: PriceOracle):
self.price_oracle = price_oracle
self.historical_data: Dict[str, List] = {}
def calculate_severity_score(self, transaction: WhaleTransaction,
market_data: Dict) -> int:
"""
Calculate severity score (0-100) based on multiple factors.
Factors:
- Transaction size vs 24h volume
- Exchange direction (deposit = bearish, withdrawal = bullish)
- Historical whale behavior patterns
- Current market volatility
- Time of day (US trading hours = higher impact)
"""
score = 0
# Factor 1: Size vs volume (max 40 points)
daily_volume = market_data.get('volume_24h_usd', 0)
if daily_volume > 0:
volume_pct = (float(transaction.value_usd) / daily_volume) * 100
if volume_pct > 1.0:
score += 40
elif volume_pct > 0.5:
score += 30
elif volume_pct > 0.1:
score += 20
else:
score += int(volume_pct * 100)
# Factor 2: Exchange direction (max 30 points)
if transaction.is_exchange_related:
if transaction.transaction_type == 'exchange_deposit':
score += 30 # Bearish signal
elif transaction.transaction_type == 'exchange_withdrawal':
score += 15 # Bullish signal (less immediate impact)
# Factor 3: Absolute transaction size (max 20 points)
if transaction.value_usd > 50000000: # $50M+
score += 20
elif transaction.value_usd > 10000000: # $10M+
score += 15
elif transaction.value_usd > 5000000: # $5M+
score += 10
else:
score += 5
# Factor 4: Market conditions (max 10 points)
current_hour = datetime.utcnow().hour
if 13 <= current_hour <= 21: # US trading hours
score += 10
else:
score += 5
return min(score, 100)
def calculate_market_impact_score(self, transaction: WhaleTransaction,
market_data: Dict) -> int:
"""
Estimate actual market impact (0-100).
Takes into account:
- Liquidity depth
- Order book analysis
- Recent price volatility
- Historical correlation
"""
impact = 0
# Simplified model - production would use order book depth
daily_volume = market_data.get('volume_24h_usd', 0)
if daily_volume > 0:
volume_ratio = float(transaction.value_usd) / daily_volume
# Exponential impact model
if volume_ratio > 0.01: # 1% of daily volume
impact = int(min(volume_ratio * 5000, 100))
else:
impact = int(volume_ratio * 1000)
return min(impact, 100)
class AlertManager:
"""Handle multi-channel alerting."""
def __init__(self, config: Dict):
self.config = config
self.alert_count: Dict[str, int] = {}
self.cooldown_tracker: Dict[str, float] = {}
async def send_alert(self, transaction: WhaleTransaction,
analysis: Dict) -> None:
"""Send alert through configured channels."""
severity = analysis['severity_score']
# Rate limiting check
if not self._check_rate_limit(transaction):
logger.debug(f"Alert rate limited for {transaction.tx_hash}")
return
# Send to each channel based on min_severity
tasks = []
for channel, channel_config in self.config['channels'].items():
if not channel_config['enabled']:
continue
if severity >= channel_config['min_severity']:
if channel == 'slack':
tasks.append(self._send_slack(transaction, analysis, channel_config))
elif channel == 'discord':
tasks.append(self._send_discord(transaction, analysis, channel_config))
elif channel == 'telegram':
tasks.append(self._send_telegram(transaction, analysis, channel_config))
elif channel == 'email':
tasks.append(self._send_email(transaction, analysis, channel_config))
await asyncio.gather(*tasks, return_exceptions=True)
def _check_rate_limit(self, transaction: WhaleTransaction) -> bool:
"""Check if alert should be rate limited."""
hour_key = datetime.utcnow().strftime('%Y%m%d%H')
# Global rate limit
if self.alert_count.get(hour_key, 0) >= self.config['rate_limiting']['max_alerts_per_hour']:
return False
# Per-wallet cooldown
wallet_key = f"{transaction.chain}:{transaction.from_address}"
last_alert = self.cooldown_tracker.get(wallet_key, 0)
if time.time() - last_alert < self.config['rate_limiting']['cooldown_same_wallet']:
return False
self.alert_count[hour_key] = self.alert_count.get(hour_key, 0) + 1
self.cooldown_tracker[wallet_key] = time.time()
return True
async def _send_slack(self, transaction: WhaleTransaction,
analysis: Dict, config: Dict) -> None:
"""Send Slack webhook notification."""
webhook_url = config['webhook_url']
# Format message
color = self._get_severity_color(analysis['severity_score'])
direction_emoji = "🔴" if transaction.transaction_type == 'exchange_deposit' else "🟢"
payload = {
"channel": config['channel'],
"attachments": [{
"color": color,
"title": f"{direction_emoji} Whale Alert: ${transaction.value_usd:,.0f} {transaction.token_symbol}",
"title_link": self._get_explorer_url(transaction),
"fields": [
{
"title": "Chain",
"value": transaction.chain.upper(),
"short": True
},
{
"title": "Severity",
"value": f"{analysis['severity_score']}/100",
"short": True
},
{
"title": "From",
"value": f"`{transaction.from_address[:10]}...{transaction.from_address[-8:]}`",
"short": False
},
{
"title": "To",
"value": f"{analysis.get('to_label', 'Unknown')} (`{transaction.to_address[:10]}...`)",
"short": False
},
{
"title": "Analysis",
"value": analysis.get('reasoning', 'N/A'),
"short": False
}
],
"footer": "Whale Monitor",
"ts": int(transaction.timestamp.timestamp())
}]
}
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=payload) as response:
if response.status != 200:
logger.error(f"Slack alert failed: {response.status}")
async def _send_discord(self, transaction: WhaleTransaction,
analysis: Dict, config: Dict) -> None:
"""Send Discord webhook notification."""
webhook_url = config['webhook_url']
color = self._get_severity_color_int(analysis['severity_score'])
payload = {
"embeds": [{
"title": f"🐋 Whale Alert: ${transaction.value_usd:,.0f} {transaction.token_symbol}",
"url": self._get_explorer_url(transaction),
"color": color,
"fields": [
{"name": "Chain", "value": transaction.chain.upper(), "inline": True},
{"name": "Severity", "value": f"{analysis['severity_score']}/100", "inline": True},
{"name": "Type", "value": transaction.transaction_type or "Unknown", "inline": True},
{"name": "From", "value": f"`{transaction.from_address}`", "inline": False},
{"name": "To", "value": f"`{transaction.to_address}`", "inline": False},
{"name": "Analysis", "value": analysis.get('reasoning', 'N/A'), "inline": False}
],
"timestamp": transaction.timestamp.isoformat()
}]
}
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=payload) as response:
if response.status != 204:
logger.error(f"Discord alert failed: {response.status}")
async def _send_telegram(self, transaction: WhaleTransaction,
analysis: Dict, config: Dict) -> None:
"""Send Telegram bot message."""
bot_token = config['bot_token']
chat_id = config['chat_id']
message = f"""
🐋 *Whale Alert*
Amount: ${transaction.value_usd:,.0f} {transaction.token_symbol}
Chain: {transaction.chain.upper()}
Severity: {analysis['severity_score']}/100
From: `{transaction.from_address}`
To: `{transaction.to_address}`
Type: {transaction.transaction_type or 'Unknown'}
{analysis.get('reasoning', '')}
[View Transaction]({self._get_explorer_url(transaction)})
"""
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
payload = {
"chat_id": chat_id,
"text": message,
"parse_mode": "Markdown"
}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as response:
if response.status != 200:
logger.error(f"Telegram alert failed: {response.status}")
async def _send_email(self, transaction: WhaleTransaction,
analysis: Dict, config: Dict) -> None:
"""Send email via SendGrid."""
# Implementation similar to other channels
pass
def _get_severity_color(self, severity: int) -> str:
"""Get Slack color based on severity."""
if severity >= 80:
return "danger"
elif severity >= 60:
return "warning"
else:
return "good"
def _get_severity_color_int(self, severity: int) -> int:
"""Get Discord color integer based on severity."""
if severity >= 80:
return 0xFF0000 # Red
elif severity >= 60:
return 0xFFA500 # Orange
else:
return 0x00FF00 # Green
def _get_explorer_url(self, transaction: WhaleTransaction) -> str:
"""Get block explorer URL for transaction."""
explorers = {
'ethereum': f"https://etherscan.io/tx/{transaction.tx_hash}",
'bsc': f"https://bscscan.com/tx/{transaction.tx_hash}",
'polygon': f"https://polygonscan.com/tx/{transaction.tx_hash}",
'solana': f"https://solscan.io/tx/{transaction.tx_hash}"
}
return explorers.get(transaction.chain, "#")
class WhaleMonitor:
"""Main whale monitoring orchestrator."""
def __init__(self, config_path: str):
with open(config_path) as f:
self.config = json.load(f)
self.price_oracle = PriceOracle(self.config['api_keys']['coingecko'])
self.exchange_db = ExchangeWalletDatabase(self.config['database'])
self.impact_analyzer = MarketImpactAnalyzer(self.price_oracle)
self.alert_manager = AlertManager(self.config['alerts'])
self.web3_connections: Dict[str, Web3] = {}
self.monitored_addresses: Set[str] = set()
async def start_monitoring(self, chains: List[str]) -> None:
"""Start monitoring multiple chains concurrently."""
logger.info(f"Starting whale monitor for chains: {chains}")
tasks = []
for chain in chains:
tasks.append(self._monitor_chain(chain))
await asyncio.gather(*tasks)
async def _monitor_chain(self, chain: str) -> None:
"""Monitor single blockchain for whale transactions."""
chain_config = self.config['chains'][chain]
# Connect to WebSocket
w3 = Web3(WebsocketProvider(chain_config['websocket_url']))
self.web3_connections[chain] = w3
logger.info(f"Connected to {chain}, monitoring blocks...")
# Subscribe to new blocks
block_filter = w3.eth.filter('latest')
while True:
try:
for block_hash in block_filter.get_new_entries():
block = w3.eth.get_block(block_hash, full_transactions=True)
await self._process_block(chain, block)
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error monitoring {chain}: {e}")
await asyncio.sleep(5)
async def _process_block(self, chain: str, block) -> None:
"""Process all transactions in a block."""
threshold_usd = self.config['chains'][chain]['whale_threshold_usd']
native_symbol = self.config['chains'][chain]['native_symbol']
for tx in block.transactions:
try:
# Convert value to native token amount
value_native = Decimal(tx['value']) / Decimal(10**18)
# Skip if below threshold (preliminary check)
if value_native < 10: # Skip very small transactions
continue
# Get USD value
price_usd = await self.price_oracle.get_price_usd(native_symbol, chain)
value_usd = value_native * Decimal(price_usd)
# Check if whale transaction
if value_usd >= threshold_usd:
await self._handle_whale_transaction(chain, tx, value_native, value_usd, native_symbol)
except Exception as e:
logger.error(f"Error processing tx {tx.get('hash', 'unknown')}: {e}")
async def _handle_whale_transaction(self, chain: str, tx: Dict,
value_native: Decimal, value_usd: Decimal,
token_symbol: str) -> None:
"""Process and alert on whale transaction."""
transaction = WhaleTransaction(
tx_hash=tx['hash'].hex(),
chain=chain,
block_number=tx['blockNumber'],
timestamp=datetime.utcnow(),
from_address=tx['from'],
to_address=tx['to'],
value_native=value_native,
value_usd=value_usd,
token_symbol=token_symbol
)
# Identify if exchange-related
from_exchange = self.exchange_db.identify_exchange(tx['from'], chain)
to_exchange = self.exchange_db.identify_exchange(tx['to'], chain)
if to_exchange:
transaction.is_exchange_related = True
transaction.exchange_name = to_exchange['exchange_name']
transaction.transaction_type = 'exchange_deposit'
elif from_exchange:
transaction.is_exchange_related = True
transaction.exchange_name = from_exchange['exchange_name']
transaction.transaction_type = 'exchange_withdrawal'
# Get market data
market_data = await self._fetch_market_data(token_symbol, chain)
# Calculate impact scores
transaction.severity_score = self.impact_analyzer.calculate_severity_score(
transaction, market_data
)
transaction.market_impact_score = self.impact_analyzer.calculate_market_impact_score(
transaction, market_data
)
# Store in database
self._store_transaction(transaction)
# Send alerts
analysis = {
'severity_score': transaction.severity_score,
'market_impact_score': transaction.market_impact_score,
'reasoning': self._generate_reasoning(transaction, market_data),
'to_label': to_exchange['exchange_name'] if to_exchange else 'Unknown'
}
await self.alert_manager.send_alert(transaction, analysis)
logger.info(f"Whale alert: {transaction.value_usd:,.0f} USD {token_symbol} on {chain} (severity: {transaction.severity_score})")
async def _fetch_market_data(self, symbol: str, chain: str) -> Dict:
"""Fetch current market data for context."""
# Simplified - production would use real-time market data API
return {
'volume_24h_usd': 15000000000, # Placeholder
'price_usd': await self.price_oracle.get_price_usd(symbol, chain)
}
def _generate_reasoning(self, transaction: WhaleTransaction,
market_data: Dict) -> str:
"""Generate human-readable reasoning for alert."""
reasoning_parts = []
if transaction.is_exchange_related:
if transaction.transaction_type == 'exchange_deposit':
reasoning_parts.append(f"Large {transaction.token_symbol} deposit to {transaction.exchange_name} suggests potential sell pressure.")
else:
reasoning_parts.append(f"Large {transaction.token_symbol} withdrawal from {transaction.exchange_name} suggests accumulation.")
daily_volume = market_data.get('volume_24h_usd', 0)
if daily_volume > 0:
volume_pct = (float(transaction.value_usd) / daily_volume) * 100
reasoning_parts.append(f"Volume represents {volume_pct:.3f}% of daily {transaction.token_symbol} trading volume.")
return " ".join(reasoning_parts)
def _store_transaction(self, transaction: WhaleTransaction) -> None:
"""Store transaction in database."""
conn = self.exchange_db.conn
with conn.cursor() as cur:
cur.execute("""
INSERT INTO whale_transactions (
tx_hash, chain, block_number, timestamp, from_address, to_address,
value_native, value_usd, token_symbol, transaction_type,
severity_score, market_impact_score, is_exchange_related, exchange_name
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (tx_hash) DO NOTHING
""", (
transaction.tx_hash, transaction.chain, transaction.block_number,
transaction.timestamp, transaction.from_address, transaction.to_address,
float(transaction.value_native), float(transaction.value_usd),
transaction.token_symbol, transaction.transaction_type,
transaction.severity_score, transaction.market_impact_score,
transaction.is_exchange_related, transaction.exchange_name
))
conn.commit()
async def main():
"""Main entry point."""
import argparse
parser = argparse.ArgumentParser(description='Monitor whale cryptocurrency transactions')
parser.add_argument('--config', default='config/whale_monitor.json', help='Config file path')
parser.add_argument('--chains', default='ethereum,bsc', help='Comma-separated chain list')
parser.add_argument('--threshold', type=float, help='Override USD threshold')
args = parser.parse_args()
monitor = WhaleMonitor(args.config)
if args.threshold:
for chain_config in monitor.config['chains'].values():
chain_config['whale_threshold_usd'] = args.threshold
chains = args.chains.split(',')
await monitor.start_monitoring(chains)
if __name__ == '__main__':
asyncio.run(main())
| Error Type | Detection | Resolution | Prevention |
|---|---|---|---|
| RPC connection failure | Connection timeout (30s) | Retry with exponential backoff, failover to backup RPC | Use multiple RPC providers, health checks |
| WebSocket disconnection | No new blocks for 60s | Reconnect and sync from last block | Keep-alive pings, connection monitoring |
| Price API rate limiting | HTTP 429 response | Use cached price, reduce fetch frequency | Implement tiered caching (30s/5min/1h) |
| Database connection lost | psycopg2.OperationalError | Reconnect pool, buffer alerts to disk | Connection pooling, periodic health checks |
| Alert delivery failure | HTTP 4xx/5xx from webhook | Queue for retry (3 attempts), log failure | Implement retry queue, monitor delivery rates |
| Memory overflow | Transaction buffer >10K entries | Batch process and clear buffer | Process transactions in real-time, limit buffer size |
| Invalid transaction format | Missing 'value' or 'from' field | Skip transaction, log error | Validate transaction schema before processing |
| Stale block data | Block timestamp >5min old | Resync from latest block, alert devs | Monitor block freshness, automated resync |
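The RPC failover and retry strategies named in the table can be implemented with a generic backoff wrapper; a hedged sketch under the same asyncio model as the monitor, not taken from the plugin source:
import asyncio
import logging
import random

logger = logging.getLogger(__name__)

async def call_with_failover(make_call, rpc_urls, max_attempts=3, base_delay=1.0):
    """Try each RPC endpoint with exponential backoff and jitter before giving up."""
    last_error = None
    for attempt in range(max_attempts):
        for url in rpc_urls:
            try:
                return await make_call(url)
            except Exception as exc:
                last_error = exc
                logger.warning("RPC call failed on %s (attempt %d): %s", url, attempt + 1, exc)
        # Exponential backoff with jitter before retrying the whole provider list.
        await asyncio.sleep(base_delay * (2 ** attempt) + random.uniform(0, 0.5))
    raise last_error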
# config/whale_monitor.yml
monitoring:
chains:
- ethereum
- bsc
- polygon
thresholds:
global_usd: 1000000
per_chain:
ethereum: 1000000
bsc: 500000
solana: 250000
filters:
min_severity_alert: 50
exclude_addresses:
- "0x0000...0000" # Burn address
only_exchange_related: false
performance:
batch_size: 100
max_concurrent_chains: 5
transaction_buffer_size: 5000
alerts:
rate_limiting:
max_per_hour: 50
max_per_wallet_per_day: 10
cooldown_seconds: 300
channels:
slack:
enabled: true
min_severity: 50
discord:
enabled: true
min_severity: 60
telegram:
enabled: true
min_severity: 70
email:
enabled: false
min_severity: 80
database:
retention_days: 90
partition_by: day
indexes:
- timestamp
- chain
- severity_score
timescaledb:
enabled: true
chunk_interval: '1 day'
compression:
enabled: true
after_days: 7
analytics:
export_interval_hours: 1
generate_reports: true
prometheus_port: 9090
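A small sketch of loading this YAML and resolving the effective per-chain threshold (assumes PyYAML; the fallback from per_chain to global_usd is an assumption about precedence, not documented behavior):
import yaml

with open("config/whale_monitor.yml") as f:
    cfg = yaml.safe_load(f)

thresholds = cfg["monitoring"]["thresholds"]

def threshold_for(chain: str) -> float:
    """Per-chain threshold if configured, otherwise the global default."""
    return thresholds.get("per_chain", {}).get(chain, thresholds["global_usd"])

for chain in cfg["monitoring"]["chains"]:
    print(chain, threshold_for(chain))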
Optimization Tips:
Related commands:
/analyze-chain - Historical on-chain analysis and whale wallet discovery
/track-wallet - Monitor specific wallet addresses over time
/scan-movers - Identify unusual market movements correlated with whale activity
/analyze-flow - Options flow analysis (often precedes whale spot movements)
/find-arbitrage - Cross-exchange arbitrage (whale profit taking)
/analyze-sentiment - News sentiment (whale positioning vs market narrative)
/analyze-pool - DEX liquidity analysis (whale impact on AMM pools)