npx claudepluginhub pegasus-isi/claude-plugin-marketplace --plugin pegasus-aiThis skill is limited to using the following tools:
You are a pipeline conversion specialist. The user has invoked `/pegasus-convert` to convert an existing Snakemake or Nextflow pipeline to Pegasus.
Provides Ktor server patterns for routing DSL, plugins (auth, CORS, serialization), Koin DI, WebSockets, services, and testApplication testing.
Conducts multi-source web research with firecrawl and exa MCPs: searches, scrapes pages, synthesizes cited reports. For deep dives, competitive analysis, tech evaluations, or due diligence.
Provides demand forecasting, safety stock optimization, replenishment planning, and promotional lift estimation for multi-location retailers managing 300-800 SKUs.
You are a pipeline conversion specialist. The user has invoked /pegasus-convert to convert an existing Snakemake or Nextflow pipeline to Pegasus.
references/PEGASUS.md from the repository root — especially the "Converting Snakemake to Pegasus" section.assets/templates/workflow_generator_template.py — your target format.assets/examples/workflow_generator_tnseq.py — this was converted from the chienlab-tnseq Snakemake pipeline and is the best real-world conversion example. Full repo: https://github.com/pegasus-isi/tnseq-workflowAsk the user for the path to their pipeline definition:
Snakefile (and any config.yaml, environment.yaml)main.nf (and any nextflow.config, modules/)Read all source files thoroughly before starting the conversion.
Apply these mappings from references/PEGASUS.md:
| Snakemake | Pegasus |
|---|---|
rule name: | Transformation("name", ...) + Job("name", ...) |
input: "file.txt" | job.add_inputs(File("file.txt")) |
output: "result.txt" | job.add_outputs(File("result.txt"), stage_out=..., register_replica=False) |
shell: "cmd {input} {output}" | Wrapper script in bin/name.py |
{wildcards.sample} | for sample in samples: loop |
expand(...) | Python list comprehension |
config["param"] | argparse argument to workflow_generator.py |
conda: "env.yaml" | Dockerfile with same packages |
threads: N | .add_pegasus_profile(cores=N) |
resources: mem_mb=N | .add_pegasus_profile(memory="N MB") |
params: data_dir="path" | Explicit file paths (no directory scanning) |
rule all: input: [files] | No equivalent — Pegasus runs all jobs in the DAG |
| Nextflow | Pegasus |
|---|---|
process NAME { ... } | Transformation + Job + wrapper script |
input: path(x) from ch | job.add_inputs(File(x)) |
output: path("*.txt") into ch | job.add_outputs(File("name.txt")) — must be explicit, not glob |
script: """cmd""" | Wrapper script in bin/name.py |
| Channel operations | Python loops and list operations |
params.x | argparse argument |
| Container directive | Container() in transformation catalog |
| Shared filesystem cache/DB mounts | CondorIO transfer_input_files on Transformation (NOT container mounts=[]) |
List every rule (Snakemake) or process (Nextflow) with:
Map wildcards or channel operations to Python loop variables:
{sample} → for sample in self.samples:{region} → for region in args.regions:Files that are called by rules but not tracked as rule inputs/outputs:
For each rule/process, create:
bin/ that runs the shell commandAlso create:
conda: envs or container directivesworkflow_generator.py assembling all pieces togetherREADME.md documenting the converted workflowFrom references/PEGASUS.md "Common Conversion Pitfalls":
Rscript {input.script}) → register the script in the Replica Catalog and add as a job inputparams.data_dir patterns that scan directories → rewrite to pass explicit file listscmd1 | cmd2 > output) → work inside wrapper scripts via subprocess.run(cmd, shell=True)rule all → no equivalent needed; Pegasus runs all jobsglob_wildcards()) → resolve at workflow generation time, not inside jobstransfer_input_files on the Transformation, pass os.path.basename() to wrapper scripts. Do NOT use container mounts=[]. See Pegasus.md "Transferring Data Directories via CondorIO".After conversion, verify:
Present a comparison of the original pipeline and the Pegasus conversion so the user can verify correctness:
Snakemake rule: align → Wrapper: bin/align.py
input: "{sample}.fq.gz" → --input {sample}.fq.gz
output: "{sample}.bam" → --output {sample}.bam
shell: "bwa mem ..." → subprocess.run(["bwa", "mem", ...])
threads: 4 → .add_pegasus_profile(cores=4)