Manages Effect concurrency with fibers for forking, joining, interruption, parallel Effect.all execution, and race conditions in Effect apps.
Slash command: /effect:effect-concurrency
Master concurrent execution in Effect using fibers. This skill covers forking, joining, interruption, parallel execution, and advanced concurrency patterns for building high-performance Effect applications.
Fibers are lightweight virtual threads that execute effects concurrently:
import { Effect, Fiber } from "effect"
// Every effect runs on a fiber
const effect = Effect.succeed(42)
// When run, this executes on a fiber
// Effects are descriptions - fibers are executions
// Effect: lazy, immutable description
// Fiber: running execution with state
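The distinction between a lazy description and a running execution can be observed directly. The following is a minimal sketch; the `runs` counter and the `increment` effect are hypothetical names introduced only for illustration.

```typescript
import { Effect } from "effect"

// Hypothetical counter, used only to observe when the side effect runs
let runs = 0

// Building the effect does not execute it: this is only a description
const increment = Effect.sync(() => {
  runs += 1
  return runs
})

// Nothing has executed yet at this point
const runsBefore = runs

// Each run executes the same description again
const first = Effect.runSync(increment)
const second = Effect.runSync(increment)

console.log(runsBefore, first, second)
```

Because the description is immutable, running it twice performs the side effect twice rather than reusing a cached result.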
Create independent concurrent fibers:
import { Effect, Fiber } from "effect"
const task = Effect.gen(function* () {
  yield* Effect.sleep("1 second")
  yield* Effect.log("Task completed")
  return 42
})
const program = Effect.gen(function* () {
  // Fork creates a new fiber
  const fiber = yield* Effect.fork(task)
  // fiber: RuntimeFiber<number, never>
  yield* Effect.log("Main fiber continues")
  // Join waits for fiber to complete
  const result = yield* Fiber.join(fiber)
  yield* Effect.log(`Result: ${result}`)
  return result
})
import { Effect, Fiber } from "effect"
const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(longRunningTask)
  // Join - wait for result
  const result = yield* Fiber.join(fiber)
  // Await - get Exit value (success/failure/interruption)
  const exit = yield* Fiber.await(fiber)
  // Interrupt - cancel execution
  yield* Fiber.interrupt(fiber)
  // Poll - check if complete (non-blocking)
  const status = yield* Fiber.poll(fiber)
})
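To make the three possible outcomes of a fiber concrete, here is a small sketch that classifies the `Exit` returned by `Fiber.await` and `Fiber.interrupt`. The `outcome` helper and the three forked tasks are hypothetical and exist only for this example.

```typescript
import { Effect, Exit, Fiber } from "effect"

// Hypothetical helper: classify an Exit into one of three outcomes
const outcome = (exit: Exit.Exit<unknown, unknown>): string =>
  Exit.isSuccess(exit)
    ? "success"
    : Exit.isInterrupted(exit)
      ? "interrupted"
      : "failure"

const program = Effect.gen(function* () {
  const ok = yield* Effect.fork(Effect.succeed(42))
  const bad = yield* Effect.fork(Effect.fail("boom"))
  const slow = yield* Effect.fork(Effect.sleep("10 seconds"))

  // Await never fails: it reports how the fiber ended as an Exit value
  const okExit = yield* Fiber.await(ok)
  const badExit = yield* Fiber.await(bad)
  // Interrupt also returns the Exit of the fiber it stopped
  const slowExit = yield* Fiber.interrupt(slow)

  return [outcome(okExit), outcome(badExit), outcome(slowExit)]
})

Effect.runPromise(program).then(console.log)
```

Unlike `Fiber.join`, which re-raises the fiber's failure in the caller, awaiting lets the caller inspect the failure without being affected by it.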
import { Effect } from "effect"
// Parallel execution (opt in with the concurrency option)
const program = Effect.gen(function* () {
  const results = yield* Effect.all([
    fetchUser("1"),
    fetchUser("2"),
    fetchUser("3")
  ], { concurrency: "unbounded" })
  // All requests run concurrently
  return results
})
// Sequential execution (also the default when no concurrency option is given)
const sequential = Effect.gen(function* () {
  const results = yield* Effect.all([
    fetchUser("1"),
    fetchUser("2"),
    fetchUser("3")
  ], { concurrency: 1 })
  return results
})
// Limited concurrency
const limited = Effect.gen(function* () {
  const results = yield* Effect.all(
    Array.from({ length: 100 }, (_, i) => fetchUser(`${i}`)),
    { concurrency: 10 } // Max 10 concurrent
  )
  return results
})
import { Effect } from "effect"
// Batching for efficiency
const batchFetch = Effect.gen(function* () {
  const userIds = Array.from({ length: 1000 }, (_, i) => `${i}`)
  const results = yield* Effect.all(
    userIds.map(id => fetchUser(id)),
    {
      concurrency: 50, // 50 concurrent requests
      batching: true // Enable batching optimization
    }
  )
  return results
})
import { Effect } from "effect"
const processUsers = (userIds: string[]) =>
  Effect.forEach(
    userIds,
    (id) => Effect.gen(function* () {
      const user = yield* fetchUser(id)
      const processed = yield* processUser(user)
      return processed
    }),
    { concurrency: "unbounded" } // No limit
  )
// With concurrency limit
const processUsersLimited = (userIds: string[]) =>
  Effect.forEach(
    userIds,
    // processUser expects a user, so fetch it first
    (id) => Effect.flatMap(fetchUser(id), processUser),
    { concurrency: 10 }
  )
import { Effect } from "effect"
const fetchWithFallback = (id: string) =>
  Effect.race(
    fetchFromPrimaryDb(id),
    fetchFromSecondaryDb(id)
  )
// Returns whichever succeeds first; the other effect is interrupted
// Racing more than two effects: Effect.race takes exactly two, so use Effect.raceAll
const fastestSource = Effect.raceAll([
  fetchFromSource1(),
  fetchFromSource2(),
  fetchFromSource3()
])
import { Effect } from "effect"
const sources = [
  fetchFromSource1(),
  fetchFromSource2(),
  fetchFromSource3()
]
// First to succeed wins
const fastest = Effect.raceAll(sources)
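The difference between "first to succeed" and "first to complete" matters when one side fails early. The sketch below contrasts `Effect.race` with `Effect.raceFirst`; the two timed effects are hypothetical stand-ins for real data sources.

```typescript
import { Effect } from "effect"

// Hypothetical sources: one fails quickly, the other succeeds later
const fastFailure = Effect.sleep("10 millis").pipe(
  Effect.andThen(Effect.fail("fast failure"))
)
const slowSuccess = Effect.sleep("50 millis").pipe(
  Effect.as("slow success")
)

// race: an early failure does not decide the race, the first success does
const viaRace = Effect.either(Effect.race(fastFailure, slowSuccess))

// raceFirst: the first effect to complete decides the race, even if it failed
const viaRaceFirst = Effect.either(Effect.raceFirst(fastFailure, slowSuccess))

Effect.runPromise(Effect.all([viaRace, viaRaceFirst])).then(console.log)
```

Here `viaRace` ends with the slow success while `viaRaceFirst` ends with the fast failure, so the choice between the two depends on whether an early failure should end the race.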
import { Duration, Effect } from "effect"
const withTimeout = <A, E, R>(
  effect: Effect.Effect<A, E, R>,
  duration: Duration.Duration
) =>
  // raceFirst settles with whichever side completes first, success or failure.
  // Effect.race would keep waiting for `effect` after the timer fails.
  Effect.raceFirst(
    effect,
    Effect.sleep(duration).pipe(
      Effect.andThen(Effect.fail({ _tag: "Timeout" as const }))
    )
  )
const program = Effect.gen(function* () {
  const result = yield* withTimeout(
    slowOperation(),
    Duration.seconds(5)
  )
  return result
})
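Effect also ships timeout combinators, so a hand-written race is not always needed. The following sketch uses `Effect.timeoutFail`; the `TimeoutError` class, the `slowOperation` value, and the durations are assumptions chosen to keep the example short.

```typescript
import { Effect } from "effect"

// Hypothetical error type for this example
class TimeoutError {
  readonly _tag = "TimeoutError"
}

// Hypothetical slow operation: takes 200 ms to produce a value
const slowOperation = Effect.sleep("200 millis").pipe(Effect.as("done"))

// Fail with TimeoutError if the operation takes longer than 20 ms
const guarded = slowOperation.pipe(
  Effect.timeoutFail({
    duration: "20 millis",
    onTimeout: () => new TimeoutError()
  })
)

// Convert to Either so the timeout can be inspected as a value
const program = Effect.either(guarded)

Effect.runPromise(program).then(console.log)
```

When the deadline passes, the slow operation is interrupted and the typed error appears in the error channel, where it can be handled like any other failure.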
import { Effect, Fiber } from "effect"
const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(longRunningTask)
  // Cancel after 1 second
  yield* Effect.sleep("1 second")
  yield* Fiber.interrupt(fiber)
  yield* Effect.log("Task cancelled")
})
// Automatic interruption on parent exit
const autoInterrupt = Effect.gen(function* () {
  const fiber = yield* Effect.fork(infiniteLoop)
  // fiber will be interrupted when this effect completes
})
import { Effect } from "effect"
const criticalSection = Effect.gen(function* () {
  // This region cannot be interrupted
  yield* Effect.uninterruptible(
    Effect.gen(function* () {
      yield* beginTransaction()
      yield* updateDatabase()
      yield* commitTransaction()
    })
  )
})
// Interruptible regions within uninterruptible
const mixed = Effect.uninterruptible(
  Effect.gen(function* () {
    yield* criticalOperation1()
    // Allow interruption here
    yield* Effect.interruptible(
      nonCriticalOperation()
    )
    yield* criticalOperation2()
  })
)
import { Effect } from "effect"
const program = Effect.gen(function* () {
  // Regular fork - interrupted when parent exits
  const regularFiber = yield* Effect.fork(task)
  // Daemon fork - survives parent exit
  const daemonFiber = yield* Effect.forkDaemon(backgroundTask)
  // Parent exits, regularFiber interrupted, daemonFiber continues
})
// Background worker example
const startBackgroundWorker = Effect.gen(function* () {
  yield* Effect.forkDaemon(
    Effect.gen(function* () {
      while (true) {
        yield* processQueue()
        yield* Effect.sleep("1 second")
      }
    })
  )
})
import { Effect } from "effect"
const program = Effect.gen(function* () {
  yield* Effect.scoped(
    Effect.gen(function* () {
      // Fibers are tied to scope
      const fiber1 = yield* Effect.forkScoped(task1)
      const fiber2 = yield* Effect.forkScoped(task2)
      // Do work
      yield* doWork()
      // Scope exit automatically interrupts fibers
    })
  )
  // fiber1 and fiber2 are interrupted here
})
import { Effect, Exit, Scope } from "effect"
const managedConcurrency = Effect.gen(function* () {
  const scope = yield* Scope.make()
  // Fork in specific scope
  const fiber = yield* Effect.forkIn(task, scope)
  // Work continues
  yield* doWork()
  // Close scope, interrupt fiber
  yield* Scope.close(scope, Exit.succeed(undefined))
})
import { Effect, Fiber, Queue } from "effect"
interface Task {
  id: string
  data: unknown
}
const createWorkerPool = (workers: number) =>
  Effect.gen(function* () {
    const queue = yield* Queue.bounded<Task>(100)
    // Start workers
    const workerFibers = yield* Effect.all(
      Array.from({ length: workers }, () =>
        Effect.fork(
          Effect.forever(
            Effect.gen(function* () {
              const task = yield* Queue.take(queue)
              yield* processTask(task)
            })
          )
        )
      )
    )
    return {
      submit: (task: Task) => Queue.offer(queue, task),
      shutdown: () =>
        Effect.all(
          workerFibers.map(fiber => Fiber.interrupt(fiber))
        )
    }
  })
import { Effect } from "effect"
const parallelMapReduce = <A, B, E, R>(
  items: A[],
  map: (item: A) => Effect.Effect<B, E, R>,
  reduce: (acc: B, item: B) => B,
  initial: B,
  concurrency: number
) =>
  Effect.gen(function* () {
    // Map concurrently, then fold the results in input order
    const mapped = yield* Effect.forEach(
      items,
      map,
      { concurrency }
    )
    return mapped.reduce(reduce, initial)
  })
import { Effect, Request, RequestResolver } from "effect"
interface GetUser extends Request.Request<User, UserNotFound> {
  readonly _tag: "GetUser"
  readonly id: string
}
// Constructor for GetUser requests
const GetUser = Request.tagged<GetUser>("GetUser")
const GetUserResolver = RequestResolver.makeBatched(
  (requests: ReadonlyArray<GetUser>) =>
    Effect.gen(function* () {
      const ids = requests.map(r => r.id)
      const users = yield* fetchUsersBatch(ids)
      // Resolve all requests (yield* so the completions actually run)
      yield* Effect.forEach(requests, (request) => {
        const user = users.find(u => u.id === request.id)
        return user
          ? Request.succeed(request, user)
          : Request.fail(request, { _tag: "UserNotFound", id: request.id })
      }, { discard: true })
    })
)
// Concurrent requests are collected into one batch for the resolver
const program = Effect.gen(function* () {
  const results = yield* Effect.all([
    Effect.request(GetUser({ id: "1" }), GetUserResolver),
    Effect.request(GetUser({ id: "1" }), GetUserResolver),
    Effect.request(GetUser({ id: "1" }), GetUserResolver)
  ], { concurrency: "unbounded", batching: true })
  // The three requests are resolved by a single batched fetch
})
Use Effect.all for Parallel Work: Don't fork manually when Effect.all suffices.
Limit Concurrency: Set appropriate concurrency limits to avoid resource exhaustion.
Handle Interruption: Ensure cleanup code runs in uninterruptible regions.
Use Scoped Forks: Tie fiber lifetime to scopes for automatic cleanup.
Avoid Infinite Loops: Use Effect.forever with sleep for background tasks.
Batch Requests: Use request resolvers to batch and deduplicate.
Timeout Long Operations: Add timeouts to prevent hanging.
Monitor Fiber Status: Use Fiber.await and Fiber.poll for status checks.
Use Daemon Sparingly: Only fork daemons when truly independent.
Test Concurrent Code: Write tests for race conditions and interruption.
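As an illustration of the "Handle Interruption" practice above, the sketch below attaches a finalizer with `Effect.ensuring` and then interrupts the fiber. The `events` array and the `worker` effect are hypothetical and only record what happened.

```typescript
import { Effect, Fiber } from "effect"

// Hypothetical event log, used only to observe the order of operations
const events: string[] = []

const worker = Effect.gen(function* () {
  yield* Effect.sync(() => events.push("started"))
  yield* Effect.sleep("10 seconds")
  // Not reached in this example: the fiber is interrupted during the sleep
  yield* Effect.sync(() => events.push("finished"))
}).pipe(
  // The finalizer runs on success, failure, and interruption
  Effect.ensuring(Effect.sync(() => events.push("cleanup")))
)

const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(worker)
  // Give the worker a moment to start before cancelling it
  yield* Effect.sleep("10 millis")
  // Interrupt waits for the fiber to finish, including its finalizer
  yield* Fiber.interrupt(fiber)
  return events
})

Effect.runPromise(program).then(console.log)
```

The same pattern applies to `Effect.acquireRelease` and `Effect.onInterrupt` when the cleanup needs access to an acquired resource or should run only on interruption.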
Forgetting to Join: Forking without joining loses results.
No Concurrency Limits: Unbounded concurrency can exhaust resources.
Not Handling Interruption: Missing cleanup in interruptible regions.
Race Conditions: Sharing mutable state between fibers.
Deadlocks: Circular dependencies between fibers.
Ignoring Failures: Not checking fiber exit status.
Memory Leaks: Daemon fibers that never terminate.
Over-Forking: Creating too many fibers unnecessarily.
Missing Timeouts: Long-running operations without limits.
Wrong Execution Mode: Using sequential when parallel is intended.
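One way to avoid the "Race Conditions" pitfall above is to keep shared state in a `Ref` and change it only through atomic updates. This is a minimal sketch; the counter and the number of workers are assumptions made for the example.

```typescript
import { Effect, Ref } from "effect"

const program = Effect.gen(function* () {
  // Shared state lives in a Ref instead of a plain mutable variable
  const counter = yield* Ref.make(0)

  // 100 increments, at most 10 running concurrently
  yield* Effect.forEach(
    Array.from({ length: 100 }, (_, i) => i),
    () => Ref.update(counter, (n) => n + 1),
    { concurrency: 10, discard: true }
  )

  return yield* Ref.get(counter)
})

Effect.runPromise(program).then(console.log)
```

`Ref.update` applies the function as a single step, so no increment is lost even though the updates come from different fibers.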
Use effect-concurrency when you need to: