GitHub
Open GitHub

UsePipeline

Orchestrate complex multi-step async flows with dependency management, parallel execution, rollback, and per-step retry.

Installation

pnpm add @hookraft/usepipeline

Usage

import { usePipeline } from "@hookraft/usepipeline"
// Minimal three-step pipeline: validate -> payment -> create-order.
// Each step pairs a `run` action with a `rollback` that reverses it.
const pipeline = usePipeline({
  steps: [
    {
      // No dependsOn, so this step runs first.
      id: "validate",
      run: async () => await validateCart(),
      rollback: async () => await restoreCart(),
    },
    {
      id: "payment",
      run: async () => await chargePayment(),
      rollback: async () => await refundPayment(),
      // Runs only after the "validate" step.
      dependsOn: ["validate"],
    },
    {
      id: "create-order",
      run: async () => await createOrder(),
      rollback: async () => await cancelOrder(),
      // Runs only after the "payment" step.
      dependsOn: ["payment"],
    },
  ],
  // Lifecycle callbacks; here they only log to the console.
  onStepComplete: (id) => console.log(`${id} done`),
  onStepFailed: (id, err) => console.log(`${id} failed`, err),
  onComplete: (results) => console.log("All done", results),
  onRollback: () => console.log("Rolled back"),
})

Full Example

This example shows a real checkout flow — validate cart, charge payment, create order and send email in parallel, then update inventory. Each step has a rollback so if anything fails, everything is cleanly reversed.

/**
 * Checkout flow built on usePipeline.
 *
 * Step graph: validate -> payment -> (create-order alongside send-email) -> inventory.
 * Steps read the return values of earlier steps through `ctx.results`, and the
 * component renders progress, per-step status with a retry button for failed
 * steps, and start / rollback / reset controls.
 */
function CheckoutFlow() {
  const pipeline = usePipeline({
    steps: [
      // Step 1 — runs first, no dependencies
      {
        id: "validate",
        run: async () => {
          const res = await fetch("/api/cart/validate")
          return res.json()
        },
        rollback: async () => {
          await fetch("/api/cart/restore", { method: "POST" })
        },
      },

      // Step 2 — only runs after validate completes
      {
        id: "payment",
        run: async (ctx) => {
          // ctx.results contains results from previous steps
          const cart = ctx.results["validate"]
          const res = await fetch("/api/payment/charge", {
            method: "POST",
            body: JSON.stringify(cart),
          })
          return res.json()
        },
        rollback: async (ctx) => {
          const payment = ctx.results["payment"]
          await fetch("/api/payment/refund", {
            method: "POST",
            body: JSON.stringify(payment),
          })
        },
        // Must point at the upstream step; a step must not depend on itself.
        dependsOn: ["validate"],
        retries: 2, // retry up to 2 times before failing
      },

      // Step 3 — runs after payment, sequential
      {
        id: "create-order",
        run: async (ctx) => {
          const payment = ctx.results["payment"]
          const res = await fetch("/api/orders", {
            method: "POST",
            body: JSON.stringify(payment),
          })
          return res.json()
        },
        rollback: async (ctx) => {
          const order = ctx.results["create-order"]
          await fetch(`/api/orders/${order.id}/cancel`, { method: "POST" })
        },
        dependsOn: ["payment"],
      },

      // Step 4 — runs in parallel alongside create-order
      {
        id: "send-email",
        run: async (ctx) => {
          const payment = ctx.results["payment"]
          await fetch("/api/email/confirmation", {
            method: "POST",
            body: JSON.stringify(payment),
          })
        },
        dependsOn: ["payment"],
        parallel: true, // runs at the same time as create-order
      },

      // Step 5 — runs after create-order completes
      {
        id: "inventory",
        run: async (ctx) => {
          const order = ctx.results["create-order"]
          await fetch("/api/inventory/update", {
            method: "POST",
            body: JSON.stringify(order),
          })
        },
        dependsOn: ["create-order"],
      },
    ],

    onStepComplete: (id) => console.log(`✓ ${id} complete`),
    onStepFailed: (id, err) => console.error(`✗ ${id} failed`, err),
    onStepRollback: (id) => console.log(`↩ rolling back ${id}`),
    onComplete: (results) => toast("Order placed successfully!"),
    onFailed: (failedStep, err) => toast(`Failed at ${failedStep}`),
    onRollback: () => toast("Something went wrong. All changes reversed."),
  })

  return (
    <div>
      {/* Progress bar */}
      <div style={{ width: `${pipeline.progress}%` }} />
      <p>{pipeline.progress}% complete</p>

      {/* Currently running step */}
      {pipeline.current && (
        <p>Running: {pipeline.current}</p>
      )}

      {/* Step breakdown */}
      {pipeline.steps.map((step) => (
        <div key={step.id}>
          <span>{step.id}</span>
          <span>{step.status}</span>
          {step.status === "failed" && (
            <button onClick={() => pipeline.retry(step.id)}>
              Retry
            </button>
          )}
        </div>
      ))}

      {/* Controls */}
      <button
        onClick={pipeline.start}
        disabled={pipeline.is("running")}
      >
        {pipeline.is("running") ? "Processing..." : "Place Order"}
      </button>

      <button
        onClick={pipeline.rollback}
        disabled={!pipeline.is("failed")}
      >
        Rollback
      </button>

      <button onClick={pipeline.reset}>
        Reset
      </button>
    </div>
  )
}

How steps connect

Steps run in the order defined by dependsOn. If a step has no dependsOn, it runs immediately. Steps that share the same dependsOn and have parallel: true run at the same time.

validate
   ↓
payment
   ↓
create-order ──(parallel)── send-email
   ↓
inventory

Each step receives ctx.results — an object containing the return values of all previously completed steps. This lets you pass data from one step to the next without any external state.

Options

| Prop | Type | Default |
| --- | --- | --- |
| steps | PipelineStep[] | - |
| onStepComplete | (id: string, result: unknown) => void | - |
| onStepFailed | (id: string, error: unknown) => void | - |
| onStepRollback | (id: string) => void | - |
| onComplete | (results: Record<string, unknown>) => void | - |
| onFailed | (failedStepId: string, error: unknown) => void | - |
| onRollback | () => void | - |

Step Options

| Prop | Type | Default |
| --- | --- | --- |
| id | string | - |
| run | (ctx: PipelineContext) => Promise<unknown> | - |
| rollback | (ctx: PipelineContext) => Promise<void> | - |
| dependsOn | string[] | - |
| parallel | boolean | - |
| retries | number | 0 |

Returns

| Prop | Type | Default |
| --- | --- | --- |
| status | PipelineStatus | - |
| is(status) | (s: PipelineStatus) => boolean | - |
| steps | PipelineStepState[] | - |
| current | string \| null | - |
| progress | number | - |
| results | Record<string, unknown> | - |
| start() | () => Promise<void> | - |
| retry(stepId) | (id: string) => Promise<void> | - |
| rollback() | () => Promise<void> | - |
| reset() | () => void | - |