Webhook Sources & Custom Workflows
Author inbound webhook sources that turn external HTTP payloads into Hogsend events, reach for a built-in preset, and write custom Hatchet tasks for background work.
Overview
This guide covers the two extension points your app uses to take in external events and run background work:
- Webhook sources: turn an inbound HTTP payload into an IngestEvent that flows through the engine's ingestion pipeline and can trigger journeys.
- Custom Hatchet tasks: durable background jobs (one-off maintenance, backfills, event-driven side effects) that run in the worker alongside the engine's built-in workflows.
Both live in your code, not the engine's. You author them in your scaffolded app, import defineWebhookSource and the hatchet client from @hogsend/engine, and register them in arrays you own. You never edit engine internals. Relative imports use the ESM .js extension.
Inbound sources are not outbound destinations. This guide is about the inbound half — external systems pushing HTTP requests into Hogsend. The symmetric outbound half — fanning Hogsend's own event stream (contact.*, email.*, journey.completed, bucket.*) out to PostHog, Slack, a CRM, or a warehouse — is covered by Destinations (defineDestination()). The durable delivery spine those destinations ride (signed whsec_ POSTs, retries, backoff, DLQ) is documented in the Outbound webhooks reference. Don't confuse defineWebhookSource() (in) with defineDestination() (out).
Webhook sources
A webhook source turns an inbound HTTP request into an IngestEvent. Each source is served by the engine at POST /v1/webhooks/:sourceId, and its output is fed straight into ingestEvent() — so a webhook can store an event, push it to Hatchet (routing it to matching journey tasks), evaluate journey exit conditions, and upsert the contact, all in one request.
You write the source files; the engine owns the route. Edit only src/webhook-sources/.
defineWebhookSource()
defineWebhookSource is imported from @hogsend/engine, alongside the DefinedWebhookSource, WebhookSourceCtx, and VerifySignatureArgs types:
import {
defineWebhookSource,
type DefinedWebhookSource,
type WebhookSourceCtx,
type VerifySignatureArgs,
} from "@hogsend/engine";
The definition object has four fields:
| Field | Type | Notes |
|---|---|---|
| meta.id | string | The :sourceId segment in the URL. Keep it URL-safe. |
| meta.name | string | Human-readable label. |
| meta.description? | string | Optional. |
| auth | discriminated union on type | "match" or "signature"; see Authentication. |
| schema? | z.ZodSchema<T> | Optional Zod validator; on success, payload is typed T. |
| transform(payload, ctx) | => Promise<IngestEvent \| null> | Maps the payload to an event. Return null to accept and skip. |
function defineWebhookSource<T>(def: {
meta: { id: string; name: string; description?: string };
auth: WebhookSourceAuth;
schema?: z.ZodSchema<T>;
transform(
payload: T,
ctx: WebhookSourceCtx,
): Promise<IngestEvent | null>;
}): DefinedWebhookSource<T>;
Authentication
auth is a discriminated union on type — pick the variant that matches your provider. The two variants differ critically in how they treat an unset secret:
type WebhookSourceAuth =
| {
type: "match"; // plain shared-secret equality
header: string; // HTTP header carrying the secret (or Authorization: Bearer)
envKey: string; // env var holding the expected secret
}
| {
type: "signature"; // provider HMAC signature verification
scheme: "svix" | "stripe" | "hmac-hex";
envKey: string; // env var holding the signing secret
header: string; // signature header (e.g. svix-signature, stripe-signature)
fallbackMatchHeader?: string; // optional plain-secret header that also passes
verify?(args: VerifySignatureArgs): boolean | Promise<boolean>; // per-source override
};
| Variant | When to use | Behavior when the secret is unset |
|---|---|---|
| type: "match" | A simple shared-secret header. The route compares the configured secret against header (or Authorization: Bearer). | Open: the source still accepts requests with no auth. This is the legacy variant the PostHog scaffold source uses. Always set the secret in any non-local environment. |
| type: "signature" | Provider HMAC verification. The route reads the exact raw body once and verifies it against the signing secret using scheme. | Fails closed: returns 401 and never reaches transform. Signature sources are security-sensitive. |
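To make the open-versus-closed difference concrete, here is a minimal sketch of how a "match" check could behave. It is a hypothetical reimplementation for illustration, not the engine's route code: matchAuthPasses and HeaderMap are made-up names, and the constant-time comparison is a choice of this sketch.

```typescript
import { timingSafeEqual } from "node:crypto";

// Lowercased request headers (hypothetical shape for this sketch).
type HeaderMap = Record<string, string | undefined>;

// Constant-time equality. The length check runs first because
// timingSafeEqual throws on buffers of different lengths.
function safeEqual(a: string, b: string): boolean {
  const left = Buffer.from(a, "utf8");
  const right = Buffer.from(b, "utf8");
  return left.length === right.length && timingSafeEqual(left, right);
}

// Hypothetical model of the "match" variant, not the engine's route code.
function matchAuthPasses(
  headers: HeaderMap,
  headerName: string,
  expectedSecret: string | undefined,
): boolean {
  // Unset secret: "match" is open and accepts the request.
  if (!expectedSecret) return true;

  const direct = headers[headerName.toLowerCase()];
  const authorization = headers["authorization"];
  const bearer = authorization?.startsWith("Bearer ")
    ? authorization.slice("Bearer ".length)
    : undefined;

  // Either the named header or an Authorization: Bearer token may carry the secret.
  return (
    (direct !== undefined && safeEqual(direct, expectedSecret)) ||
    (bearer !== undefined && safeEqual(bearer, expectedSecret))
  );
}
```

Note the first branch: with no secret configured, every request passes. A "signature" source in the same situation rejects everything, which is why "match" sources need their secret set outside local development.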
For type: "signature", scheme is one of:
- "svix": Standard Webhooks / Svix signatures (the svix-id / svix-timestamp / svix-signature header set). This is the same machinery plugin-resend uses for inbound Resend webhooks.
- "stripe": Stripe's stripe-signature: t=<ts>,v1=<hex> format, computed with node:crypto (no stripe SDK) and enforcing a 5-minute timestamp tolerance. Multiple v1= candidates are accepted to support secret rotation.
- "hmac-hex": a plain HMAC_SHA256(secret, rawBody) rendered as lowercase hex, constant-time compared against the header value (e.g. Segment's x-signature).
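The hmac-hex and stripe schemes can be sketched with node:crypto alone. These are illustrative reimplementations of the formats listed above, not the engine's code: verifyHmacHex, verifyStripe, and their parameters are hypothetical names, and this sketch rejects timestamps more than 300 seconds off in either direction, which may differ from the engine's exact tolerance rule.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Constant-time comparison; unequal lengths short-circuit to false.
function safeEqual(a: string, b: string): boolean {
  const left = Buffer.from(a, "utf8");
  const right = Buffer.from(b, "utf8");
  return left.length === right.length && timingSafeEqual(left, right);
}

// "hmac-hex": lowercase hex of HMAC_SHA256(secret, rawBody), compared to the header.
function verifyHmacHex(rawBody: Buffer, headerValue: string, secret: string): boolean {
  const expected = createHmac("sha256", secret).update(rawBody).digest("hex");
  return safeEqual(expected, headerValue.trim().toLowerCase());
}

// "stripe": header is "t=<unix seconds>,v1=<hex>[,v1=<hex>...]".
// Stripe signs "<t>.<raw body>"; any v1 candidate may match (secret rotation).
function verifyStripe(
  rawBody: Buffer,
  headerValue: string,
  secret: string,
  nowSeconds: number = Math.floor(Date.now() / 1000),
  toleranceSeconds = 300, // the documented 5-minute window
): boolean {
  let timestamp: string | undefined;
  const candidates: string[] = [];
  for (const part of headerValue.split(",")) {
    const eq = part.indexOf("=");
    if (eq === -1) continue;
    const key = part.slice(0, eq).trim();
    const value = part.slice(eq + 1).trim();
    if (key === "t") timestamp = value;
    else if (key === "v1") candidates.push(value);
  }
  if (timestamp === undefined || candidates.length === 0) return false;

  // Reject stale (or far-future) deliveries to limit replay attacks.
  const ts = Number(timestamp);
  if (!Number.isFinite(ts) || Math.abs(nowSeconds - ts) > toleranceSeconds) return false;

  // Sign the timestamp string exactly as received, followed by the raw bytes.
  const signedPayload = Buffer.concat([Buffer.from(`${timestamp}.`, "utf8"), rawBody]);
  const expected = createHmac("sha256", secret).update(signedPayload).digest("hex");
  return candidates.some((candidate) => safeEqual(expected, candidate));
}
```

Both functions take the raw body bytes rather than a parsed object: re-serializing JSON can reorder keys or change whitespace, and then no signature matches. That is why the route reads the exact raw body once and verifies over it.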
Use fallbackMatchHeader when a "signature" provider may also send a plain shared-secret header: when the scheme's signature headers are absent but this header matches the secret verbatim, the request is accepted. (This is how Supabase's plain x-supabase-webhook-secret mode coexists with its Svix mode.) Supply verify only when you need to override the built-in scheme verification entirely — it receives the exact received bytes and must return (or resolve to) a boolean.
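The fallbackMatchHeader rule reads most clearly as a decision order. The sketch below is a simplified, hypothetical model: it treats a request as "signed" when its primary signature header is present, uses a verifySignature callback to stand in for the scheme check, and assumes a present-but-invalid signature is rejected even if the fallback header would match, a case this guide does not spell out.

```typescript
import { timingSafeEqual } from "node:crypto";

type HeaderMap = Record<string, string | undefined>; // lowercased header names

// Constant-time equality; unequal lengths return false before comparing.
function safeEqual(a: string, b: string): boolean {
  const left = Buffer.from(a, "utf8");
  const right = Buffer.from(b, "utf8");
  return left.length === right.length && timingSafeEqual(left, right);
}

// Hypothetical decision order for a "signature" source; not engine code.
function signatureSourceAccepts(opts: {
  headers: HeaderMap;
  secret: string | undefined;
  signatureHeader: string;
  fallbackMatchHeader?: string;
  verifySignature: () => boolean; // stands in for the scheme's HMAC check
}): boolean {
  const { headers, secret, signatureHeader, fallbackMatchHeader, verifySignature } = opts;

  // Fail closed: with no secret configured, nothing is accepted (401).
  if (!secret) return false;

  // Signature header present: the scheme decides, with no fallback.
  if (headers[signatureHeader.toLowerCase()] !== undefined) {
    return verifySignature();
  }

  // Signature header absent: accept only a verbatim plain-secret match.
  if (fallbackMatchHeader) {
    const plain = headers[fallbackMatchHeader.toLowerCase()];
    return plain !== undefined && safeEqual(plain, secret);
  }
  return false;
}
```

This is the shape that lets one route accept either a Svix-signed Supabase request or one carrying only x-supabase-webhook-secret.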
Validation
If you provide schema, the route runs schema.safeParse(payload) before calling transform; a parse failure returns 400 and transform never runs. Inside transform, payload is the parsed, typed value T.
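The order of operations (validate, then transform, then ingest or skip) can be sketched as a small handler. This is a hypothetical model, not the engine's route: handleDelivery, IngestEventLike, and the 200 response body for an ingested event are illustrative. The Schema interface only mirrors the { success, data } / { success, error } shape that Zod's safeParse returns, so a real Zod schema would fit the slot.

```typescript
// Mirrors the result shape of Zod's safeParse.
type SafeParseResult<T> = { success: true; data: T } | { success: false; error: unknown };
interface Schema<T> {
  safeParse(input: unknown): SafeParseResult<T>;
}

// Trimmed-down stand-in for IngestEvent (hypothetical).
interface IngestEventLike {
  event: string;
  eventProperties: Record<string, unknown>;
}

interface RouteResult {
  status: number;
  body: Record<string, unknown>;
}

async function handleDelivery<T>(
  payload: unknown,
  schema: Schema<T> | undefined,
  transform: (payload: T) => Promise<IngestEventLike | null>,
  ingest: (event: IngestEventLike) => Promise<void>,
): Promise<RouteResult> {
  let typed: T;
  if (schema) {
    const parsed = schema.safeParse(payload);
    // Validation failure: 400, and transform never runs.
    if (!parsed.success) return { status: 400, body: { error: "invalid payload" } };
    typed = parsed.data;
  } else {
    // No schema: the payload is passed through unvalidated.
    typed = payload as T;
  }

  const event = await transform(typed);
  // null means accept-and-skip: acknowledge without ingesting.
  if (event === null) return { status: 200, body: { ok: true, skipped: true } };

  await ingest(event);
  return { status: 200, body: { ok: true } };
}
```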
The transform → IngestEvent contract
transform(payload, ctx) returns an IngestEvent (or null). This is the exact shape the engine consumes — note the two separate property bags:
interface IngestEvent {
event: string; // event name — what journeys trigger on
userId?: string; // external user id (optional — email-only / anon allowed)
userEmail?: string; // user email — "" if unknown, not undefined
anonymousId?: string; // stable anonymous id (future anon→identified path)
eventProperties: Record<string, unknown>; // → user_events row + journey trigger.where / exitOn
contactProperties?: Record<string, unknown>; // → contacts.properties merge ONLY
idempotencyKey?: string; // optional dedupe key — use the provider's event id
occurredAt?: Date | string; // optional event time (backfill/replay); defaults to now
}
The property split is load-bearing and matches the rest of the engine (see Identity):
- eventProperties (required) is the behavioral bag. It lands on the user_events row and is what a journey's trigger.where / exitOn evaluates.
- contactProperties (optional) is the profile/identity bag. It merges onto the durable contacts.properties record and never touches the event row.
ctx is { db, logger, rawBody?, headers? } — a Drizzle Database, the engine logger, and (populated by the route) the exact raw request body bytes and the lowercased request headers. The raw body is what signature schemes verify over, and it's available to any transform that needs provider-specific raw access. ctx does not carry hatchet or the registry — those are applied by the route when it calls ingestEvent.
A few behaviors that match the engine:
- event is the routing key. Hatchet routes the pushed event to every journey whose trigger declares onEvents: [thatEvent]. The decision to enroll (or exit) is then made by trigger/exit conditions; see the Conditions guide.
- userEmail should be "" when unknown. The ingestion pipeline treats a falsy email as "no email" for the contact upsert. Don't pass undefined.
- Only JSON-scalar properties survive the push to Hatchet. string | number | boolean | null values reach the journey task; nested objects and arrays are dropped from that payload (they're still stored on the user_events row). Flatten anything a journey needs to branch on into a scalar property.
- idempotencyKey: when set, a duplicate delivery with the same key is a no-op ({ stored: false }). Use the provider's event id when one is available.
- Return null to accept the delivery (200 { ok: true, skipped: true }) without ingesting, e.g. for event types you don't care about.
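Because nested objects never reach the journey task, it can help to flatten them before returning the IngestEvent. The helper below is a hypothetical convenience for your own transform, not an engine API. Joining keys with an underscore is a choice of this sketch; use whatever naming your trigger.where / exitOn conditions expect.

```typescript
type Scalar = string | number | boolean | null;

// Hypothetical helper: turns { plan: { tier: "pro" } } into { plan_tier: "pro" }.
// Dates become ISO strings; arrays, undefined, and functions are dropped,
// mirroring the rule that only scalars reach the journey task.
function flattenToScalars(
  input: Record<string, unknown>,
  prefix = "",
  out: Record<string, Scalar> = {},
): Record<string, Scalar> {
  for (const [key, value] of Object.entries(input)) {
    const path = prefix ? `${prefix}_${key}` : key;
    if (
      value === null ||
      typeof value === "string" ||
      typeof value === "number" ||
      typeof value === "boolean"
    ) {
      out[path] = value;
    } else if (value instanceof Date) {
      out[path] = value.toISOString();
    } else if (typeof value === "object" && !Array.isArray(value)) {
      // Recurse into plain nested objects, extending the key prefix.
      flattenToScalars(value as Record<string, unknown>, path, out);
    }
    // Anything else (arrays, undefined, functions) is skipped.
  }
  return out;
}
```

In the billing example, flattenToScalars({ customer: payload.customer }) would produce flat keys such as customer_id and customer_plan, with any undefined optional fields left out.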
Example
Here's a hand-rolled source that maps a generic billing provider's webhook into Hogsend events, splitting customer profile fields from event data:
import { defineWebhookSource } from "@hogsend/engine";
import { z } from "zod";
const billingWebhookSchema = z.object({
id: z.string(),
type: z.string(),
customer: z.object({
id: z.string(),
email: z.string().optional(),
name: z.string().optional(),
plan: z.string().optional(),
}),
});
export const billingSource = defineWebhookSource({
meta: {
id: "billing",
name: "Billing",
description: "Receives billing webhooks and triggers lifecycle journeys.",
},
auth: {
type: "signature",
scheme: "hmac-hex",
envKey: "BILLING_WEBHOOK_SECRET",
header: "x-signature",
},
schema: billingWebhookSchema,
async transform(payload) {
// Skip event types we don't model.
if (!payload.type.startsWith("subscription.")) return null;
return {
event: payload.type, // e.g. "subscription.upgraded" — the routing key
userId: payload.customer.id,
userEmail: payload.customer.email ?? "", // "" when unknown, never undefined
// Behavioral data → the event row + journey conditions.
eventProperties: {
source: "billing",
billingEventId: payload.id,
plan: payload.customer.plan ?? null, // flat scalar so a journey can branch on it
},
// Profile data → the contact record only.
contactProperties: {
name: payload.customer.name,
plan: payload.customer.plan,
},
idempotencyKey: payload.id, // dedupe at-least-once redeliveries
};
},
});
Register a source
Wiring a source up is two edits — both in files you own.
1. Add it to the webhookSources array in src/webhook-sources/index.ts:
import type { DefinedWebhookSource } from "@hogsend/engine";
import { posthogSource } from "./posthog.js";
import { billingSource } from "./billing.js"; // your new source
export const webhookSources: DefinedWebhookSource[] = [
posthogSource,
billingSource,
];
2. Confirm the array is passed to createApp in your thin src/index.ts (the scaffold already threads this):
import { createApp, createHogsendClient } from "@hogsend/engine";
import { webhookSources } from "./webhook-sources/index.js";
// journeys and templates come from your app's own modules (imports omitted here).
const client = createHogsendClient({ journeys, email: { templates } });
const app = createApp(client, { webhookSources });
No engine code is touched. Your source is now live at POST /v1/webhooks/billing.
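To exercise the new route locally, you can sign a test delivery the same way the billing provider would. This sketch assumes the example source above (hmac-hex over the raw body, sent in x-signature); buildSignedDelivery is a hypothetical helper, and the secret you pass must equal BILLING_WEBHOOK_SECRET on the server.

```typescript
import { createHmac } from "node:crypto";

// Hypothetical helper for the example "billing" source: hmac-hex over the
// exact body string, sent in the x-signature header.
function buildSignedDelivery(payload: unknown, secret: string) {
  const body = JSON.stringify(payload);
  const signature = createHmac("sha256", secret).update(body, "utf8").digest("hex");
  return {
    method: "POST",
    headers: {
      "content-type": "application/json",
      "x-signature": signature,
    },
    // Send exactly this string: the signature covers these bytes.
    body,
  };
}
```

With Node 18 or later, pass the result straight to fetch, for example fetch("http://localhost:3000/v1/webhooks/billing", buildSignedDelivery(payload, secret)); the host and port are placeholders for wherever your app listens. Sign and send the same body string, since re-serializing the payload after signing would invalidate the signature.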
Built-in presets (no code)
Before hand-rolling a source, check whether the engine already ships one. Four integration presets are built into @hogsend/engine and served with no consumer code:
| Preset | Route | Scheme | Secret env var |
|---|---|---|---|
| clerk | POST /v1/webhooks/clerk | svix | CLERK_WEBHOOK_SECRET |
| supabase | POST /v1/webhooks/supabase | svix (+ plain x-supabase-webhook-secret fallback) | SUPABASE_WEBHOOK_SECRET |
| stripe | POST /v1/webhooks/stripe | stripe | STRIPE_WEBHOOK_SECRET |
| segment | POST /v1/webhooks/segment | hmac-hex | SEGMENT_WEBHOOK_SECRET |
A preset mounts only when both its secret env var is set and ENABLED_WEBHOOK_PRESETS allows it:
- "*" or absent → auto: every preset whose secret is set.
- A comma-separated list of ids → exactly those (still requires the secret).
- "none" → all presets off.
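The mounting rule combines two independent conditions. Here is a hypothetical model of it; mountedPresets is not an engine export, and treating an empty ENABLED_WEBHOOK_PRESETS value as absent is an assumption of this sketch.

```typescript
// Preset ids and their secret env vars, from the table above.
const PRESET_SECRETS = {
  clerk: "CLERK_WEBHOOK_SECRET",
  supabase: "SUPABASE_WEBHOOK_SECRET",
  stripe: "STRIPE_WEBHOOK_SECRET",
  segment: "SEGMENT_WEBHOOK_SECRET",
} as const;

type PresetId = keyof typeof PRESET_SECRETS;

// Hypothetical model of the documented ENABLED_WEBHOOK_PRESETS rules.
function mountedPresets(env: Record<string, string | undefined>): PresetId[] {
  const ids = Object.keys(PRESET_SECRETS) as PresetId[];
  const raw = env.ENABLED_WEBHOOK_PRESETS?.trim();

  if (raw === "none") return [];

  // "*" or absent: every preset is allowed; otherwise only the listed ids.
  const allowed =
    raw === undefined || raw === "" || raw === "*"
      ? new Set<string>(ids)
      : new Set(raw.split(",").map((s) => s.trim()).filter(Boolean));

  // Allowed is not enough: a preset also needs its secret (fail closed).
  return ids.filter((id) => allowed.has(id) && Boolean(env[PRESET_SECRETS[id]]));
}
```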
A preset with no secret is never mounted (signature sources fail closed). Defining your own defineWebhookSource with the same id overrides the preset — the consumer always wins. Each preset normalizes provider events into Hogsend's vocabulary and keeps the eventProperties/contactProperties split — for example, Stripe maps customer.created → contact.created (profile → contactProperties) and invoice.paid → invoice.paid (event-only). The per-provider mapping is documented in Integrations.
Custom workflows
Custom Hatchet tasks are durable background jobs you own — one-off maintenance, backfills, cron-style work, or event-driven side effects. They run in the worker process alongside the engine's built-in workflows. You define them in src/workflows/, export them from src/workflows/index.ts, and the scaffold passes them to createWorker.
The option is extraWorkflows — NOT workflows. It is additive. The engine registers its own built-ins (send-email, import-contacts, check-alerts, and the bucket tasks) automatically — never list a built-in in extraWorkflows.
Defining a task
Import the shared hatchet client from @hogsend/engine. There are two flavors:
- hatchet.task({ name, fn }): a plain task you trigger explicitly (one-off jobs, backfills, anything kicked off from the Hatchet dashboard or via hatchet.events).
- hatchet.durableTask({ name, onEvents, fn }): long-running or event-driven work (this is what journeys use under the hood). Declare onEvents: [eventName] to have Hatchet route ingested events to the task automatically.
import { hatchet } from "@hogsend/engine";
export const reindexSearchTask = hatchet.task({
name: "reindex-search",
retries: 2,
executionTimeout: "30m",
// input AND return value must serialize to JSON
fn: async (input: { since: string; dryRun: boolean }) => {
// ...do the work...
return { reindexed: 0, skipped: 0, dryRun: input.dryRun };
},
});
JSON-serializable IO is a hard requirement. A task's input and return value must serialize to JSON. Use specific, named keys ({ jobId: string; format: string }) or JsonValue-compatible types. Do not use a [key: string]: unknown index signature on the input type. Return a plain object, or void/undefined if there's nothing to report.
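A cheap way to catch non-JSON values during development is a round-trip check: serialize, parse, and compare. The helper below is a hypothetical dev-time guard, not part of @hogsend/engine. It uses Node's util.isDeepStrictEqual, which also compares prototypes and own keys, so Dates, Maps, Sets, class instances, and keys holding undefined (which JSON.stringify silently drops) are all flagged.

```typescript
import { isDeepStrictEqual } from "node:util";

// Hypothetical dev-time guard: true only if the value survives
// JSON.stringify followed by JSON.parse without changing.
function isJsonSafe(value: unknown): boolean {
  let text: string | undefined;
  try {
    text = JSON.stringify(value);
  } catch {
    return false; // BigInt values and circular references throw
  }
  // JSON.stringify(undefined) returns undefined: the "return nothing" case.
  if (text === undefined) return value === undefined;
  return isDeepStrictEqual(JSON.parse(text), value);
}
```

For example, isJsonSafe({ reindexed: 3, dryRun: false }) is true, while isJsonSafe({ at: new Date() }) is false because the Date comes back as a string.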
For event-driven work, use a durable task with onEvents. The input arrives as the ingested event payload (userId / userEmail / scalar properties):
import { hatchet } from "@hogsend/engine";
export const onSignupAuditTask = hatchet.durableTask({
name: "on-signup-audit",
onEvents: ["user.signed_up"],
executionTimeout: "10m",
retries: 1,
fn: async (input: {
userId: string;
userEmail: string;
properties: Record<string, string | number | boolean | null>;
}) => {
// side effect; return a JSON-safe summary (or nothing)
return { audited: input.userId };
},
});
For normal lifecycle messaging, prefer a journey over a raw durable task: journeys give you enrollment guards, state tracking, durable sleeps, and exit conditions for free. Reach for a custom durable task only when you need orchestration the journey system doesn't model.
Accessing the database inside a task
Tasks run in the worker and are constructed at module load, so they don't receive the request container. Open a connection inside fn with createDatabase from @hogsend/db and always close it in a finally:
import { createDatabase } from "@hogsend/db";
import { createLogger, hatchet } from "@hogsend/engine";
export const nightlyCleanupTask = hatchet.task({
name: "nightly-cleanup",
fn: async () => {
const { db, client } = createDatabase({ url: process.env.DATABASE_URL ?? "" });
const logger = createLogger(process.env.LOG_LEVEL ?? "info");
try {
// ...work with db...
return { ok: true };
} finally {
await client.end({ timeout: 5 });
}
},
});
Register a task
Two edits, both in files you own.
1. Export it from src/workflows/index.ts — list only your tasks; the engine adds its built-ins itself:
import { nightlyCleanupTask } from "./nightly-cleanup.js";
import { reindexSearchTask } from "./reindex-search.js";
export const extraWorkflows = [nightlyCleanupTask, reindexSearchTask];
2. Confirm src/worker.ts passes it; the key detail is the option name:
// createWorker, client, journeys, buckets, and extraWorkflows are imported at the top of src/worker.ts (omitted here).
const worker = createWorker({
container: client,
journeys,
buckets,
extraWorkflows, // NOT `workflows`
});
createWorker builds the worker as [...engine built-ins, ...journeyTasks, ...bucketTasks, ...extraWorkflows]. After editing, restart the worker (hatchet worker dev or pnpm worker:dev) so the new task registers.
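The composition order, and why re-listing a built-in is a mistake, can be sketched as follows. composeWorkflows and NamedTask are hypothetical; whether the engine itself rejects duplicate task names is not documented here, so the duplicate check is this sketch's own guard.

```typescript
// Minimal stand-in for a task definition: only the name matters here.
interface NamedTask {
  name: string;
}

// Hypothetical sketch of the documented ordering, plus a duplicate guard.
function composeWorkflows(parts: {
  builtIns: NamedTask[];
  journeyTasks: NamedTask[];
  bucketTasks: NamedTask[];
  extraWorkflows: NamedTask[];
}): NamedTask[] {
  const all = [
    ...parts.builtIns,
    ...parts.journeyTasks,
    ...parts.bucketTasks,
    ...parts.extraWorkflows,
  ];

  // Listing a built-in (e.g. "send-email") in extraWorkflows would register it twice.
  const seen = new Set<string>();
  for (const task of all) {
    if (seen.has(task.name)) throw new Error(`Duplicate workflow name: ${task.name}`);
    seen.add(task.name);
  }
  return all;
}
```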
Idempotent backfills (expand → migrate → contract)
When a release adds a column that needs populating on existing rows, do not put the data change inside a schema migration — that holds locks and runs unbounded against a live database. Drive it from a Hatchet task using runBatchedBackfill (from @hogsend/engine), which runs the migration in small, idempotent, lock-friendly batches that are resumable: if the process dies, re-running continues where it left off because each batch only selects rows that still need work.
Sequence the change across releases so old and new code can run side by side:
- Release N (expand) — the migration adds the column (nullable / defaulted); code writes both the old and new shape. Deploy.
- Run the backfill task once (Hatchet dashboard, or push its event) to populate existing rows. It's batched, idempotent, and resumable.
- Release N+1 — code reads the new column.
- Release N+2 (contract): once the backfill is confirmed complete, a migration drops the old column / adds NOT NULL.
The two rules that make runBatchedBackfill safe: runBatch must be idempotent and self-bounding (touch only rows that still need the change, e.g. WHERE new_col IS NULL ... LIMIT n, and return the rows affected; return 0 to signal completion), and select the batch with FOR UPDATE SKIP LOCKED so concurrent runs don't fight over the same rows.
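To see why a self-bounding runBatch makes the whole job resumable, here is a conceptual model of the driver loop. It is not runBatchedBackfill's source: driveBackfill, the maxBatches cap, and counting the final empty batch are assumptions of this sketch, and its runBatch takes only the limit (the real callback also receives the database).

```typescript
interface BackfillResult {
  batches: number;
  rows: number;
  exhausted: boolean;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Conceptual model of a batched backfill driver (hypothetical, not engine code).
async function driveBackfill(opts: {
  batchSize: number;
  pauseMs: number;
  maxBatches?: number; // hypothetical cap; omit to run until exhausted
  runBatch: (limit: number) => Promise<number>; // returns rows affected
}): Promise<BackfillResult> {
  let batches = 0;
  let rows = 0;
  while (opts.maxBatches === undefined || batches < opts.maxBatches) {
    const affected = await opts.runBatch(opts.batchSize);
    batches += 1;
    // A batch that touches nothing means no rows still need the change.
    if (affected === 0) return { batches, rows, exhausted: true };
    rows += affected;
    // Give a live database room to breathe between batches.
    if (opts.pauseMs > 0) await sleep(opts.pauseMs);
  }
  return { batches, rows, exhausted: false };
}
```

Stopping after two batches and calling the driver again picks up the remaining rows, because each batch only selects rows that still need work; no progress is tracked outside the table itself.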
import { createDatabase } from "@hogsend/db";
import { createLogger, hatchet, runBatchedBackfill } from "@hogsend/engine";
import { sql } from "drizzle-orm";
export const backfillExampleTask = hatchet.task({
name: "backfill-example",
retries: 2,
executionTimeout: "30m",
fn: async () => {
const { db, client } = createDatabase({ url: process.env.DATABASE_URL ?? "" });
const logger = createLogger(process.env.LOG_LEVEL ?? "info");
try {
const result = await runBatchedBackfill({
db,
logger,
label: "contacts.normalized_email",
batchSize: 500, // keep modest so each statement holds locks briefly
pauseMs: 50, // pause between batches to relieve a live DB
runBatch: async (database, limit) => {
// Bounded + idempotent: only rows that still need it, locked with
// SKIP LOCKED so concurrent runs/workers don't collide.
const updated = (await database.execute(sql`
WITH batch AS (
SELECT id FROM contacts
WHERE normalized_email IS NULL
LIMIT ${limit}
FOR UPDATE SKIP LOCKED
)
UPDATE contacts c
SET normalized_email = lower(trim(c.email))
FROM batch
WHERE c.id = batch.id
RETURNING c.id
`)) as unknown as unknown[];
return updated.length; // 0 = done (exhausted)
},
});
// Plain JSON object so Hatchet can serialize the task output.
return {
batches: result.batches,
rows: result.rows,
exhausted: result.exhausted, // true only when a batch returned 0
};
} finally {
await client.end({ timeout: 5 });
}
},
});
Register it like any other custom task (extraWorkflows), restart the worker, then trigger the run once from the Hatchet dashboard. Re-running is safe and resumes where it left off. Only after exhausted: true is confirmed should you ship the contract migration. For schema migration mechanics, see the database tooling in your scaffold's packages/db.
See also
- Events & Ingestion: the pipeline a webhook source's IngestEvent feeds into, and how PostHog connects.
- Conditions: trigger.where / exitOn rules that decide whether an event enrolls or exits a journey.
- Integrations: the Clerk / Supabase / Stripe / Segment presets and their event mappings.
- Identity: the eventProperties vs contactProperties split.
- Destinations and Outbound webhooks: the outbound half (fanning Hogsend's event stream out with defineDestination()), distinct from inbound sources.