Generate ksqlDB queries for stream processing.
Generates ksqlDB queries for stream processing, aggregations, and joins. Use this when you need to create continuous queries, materialized views, or windowed calculations from Kafka topics.
/plugin marketplace add anton-abyzov/specweave
/plugin install sw-confluent@specweave
You are a ksqlDB expert. Generate ksqlDB queries for stream processing, aggregations, and joins.
Ask for Requirements: the source topic(s) and their key/value formats, the aggregations or joins needed, the window type and size (tumbling, hopping, or session), and whether the output should be a stream or a materialized table.
Generate ksqlDB Statements:
-- Source stream over the users topic; created_at supplies the event timestamp
CREATE STREAM users_stream (
  id VARCHAR KEY,
  email VARCHAR,
  name VARCHAR,
  region VARCHAR,
  created_at BIGINT
) WITH (
  KAFKA_TOPIC='users',
  VALUE_FORMAT='JSON',
  TIMESTAMP='created_at'
);
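To sanity-check the stream, you can write a test record directly from the ksqlDB CLI (the values below are illustrative):

-- Illustrative test row
INSERT INTO users_stream (id, email, name, region, created_at)
  VALUES ('u1', 'ada@example.com', 'Ada', 'US', 1700000000000);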
-- Materialized table: running user count and name list per region
CREATE TABLE user_counts AS
SELECT
  region,
  COUNT(*) AS user_count,
  COLLECT_LIST(name) AS user_names
FROM users_stream
GROUP BY region
EMIT CHANGES;
-- Stream-stream join: enrich orders with user data within a 1-hour window
CREATE STREAM orders_enriched AS
SELECT
  o.order_id,
  o.product_id,
  o.quantity,
  o.price,
  u.name AS customer_name,
  u.email AS customer_email,
  o.`TIMESTAMP` AS order_ts  -- backticked: TIMESTAMP is a reserved word
FROM orders_stream o
INNER JOIN users_stream u
  WITHIN 1 HOUR
  ON o.user_id = u.id
EMIT CHANGES;
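Because user records are slowly changing reference data, a stream-table join (no WITHIN clause) is usually the better fit for this kind of enrichment. A minimal sketch, assuming the users topic is keyed by id:

-- Sketch: users as a TABLE, joined on its primary key
CREATE TABLE users_table (
  id VARCHAR PRIMARY KEY,
  email VARCHAR,
  name VARCHAR,
  region VARCHAR
) WITH (
  KAFKA_TOPIC='users',
  VALUE_FORMAT='JSON'
);

CREATE STREAM orders_enriched_latest AS
SELECT
  o.order_id,
  o.quantity,
  u.name AS customer_name
FROM orders_stream o
INNER JOIN users_table u
  ON o.user_id = u.id
EMIT CHANGES;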
-- Tumbling Window (5-minute windows)
CREATE TABLE sales_by_category_5min AS
SELECT
  category,
  WINDOWSTART AS window_start,
  WINDOWEND AS window_end,
  COUNT(*) AS order_count,
  SUM(amount) AS total_sales,
  AVG(amount) AS avg_sale,
  MAX(amount) AS max_sale
FROM orders_stream
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY category
EMIT CHANGES;
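Windowed tables also support pull queries that bound WINDOWSTART; a sketch, using a hypothetical 'electronics' category value:

-- Sketch: fetch recent 5-minute buckets for one category
SELECT * FROM sales_by_category_5min
WHERE category = 'electronics'
  AND WINDOWSTART >= '2024-01-01T00:00:00';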
-- Hopping Window (5-min window, 1-min advance)
CREATE TABLE sales_hopping AS
SELECT
  category,
  WINDOWSTART AS window_start,
  COUNT(*) AS order_count
FROM orders_stream
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY category
EMIT CHANGES;
-- Session Window (inactivity gap = 30 minutes)
CREATE TABLE user_sessions AS
SELECT
  user_id,
  WINDOWSTART AS session_start,
  WINDOWEND AS session_end,
  COUNT(*) AS event_count
FROM user_events_stream
WINDOW SESSION (30 MINUTES)
GROUP BY user_id
EMIT CHANGES;
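Each window type also accepts a GRACE PERIOD that keeps a window open for late-arriving events; a sketch on the tumbling example:

-- Sketch: accept events up to 10 minutes late before closing each window
CREATE TABLE sales_by_category_5min_grace AS
SELECT
  category,
  COUNT(*) AS order_count
FROM orders_stream
WINDOW TUMBLING (SIZE 5 MINUTES, GRACE PERIOD 10 MINUTES)
GROUP BY category
EMIT CHANGES;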
-- Filter and transform: keep orders over 100 and derive a tier with CASE
CREATE STREAM high_value_orders AS
SELECT
  order_id,
  user_id,
  amount,
  UCASE(status) AS status,
  CASE
    WHEN amount > 1000 THEN 'PREMIUM'
    WHEN amount > 500 THEN 'STANDARD'
    ELSE 'BASIC'
  END AS tier
FROM orders_stream
WHERE amount > 100
EMIT CHANGES;
-- Array and map functions on structured values
CREATE STREAM processed_events AS
SELECT
  id,
  ARRAY_CONTAINS(tags, 'premium') AS is_premium,
  ARRAY_LENGTH(items) AS item_count,
  MAP_KEYS(metadata) AS meta_keys,
  metadata['source'] AS source
FROM events_stream
EMIT CHANGES;
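To fan an array out into one row per element, ksqlDB provides the EXPLODE table function; a minimal sketch against the same stream:

-- Sketch: one output row per element of the items array
CREATE STREAM event_items AS
SELECT
  id,
  EXPLODE(items) AS item
FROM events_stream
EMIT CHANGES;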
-- Push query: runs continuously and emits new rows as they arrive
SELECT * FROM users_stream
WHERE region = 'US'
EMIT CHANGES;

-- Pull query: one-time lookup against a materialized table, keyed by region
SELECT * FROM user_counts
WHERE region = 'US';
-- Show streams
SHOW STREAMS;

-- Describe a stream's schema
DESCRIBE users_stream;

-- Show running persistent queries (the source of query IDs)
SHOW QUERIES;

-- Explain a query's execution plan
EXPLAIN <query_id>;

-- Terminate a persistent query
TERMINATE <query_id>;
User: "Calculate hourly sales by category"
Result: a complete ksqlDB window aggregation query, for example:
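A sketch assembled from the patterns above (orders_stream columns as assumed in the earlier examples):

-- Hourly sales per category (tumbling window)
CREATE TABLE sales_by_category_hourly AS
SELECT
  category,
  WINDOWSTART AS window_start,
  COUNT(*) AS order_count,
  SUM(amount) AS total_sales
FROM orders_stream
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY category
EMIT CHANGES;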