From oracle-ai-data-platform-workbench-engineer-agent
Author, schedule, run, and monitor AIDP Jobs (task DAGs of notebooks/python with cron) via CLI or REST API. Use when building pipelines, creating/updating/running jobs, scheduling recurring runs, checking run status, reading task output, or canceling runs.
How this skill is triggered — by the user, by Claude, or both
Slash command
/oracle-ai-data-platform-workbench-engineer-agent:aidp-pipelinesThe summary Claude sees in its skill listing — used to decide when to auto-load this skill
Author and operate AIDP Jobs — task DAGs over notebooks/python with optional cron — and watch their runs.
aidp-pipelines — AIDP Jobs (build, schedule, run, monitor)Author and operate AIDP Jobs — task DAGs over notebooks/python with optional cron — and watch their runs.
Engine precedence (see references/aidp-cli-map.md): prefer the
official aidp workflow … CLI when installed; fall back to oci raw-request otherwise. Both hit the
same REST API with the same auth — no MCP / ai-data-engineer-agent repo required. Persist every mutation
body to .aidp/payloads/ and confirm before running (see references/payloads.md).
Auth + base URL: CLI flags
--instance-id <DATALAKE_OCID> --auth api_key --profile DEFAULT --region <r>; REST basehttps://aidp.<region>.oci.oraclecloud.com/20240831/dataLakes/<DATALAKE_OCID>/…. On 401/403 follow the auth ladder (oci session refresh --profile AIDP_SESSION) inreferences/oci-raw-request.md.
aidp workflow list-jobs · create-job · get-job · update-job · delete-job
(REST: GET|POST|PUT|DELETE /workspaces/{ws}/jobs[/{key}]).aidp workflow create-job-run · get-job-run · list-job-runs · list-recent-job-runs ·
list-task-runs · get-task-run · fetch-output / export-task-run-output · cancel-job-run[s] ·
repair-job-run (REST: POST …/jobs/{key}/actions/run, GET …/jobRuns/{runId}, task-run output).Listing reliability (LIVE-LESSON 2026-06-12): job lists are large and paginated — a real workspace can hold 100+ jobs (live:
playgroundreturned 100 on the first page). Always paginate (limit
- the
opc-next-pageheader) and never conclude "no jobs" from a single call. First confirm the call returned HTTP 2xx with a JSON body — a CLI/auth/network error (or a shell-quoting bug) whose output is parsed as an empty list is a silent false-negative that turns 100+ jobs into "0". Jobs are workspace-scoped and theirpath/notebookPathare rooted at/Workspace/.... See the reliability conventions in references/oci-raw-request.md.
oci raw-request, control-plane)Author: GET /workspaces/{ws}/jobs (list) · POST /workspaces/{ws}/jobs (create) ·
GET|PUT|DELETE /workspaces/{ws}/jobs/{key} (read / fetch-modify-put update / delete).
Run/monitor: POST /workspaces/{ws}/jobs/{key}/actions/run (trigger) ·
GET …/jobs/{key}/jobRuns and GET …/jobRuns/{runId} (run status + task-to-task-run mapping) ·
task runs + output (taskRuns / task-run output under the run) for executed code, stdout, notebook cells.
REST run/monitor sub-shapes (
jobRuns, per-task output field names) are probe-first per references/no-mcp-rest-map.md — a bare path returns a400naming the missing param. Don't present a sub-path as confirmed until a live 2xx; verify before destructive ops.
A job is created with a name-only body, then a second update-job call adds clusters + tasks. A single
POST that inlines tasks/displayName is rejected (400 Invalid resource name / Invalid Task type).
Confirmed live on aidp_skilltest and via the official SDK sample workflow_notebook_job_sample.py.
POST …/workspaces/<ws>/jobs with
{"name":"etl_daily.job","path":"/Workspace/Shared","maxConcurrentRuns":1} → 201, returns the job key.
(Use name — a resource name, not displayName; names allow letters/_/. and must start with a letter.)PUT …/workspaces/<ws>/jobs/<key> with
jobClusters + tasks:
{ "jobClusters": [ { "clusterKey": "<CLUSTER_KEY>", "clusterName": "<CLUSTER_NAME>" } ],
"tasks": [
{ "type": "NOTEBOOK_TASK", "taskKey": "extract", "runIf": "ALL_SUCCESS",
"cluster": { "clusterKey": "<CLUSTER_KEY>", "clusterName": "<CLUSTER_NAME>" },
"source": "WORKSPACE", "notebookPath": "/Workspace/Shared/extract.ipynb", "dependsOn": [] },
{ "type": "NOTEBOOK_TASK", "taskKey": "load", "cluster": { "clusterKey": "<CLUSTER_KEY>", "clusterName": "<CLUSTER_NAME>" },
"source": "WORKSPACE", "notebookPath": "/Workspace/Shared/load.ipynb", "dependsOn": ["extract"] } ] }
type (NOTEBOOK_TASK/PYTHON_TASK/…), not taskType. dependsOn defines the DAG.clusterName pitfall: the per-task cluster.clusterName must be the real cluster name, not a UUID
(a UUID → WORKFLOW_EXECUTION_0049 Cluster not found); pair it with clusterKey..aidp/payloads/.aidp workflow create-job / update-job CLI wraps these two steps.Live-verified 2026-06-10 on de-agent — correction: the Step-2
PUT …/jobs/<key>is a full replace, not a merge — the update body MUST re-sendname+path+maxConcurrentRunsalongsidejobClusters/tasks. Omittingnamereturns400 InvalidParameter("name must not be null").
POST …/jobs/{key}/actions/run → returns a run; GET …/jobRuns/{runId} for status + task-to-task-run mapping.GET …/jobRuns (filter by job key) for history; drill into a run for per-task status/type/duration.POST the run's cancel action.Live-verified 2026-06-10 on de-agent — corrections (full create→run→SUCCESS→delete lifecycle):
- Run trigger:
POST …/jobs/{key}/actions/runreturns 404 on the20240831instance. The WORKING trigger isPOST …/workspaces/{ws}/jobRunswith body{"jobKey":"<key>"}→ 201. TreatPOST …/jobRuns {jobKey}as the verified trigger;actions/runis a probe only.- Task-run output: read via
GET …/taskRuns/{key}(200, hasoutputKey) +POST …/taskRuns/{key}/actions/fetchOutput(200, NOTEBOOK payload). Job-executed notebooks may persist empty cells, so confirm results from taskstate/stateMessage(e.g. "Successfully executed notebook…"), not from the fetched cell output.
This is how AIDP runs a notebook workload (job-based; the CLI/SDK does not execute cells interactively —
for that use scripts/aidp_sql.py). Mirrors the official Codex demo:
aidp-notebooks) and the target cluster is ACTIVE (aidp-cluster-ops).# Step 1: create name-only -> returns job key
aidp workflow create-job --instance-id <OCID> --auth api_key --profile DEFAULT \
--body '{"name":"WeatherSummary.job","path":"/Workspace/WeatherDemo","maxConcurrentRuns":1}'
# Step 2: update -> add jobClusters + the NOTEBOOK_TASK (type, cluster{clusterKey,clusterName}, source, notebookPath)
aidp workflow update-job --instance-id <OCID> --auth api_key --profile DEFAULT --job-key <JOB_KEY> \
--body '{"jobClusters":[{"clusterKey":"<CK>","clusterName":"<CNAME>"}],"tasks":[{"type":"NOTEBOOK_TASK","taskKey":"summary","cluster":{"clusterKey":"<CK>","clusterName":"<CNAME>"},"source":"WORKSPACE","notebookPath":"/Workspace/WeatherDemo/WeatherSummary.ipynb","dependsOn":[]}]}'
aidp workflow create-job-run --instance-id <OCID> … --body '{"jobKey":"<JOB_KEY>"}' → returns a run key.aidp workflow get-job-run … <RUN_KEY> every few seconds until
SUCCESS/FAILED/CANCELED (PENDING → RUNNING → terminal).aidp workflow fetch-output / export-task-run-output for the
task run, and summarize the report the notebook produced (not just the run status). On FAILED → aidp-spark-debugging.Task types beyond NOTEBOOK_TASK/PYTHON_TASK: If/Else (conditional branching on a condition),
Nested Job (embed another job's tasks as one node), JAR (Scala/Java — JDK/Scala/Spark version must
match the cluster runtime). Tasks can depend on success or failure of a parent; tasks sharing a parent
run in parallel.
Retry policy (per task): retryCount (max attempts), retryInterval (wait between), retryOnTimeout
(retry if the task exceeds its time limit). Streaming task: mark the notebook/python task Streaming —
disables execution timeout + task dependencies, runs continuously until stopped, auto-restarts at monthly
maintenance; set Max Concurrent Runs = 1. Scheduling min frequency is 30 min. Run statuses include
SKIPPED (prior run still active) and TIMED_OUT.
Repair a failed run (rerun only the failed/selected tasks, don't re-run the whole DAG):
oci raw-request --http-method POST --profile DEFAULT \
--target-uri "https://aidp.<region>.oci.oraclecloud.com/20240831/dataLakes/<OCID>/workspaces/<WS>/jobRuns/<RUN_KEY>/actions/repair" \
--request-body '{ "...RepairJobRunDetails: tasks to rerun + optional Key/Value or JSON params..." }'
(SDK workflow.repair_job_run; {{job.repair_count}} increments per repair — confirm the body live before use.)
Parameterization — precedence Job Run > Task > Job (runtime overrides task, task overrides job; job
params are immutable during task execution). Reference system parameters with {{…}} in task
configs/paths: {{job.id}},{{job.name}},{{job.run_id}},{{job.repair_count}},{{job.parameters.[name]}},
{{job.trigger.type}},{{job.trigger.file_arrival.location}} (file-arrival trigger),{{task.name}},
{{task.run_id}},{{task.execution_count}},{{tasks.[name].result_state}} (success/failed/skipped/…),
{{tasks.[name].error_code}},{{workspace.id}},{{hub.region}}. For passing computed values notebook→notebook
inside a task, see oidlUtils.notebook.run/exit in aidp-notebooks.
aidp-notebooks / aidp-workspace-files) and the cluster.POST to create, trigger a test run, poll the run to terminal, read task output.aidp-spark-debugging (logs + Spark UI) with the failing task run.DELETE …/jobs/{key}) when validating.For ad-hoc Spark-SQL outside a job, use the bundled helper — no MCP required:
python "$PLUGIN_DIR/scripts/aidp_sql.py" --region <r> --datalake <DATALAKE_OCID> --workspace <WS> \
--cluster <key> --code "spark.sql('SELECT 1').show()"
It mints a UPST from the api_key DEFAULT profile and returns JSON (status/outputs/spark_job_ids). Production pipeline steps belong in a notebook/python task driven by the Job, not this helper.
aidp-notebooks, aidp-workspace-files, aidp-spark-debugging, aidp-data-quality.2plugins reuse this skill
First indexed Jun 12, 2026
npx claudepluginhub anthropics/claude-plugins-official --plugin oracle-ai-data-platform-workbench-engineer-agentManages Databricks Jobs: create, list, run, update, delete multi-task DAG workflows with schedules, notifications using Python SDK, CLI, or Asset Bundles.
Create, edit, and execute AIDP notebooks and manage kernel sessions. Use for authoring notebooks, running cells on a Spark cluster, or building interactive notebooks with widgets + Spark SQL + Plotly.
Develops and deploys Lakeflow Jobs on Databricks using CLI bundles for data engineering with notebooks, Python wheels, or SQL tasks. Invoke before implementation.