Parallel and concurrent processing patterns in bash including GNU Parallel, xargs, job pools, and async patterns (2025)
Comprehensive guide to parallel and concurrent execution in bash, covering GNU Parallel, xargs parallelization, job control, worker pools, and modern async patterns for maximum performance.
# Debian/Ubuntu
sudo apt-get install parallel
# macOS
brew install parallel
# From source
wget https://ftp.gnu.org/gnu/parallel/parallel-latest.tar.bz2
tar -xjf parallel-latest.tar.bz2
cd parallel-*
./configure && make && sudo make install
#!/usr/bin/env bash
set -euo pipefail
# Process multiple files in parallel
parallel gzip ::: *.txt
# Equivalent to:
# for f in *.txt; do gzip "$f"; done
# But runs in parallel!
# Using find with parallel
find . -name "*.jpg" | parallel convert {} -resize 50% resized/{}
# Specify number of jobs
parallel -j 8 process_file ::: *.dat
# From stdin
cat urls.txt | parallel -j 10 wget -q
# Multiple inputs
parallel echo ::: A B C ::: 1 2 3
# Output: A 1, A 2, A 3, B 1, B 2, B 3, C 1, C 2, C 3
# Paired inputs with :::+
parallel echo ::: A B C :::+ 1 2 3
# Output: A 1, B 2, C 3
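Before running anything destructive, it can help to preview the exact command lines GNU Parallel would generate; a quick sketch using the --dry-run flag, which prints the commands without executing them (gzip stands in for any real workload):
# Preview generated commands without running them
parallel --dry-run gzip ::: *.txt
parallel --dry-run echo ::: A B C :::+ 1 2 3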
#!/usr/bin/env bash
set -euo pipefail
# Input from file
parallel -a input.txt process_line
# Multiple input files
parallel -a file1.txt -a file2.txt 'echo {1} {2}'
# Column-based input
cat data.tsv | parallel --colsep '\t' 'echo Name: {1}, Value: {2}'
# Named columns
cat data.csv | parallel --header : --colsep ',' 'echo {name}: {value}'
# Null-delimited for safety with special characters
find . -name "*.txt" -print0 | parallel -0 wc -l
# Line-based chunking
cat huge_file.txt | parallel --pipe -N1000 'wc -l'
#!/usr/bin/env bash
set -euo pipefail
# {} - Full input
parallel echo 'Processing: {}' ::: file1.txt file2.txt
# {.} - Remove extension
parallel echo '{.}' ::: file.txt file.csv
# Output: file, file
# {/} - Basename
parallel echo '{/}' ::: /path/to/file.txt
# Output: file.txt
# {//} - Directory path
parallel echo '{//}' ::: /path/to/file.txt
# Output: /path/to
# {/.} - Basename without extension
parallel echo '{/.}' ::: /path/to/file.txt
# Output: file
# {#} - Job number (1-based)
parallel echo 'Job {#}: {}' ::: A B C
# {%} - Slot number (recycled job slot)
parallel -j 2 'echo "Slot {%}: {}"' ::: A B C D E
# Combined
parallel 'convert {} -resize 50% {//}/thumb_{/.}.jpg' ::: *.png
#!/usr/bin/env bash
set -euo pipefail
# Show progress bar
parallel --bar process_item ::: {1..100}
# Live progress and job status
parallel --progress process_item ::: {1..100}
# Estimated time to completion
parallel --eta process_item ::: {1..100}
# Verbose output
parallel --verbose gzip ::: *.txt
# Log to file
parallel --joblog jobs.log gzip ::: *.txt
# Resume from where it left off (skip completed jobs)
parallel --joblog jobs.log --resume gzip ::: *.txt
# Results logging
parallel --results results_dir 'echo {1} + {2}' ::: 1 2 3 ::: 4 5 6
# Creates one directory level per input source and value:
# results_dir/1/1/2/4/stdout, results_dir/1/1/2/4/stderr, etc.
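The joblog also enables targeted retries: where --resume skips jobs that already completed, --resume-failed additionally re-runs the entries whose exit status was nonzero. A sketch, assuming the same jobs.log from above:
# Re-run only the jobs that failed in a previous run
parallel --joblog jobs.log --resume-failed gzip ::: *.txt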
#!/usr/bin/env bash
set -euo pipefail
# CPU-based parallelism (number of cores)
parallel -j "$(nproc)" process_item ::: {1..1000}
# Leave two cores free: -j -N means "CPU threads minus N"
parallel -j '-2' process_item ::: {1..1000} # nproc - 2
# Percentage of cores
parallel -j '50%' process_item ::: {1..1000}
# Load-based throttling
parallel --load 80% process_item ::: {1..1000}
# Memory-based throttling
parallel --memfree 2G process_item ::: {1..1000}
# Rate limiting: --delay pauses between job starts (0.5 s here, so at most ~2 starts per second)
parallel -j 4 --delay 0.5 wget ::: url1 url2 url3 url4
# Timeout per job
parallel --timeout 60 long_process ::: {1..100}
# Retry failed jobs
parallel --retries 3 flaky_process ::: {1..100}
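For long batch runs on a shared machine, --nice lowers the scheduling priority of every job, which pairs naturally with the load and memory throttles above (process_item remains a placeholder):
# Run all jobs at reduced scheduling priority (nice level 10)
parallel --nice 10 -j "$(nproc)" process_item ::: {1..1000}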
#!/usr/bin/env bash
set -euo pipefail
# Run on multiple servers
parallel --sshloginfile servers.txt process_item ::: {1..1000}
# servers.txt format:
# 4/server1.example.com (4 jobs on server1)
# 8/server2.example.com (8 jobs on server2)
# : (local machine)
# Transfer files before execution
parallel --sshloginfile servers.txt --transferfile {} process {} ::: *.dat
# Return results
parallel --sshloginfile servers.txt --return {.}.result process {} ::: *.dat
# Cleanup after transfer
parallel --sshloginfile servers.txt --transfer --return {.}.out --cleanup \
'process {} > {.}.out' ::: *.dat
# Environment variables
export MY_VAR="value"
parallel --env MY_VAR --sshloginfile servers.txt 'echo $MY_VAR' ::: A B C
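When the goal is one invocation per server rather than distributing arguments across them, --nonall runs the same command once on every login in the list, which is handy for health checks; a sketch reusing the servers.txt above:
# Run one command per server, with no input arguments distributed
parallel --nonall --sshloginfile servers.txt 'hostname; uptime'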
#!/usr/bin/env bash
set -euo pipefail
# Pipe mode - distribute stdin across workers
cat huge_file.txt | parallel --pipe -N1000 'sort | uniq -c'
# Block size for pipe mode
cat data.bin | parallel --pipe --block 10M 'process_chunk'
# Keep order of output
parallel --keep-order 'sleep $((RANDOM % 3)); echo {}' ::: A B C D E
# Group output (don't mix output from different jobs)
parallel --group 'for i in 1 2 3; do echo "Job {}: line $i"; done' ::: A B C
# Tag output with job identifier
parallel --tag 'echo "output from {}"' ::: A B C
# Ungrouped output: printed immediately as produced (lines from different jobs may interleave)
parallel --ungroup 'echo "Starting {}"; sleep 1; echo "Done {}"' ::: A B C
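Between the extremes of --group (hold all of a job's output) and --ungroup (interleave freely), --line-buffer passes output through as soon as each full line is ready, so lines from different jobs mix but never get spliced mid-line:
# Line-buffered output: immediate, but whole lines only
parallel --line-buffer 'echo "Starting {}"; sleep 1; echo "Done {}"' ::: A B C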
#!/usr/bin/env bash
set -euo pipefail
# -P for parallel jobs
find . -name "*.txt" | xargs -P 4 -I {} gzip {}
# -n for items per command
echo {1..100} | xargs -n 10 -P 4 echo "Batch:"
# Null-delimited for safety
find . -name "*.txt" -print0 | xargs -0 -P 4 -I {} process {}
# Multiple arguments per process
cat urls.txt | xargs -P 10 -n 5 wget -q
# --max-args is the long form of -n: max arguments per command invocation
echo {1..1000} | xargs -P 4 --max-args=50 echo
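nproc is a GNU coreutils tool, so a portable way to size -P is a small fallback chain; a sketch assuming only that sysctl is available on macOS/BSD:
# Portable core count for -P: nproc (GNU), sysctl (macOS/BSD), else a default
cores=$(nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null || echo 4)
find . -name "*.txt" -print0 | xargs -0 -P "$cores" -I {} gzip {}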
#!/usr/bin/env bash
set -euo pipefail
# Use sh -c for complex commands
find . -name "*.jpg" -print0 | \
xargs -0 -P 4 -I {} sh -c 'convert "$1" -resize 50% "thumb_$(basename "$1")"' _ {}
# Multiple placeholders
paste file1.txt file2.txt | \
xargs -P 4 -n 2 sh -c 'diff "$1" "$2" > "diff_$(basename "$1" .txt).patch"' _
# Process in batches
find . -name "*.log" -print0 | \
xargs -0 -P 4 -n 100 tar -czvf logs_batch.tar.gz
# With failure handling
find . -name "*.dat" -print0 | \
xargs -0 -P 4 -I {} sh -c 'process "$1" || echo "Failed: $1" >> failures.log' _ {}
#!/usr/bin/env bash
set -euo pipefail
# Track background jobs
declare -a PIDS=()
# Start jobs
for item in {1..10}; do
process_item "$item" &
PIDS+=($!)
done
# Wait for all
for pid in "${PIDS[@]}"; do
wait "$pid"
done
echo "All jobs complete"
# Alternatively, wait for whichever job finishes first (instead of the loop above)
wait -n # Bash 4.3+
echo "At least one job complete"
#!/usr/bin/env bash
set -euo pipefail
# Maximum concurrent jobs
MAX_JOBS=4
# Simple semaphore using a counter
job_count=0
run_with_limit() {
local cmd=("$@")
# Wait if at limit
while ((job_count >= MAX_JOBS)); do
wait -n 2>/dev/null || true # Bash 4.3+: returns when any background job exits
job_count=$((job_count - 1))
done
# Start new job
"${cmd[@]}" &
job_count=$((job_count + 1)) # not ((job_count++)): that returns status 1 when the count is 0, tripping set -e
}
# Usage
for item in {1..20}; do
run_with_limit process_item "$item"
done
# Wait for remaining
wait
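An alternative to keeping a manual counter is asking bash itself how many jobs are running: jobs -rp lists the PIDs of running background jobs, so the count can never drift out of sync. A sketch under the same MAX_JOBS assumption:
# Counter-free limiter: derive the running-job count from the jobs builtin
run_with_limit2() {
while (( $(jobs -rp | wc -l) >= MAX_JOBS )); do
wait -n 2>/dev/null || true # Bash 4.3+
done
"$@" &
}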
#!/usr/bin/env bash
set -euo pipefail
MAX_JOBS=4
JOB_FIFO="/tmp/job_pool_$$"
# Create job slots
mkfifo "$JOB_FIFO"
trap 'rm -f "$JOB_FIFO"' EXIT
# Initialize slots
exec 3<>"$JOB_FIFO"
for ((i=0; i<MAX_JOBS; i++)); do
echo >&3
done
# Run with slot
run_with_slot() {
local cmd=("$@")
read -r -u 3 # Acquire slot (blocks until one is free)
{
"${cmd[@]}" || true # a failing job must still release its slot under set -e
echo >&3 # Release slot
} &
}
# Usage
for item in {1..20}; do
run_with_slot process_item "$item"
done
wait
exec 3>&-
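GNU Parallel packages the same slot idea as the sem wrapper (an alias for parallel --semaphore), which removes the FIFO bookkeeping entirely; process_item is a placeholder as before:
# sem: a counting semaphore with no FIFO plumbing
for item in {1..20}; do
sem -j 4 process_item "$item" # blocks once 4 jobs are already running
done
sem --wait # block until all outstanding sem jobs finish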
#!/usr/bin/env bash
set -euo pipefail
WORK_QUEUE="/tmp/work_queue_$$"
RESULT_QUEUE="/tmp/result_queue_$$"
NUM_WORKERS=4
mkfifo "$WORK_QUEUE" "$RESULT_QUEUE"
trap 'rm -f "$WORK_QUEUE" "$RESULT_QUEUE"' EXIT
# Work items to process (placeholder tasks; populate with real work)
TASKS=(task_a task_b task_c task_d)
# Worker function
worker() {
local id="$1"
while read -r task; do
[[ "$task" == "STOP" ]] && break
# Process task
local result
result=$(process_task "$task" 2>&1)
echo "RESULT:$id:$task:$result"
done
}
# Start workers, tracking PIDs so we can wait on them specifically
WORKER_PIDS=()
for ((i=0; i<NUM_WORKERS; i++)); do
worker "$i" < "$WORK_QUEUE" > "$RESULT_QUEUE" &
WORKER_PIDS+=($!)
done
# Result collector: define the function, then launch it in the background
# (appending & to the function definition itself would never run it)
collect_results() {
while read -r line; do
echo "$line" >> results.txt
done < "$RESULT_QUEUE" # loop ends on EOF, once every worker closes the pipe
}
collect_results &
COLLECTOR_PID=$!
# Producer - send work
{
for task in "${TASKS[@]}"; do
echo "$task"
done
# Stop signals for workers
for ((i=0; i<NUM_WORKERS; i++)); do
echo "STOP"
done
} > "$WORK_QUEUE"
# Wait for the workers only; a bare wait would also block on the collector
wait "${WORKER_PIDS[@]}"
# The collector exits on its own once the result pipe reaches EOF
wait "$COLLECTOR_PID"
#!/usr/bin/env bash
set -euo pipefail
# Async function wrapper
async() {
local result_var="$1"
shift
local cmd=("$@")
# Create temp file for result
local result_file
result_file=$(mktemp)
# Run in background, save result
{
if "${cmd[@]}" > "$result_file" 2>&1; then
echo "0" >> "$result_file.status"
else
echo "$?" >> "$result_file.status"
fi
} &
# Store PID and result file location
eval "${result_var}_pid=$!"
eval "${result_var}_file='$result_file'"
}
# Await result
await() {
local result_var="$1"
local pid_var="${result_var}_pid"
local file_var="${result_var}_file"
# Wait for completion
wait "${!pid_var}"
# Get result
cat "${!file_var}"
local status
status=$(cat "${!file_var}.status")
# Cleanup
rm -f "${!file_var}" "${!file_var}.status"
return "$status"
}
# Usage
async result1 curl -s "https://api1.example.com/data"
async result2 curl -s "https://api2.example.com/data"
async result3 process_local_data
# Do other work here...
# Get results (each await blocks until its task completes; under set -e a
# failed task's nonzero status will abort the script at that await)
data1=$(await result1)
data2=$(await result2)
data3=$(await result3)
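If the goal is to react to whichever task finishes first rather than awaiting them in a fixed order, bash 4.3+'s wait -n returns as soon as any background job exits. A sketch; slow_task_one and slow_task_two are placeholders:
# React to the first completion (bash 4.3+)
async first slow_task_one
async second slow_task_two
wait -n # returns when either background task exits
echo "one task finished; the other may still be running"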
#!/usr/bin/env bash
set -euo pipefail
declare -A TASKS
declare -A TASK_RESULTS
TASK_COUNTER=0
# Register async task
schedule() {
local cmd=("$@")
local task_id=$((++TASK_COUNTER))
local output_file="/tmp/task_${task_id}_$$"
"${cmd[@]}" > "$output_file" 2>&1 &
TASKS[$task_id]=$!
TASK_RESULTS[$task_id]="$output_file"
echo "$task_id"
}
# Check if task complete
is_complete() {
local task_id="$1"
! kill -0 "${TASKS[$task_id]}" 2>/dev/null
}
# Get task result
get_result() {
local task_id="$1"
wait "${TASKS[$task_id]}" 2>/dev/null || true
cat "${TASK_RESULTS[$task_id]}"
rm -f "${TASK_RESULTS[$task_id]}"
}
# Event loop
run_event_loop() {
local pending=("${!TASKS[@]}")
while ((${#pending[@]} > 0)); do
local still_pending=()
for task_id in "${pending[@]}"; do
if is_complete "$task_id"; then
local result
result=$(get_result "$task_id")
on_task_complete "$task_id" "$result"
else
still_pending+=("$task_id")
fi
done
pending=("${still_pending[@]}")
# Small sleep to prevent busy-waiting
((${#pending[@]} > 0)) && sleep 0.1
done
}
# Callback for completed tasks
on_task_complete() {
local task_id="$1"
local result="$2"
echo "Task $task_id complete: ${result:0:50}..."
}
#!/usr/bin/env bash
set -euo pipefail
# Fan-out: distribute work
# PIDs are handed back through a nameref out-parameter rather than echoed:
# capturing them with $(fan_out ...) would fork a subshell, and the parent
# shell cannot wait on PIDs of jobs started inside a subshell
fan_out() {
local -n items="$1"
local workers="$2"
local worker_func="$3"
local -n out_pids="$4"
local chunk_size=$(( (${#items[@]} + workers - 1) / workers ))
out_pids=()
for ((i=0; i<workers; i++)); do
local start=$((i * chunk_size))
local chunk=("${items[@]:start:chunk_size}")
if ((${#chunk[@]} > 0)); then
$worker_func "${chunk[@]}" &
out_pids+=($!)
fi
done
}
# Fan-in: wait for every worker to finish
fan_in() {
for pid in "$@"; do
wait "$pid"
done
}
# Example worker
process_chunk() {
local items=("$@")
for item in "${items[@]}"; do
echo "Processed: $item"
done
}
# Usage
data=({1..100})
fan_out data 4 process_chunk worker_pids
fan_in "${worker_pids[@]}"
#!/usr/bin/env bash
set -euo pipefail
# Map function
parallel_map() {
local -n input="$1"
local map_func="$2"
local workers="${3:-$(nproc)}"
printf '%s\n' "${input[@]}" | \
parallel -j "$workers" "$map_func"
}
# Reduce function
reduce() {
local reduce_func="$1"
local accumulator="$2"
while IFS= read -r value; do
accumulator=$($reduce_func "$accumulator" "$value")
done
echo "$accumulator"
}
# Example: Sum of squares
square() { echo $(($1 * $1)); }
add() { echo $(($1 + $2)); }
export -f square # parallel runs jobs in new shells, so functions must be exported
numbers=({1..100})
sum_of_squares=$(
parallel_map numbers square 4 | reduce add 0
)
echo "Sum of squares: $sum_of_squares"
# Word count example
word_count_map() {
tr ' ' '\n' | sort | uniq -c
}
export -f word_count_map # needed so the parallel --pipe workers below can see it
word_count_reduce() {
sort -k2 | awk '{
if ($2 == prev) { count += $1 }
else { if (prev) print count, prev; count = $1; prev = $2 }
} END { if (prev) print count, prev }'
}
cat large_text.txt | \
parallel --pipe -N1000 word_count_map | \
word_count_reduce
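If exporting every helper gets tedious, newer GNU Parallel releases ship env_parallel, which copies the calling shell's functions and variables into the spawned shells; a sketch, assuming env_parallel.bash is on PATH (the documented activation step):
# env_parallel: auto-export functions and variables to worker shells
. "$(which env_parallel.bash)"
numbers=({1..100})
printf '%s\n' "${numbers[@]}" | env_parallel -j 4 square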
#!/usr/bin/env bash
set -euo pipefail
# Process in optimal batch sizes
optimal_batch_process() {
local items=("$@")
local batch_size=100
local workers=$(nproc)
printf '%s\n' "${items[@]}" | \
parallel --pipe -N"$batch_size" -j"$workers" '
while IFS= read -r item; do
process_item "$item"
done
'
}
# Dynamic batch sizing based on memory
dynamic_batch() {
local mem_available
mem_available=$(free -m | awk '/^Mem:/ {print $7}')
# Scale the batch size (a line count) with free memory: ~1 line per 100 MB free
local batch_size=$((mem_available / 100))
((batch_size < 10)) && batch_size=10
((batch_size > 1000)) && batch_size=1000
parallel --pipe -N"$batch_size" process_batch
}
#!/usr/bin/env bash
set -euo pipefail
# Use tmpfs for intermediate files
# Note: the cleanup trap must NOT be set inside setup_fast_temp, because the
# function is called via $(...), i.e. in a subshell, and a subshell's EXIT trap
# would fire (deleting the directory) as soon as the substitution returns
setup_fast_temp() {
local tmpdir="/dev/shm/parallel_$$"
mkdir -p "$tmpdir"
echo "$tmpdir"
}
# Buffer I/O operations
buffered_parallel() {
local input="$1"
local tmpdir
tmpdir=$(setup_fast_temp)
trap "rm -rf '$tmpdir'" EXIT # double quotes: expand the path now, not at exit time
# Split input into chunks
split -l 1000 "$input" "$tmpdir/chunk_"
# Process chunks in parallel (process_chunk is assumed to write "$tmpdir"/result_*)
parallel process_chunk {} ::: "$tmpdir"/chunk_*
# Combine results
cat "$tmpdir"/result_* > output.txt
}
# Avoid disk I/O by streaming through a pipe instead of staging temp files
no_disk_parallel() {
# Instead of:
#   generate_data > temp.txt
#   parallel --pipe process < temp.txt
#   rm temp.txt
# Do this:
generate_data | parallel --pipe process
}
#!/usr/bin/env bash
set -euo pipefail
# Pin workers to specific CPUs
cpu_pinned_parallel() {
local num_cpus
num_cpus=$(nproc)
for ((cpu=0; cpu<num_cpus; cpu++)); do
taskset -c "$cpu" process_worker "$cpu" &
done
wait
}
# NUMA-aware processing
numa_parallel() {
local num_nodes
num_nodes=$(numactl --hardware | grep "available:" | awk '{print $2}')
for ((node=0; node<num_nodes; node++)); do
numactl --cpunodebind="$node" --membind="$node" \
process_chunk "$node" &
done
wait
}
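GNU Parallel can approximate CPU pinning without a manual loop by mapping its 1-based slot number {%} to a core index via taskset; a sketch, with process_item a placeholder as before:
# Pin each job slot to its own core: slot {%} is 1-based, cores are 0-based
parallel -j "$(nproc)" 'taskset -c $(( {%} - 1 )) process_item {}' ::: {1..1000}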
#!/usr/bin/env bash
set -euo pipefail
# Track failures
declare -A FAILURES
parallel_with_retry() {
local max_retries=3
local items=("$@")
local fail_log
fail_log=$(mktemp)
for item in "${items[@]}"; do
(
retries=0
until process_item "$item"; do
retries=$((retries + 1))
if ((retries >= max_retries)); then
echo "$item" >> "$fail_log" # a subshell cannot update FAILURES directly
break
fi
echo "Retry $retries for $item" >&2
sleep $((2 ** retries)) # Exponential backoff
done
) & # one background subshell per item, so items retry concurrently
done
wait
# Collect failures recorded by the subshells
while IFS= read -r item; do
FAILURES["$item"]="Failed after $max_retries retries"
done < "$fail_log"
rm -f "$fail_log"
}
# Report failures
report_failures() {
if ((${#FAILURES[@]} > 0)); then
echo "Failures:" >&2
for item in "${!FAILURES[@]}"; do
echo " $item: ${FAILURES[$item]}" >&2
done
return 1
fi
}
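The GNU Parallel equivalent of this bookkeeping is --retries plus a joblog; the log's seventh column (Exitval) makes surviving failures easy to extract afterwards. A sketch reusing the flaky_process placeholder:
# Retry via GNU Parallel, then list joblog entries that still failed
parallel --retries 3 --joblog jobs.log flaky_process ::: {1..100}
awk 'NR > 1 && $7 != 0' jobs.log # column 7 = Exitval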
#!/usr/bin/env bash
set -euo pipefail
# Global cancellation flag
# (each worker runs in a subshell with its own copy of CANCELLED, so the
# kill in cancel_all is what actually stops running workers)
CANCELLED=false
NUM_WORKERS=4
declare -a WORKER_PIDS=()
cancel_all() {
CANCELLED=true
for pid in "${WORKER_PIDS[@]}"; do
kill "$pid" 2>/dev/null || true
done
}
trap cancel_all SIGINT SIGTERM
cancellable_worker() {
local id="$1"
while ! $CANCELLED; do
# Check for work
if work=$(get_next_work); then
process_work "$work"
else
sleep 0.1
fi
done
}
# Start workers
for ((i=0; i<NUM_WORKERS; i++)); do
cancellable_worker "$i" &
WORKER_PIDS+=($!)
done
# Wait with interrupt support
wait || true
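A gentler variant of cancel_all escalates: send SIGTERM first, give workers a grace period to clean up, then SIGKILL whatever remains. A sketch reusing the WORKER_PIDS array from above:
# Graceful cancellation: TERM, grace period, then KILL
graceful_cancel() {
local grace="${1:-5}"
for pid in "${WORKER_PIDS[@]}"; do kill -TERM "$pid" 2>/dev/null || true; done
sleep "$grace"
for pid in "${WORKER_PIDS[@]}"; do kill -KILL "$pid" 2>/dev/null || true; done
}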
Master parallel processing for efficient multi-core utilization and faster script execution.