Weekly digest

A weekly activity digest as a cron Hatchet task — onCrons scheduling, one aggregate query over user_events, per-user idempotency keys, preference-checked sends, and why this is a task in src/workflows/, not a journey.

A digest is a scheduled fan-out, not a behavioral flow — so it is not a journey. It's a custom Hatchet task in your src/workflows/, registered via createWorker({ extraWorkflows }), that runs on a cron, aggregates last week's activity in one query, and sends one preference-checked, idempotent email per active user. Users with nothing to report never even appear in the result set.

Why not a journey: defineJourney() produces a task wired to onEvents: [meta.trigger.event] — a journey run is born from one user's ingested event and owns that one user's state. A digest has no triggering event (its trigger is a clock) and no per-user flow (every recipient gets the same shaped send, decided by a query at send time). Forcing it into a journey means either a cron that fires a synthetic event per user every Monday — N events and N enrollments to simulate one sweep — or a per-user run that loops "sleep until next Monday" forever, which the journey execution timeout (720h, 30 days) terminates after four digests. One cron task does the whole job.

Concern | How you express it
Every Monday morning | onCrons: ["0 9 * * 1"] on hatchet.task()
Who gets a digest | one aggregate query over user_events
Skip empty digests | the GROUP BY only returns users with activity
Never double-send on a retry | idempotencyKey: "digest:<userId>:<week>"
Respect unsubscribes | the tracked mailer's preference check (no skipPreferenceCheck)
Wire it in | createWorker({ …, extraWorkflows })
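
The cron expression reads field by field: minute 0, hour 9, any day of month, any month, day-of-week 1 (Monday). As a rough illustration of what that schedule means in UTC, here is a minimal sketch that computes the next matching instant by hand. nextMondayNineUtc is a hypothetical helper written for this page only; Hatchet evaluates the expression itself, and the task needs nothing like it.

```typescript
// Hypothetical helper (not part of Hogsend or Hatchet): the next instant that
// matches "0 9 * * 1", i.e. Monday 09:00 UTC, strictly after `from`.
function nextMondayNineUtc(from: Date): Date {
  // Start from 09:00 UTC on the same UTC calendar day as `from`.
  const next = new Date(
    Date.UTC(from.getUTCFullYear(), from.getUTCMonth(), from.getUTCDate(), 9, 0, 0, 0),
  );
  // getUTCDay(): Sunday = 0, Monday = 1, ..., Saturday = 6.
  const daysUntilMonday = (1 - next.getUTCDay() + 7) % 7;
  next.setUTCDate(next.getUTCDate() + daysUntilMonday);
  // If that instant is not in the future, the next match is a week later.
  if (next.getTime() <= from.getTime()) {
    next.setUTCDate(next.getUTCDate() + 7);
  }
  return next;
}

// A Wednesday afternoon resolves to the following Monday morning, in UTC.
console.log(nextMondayNineUtc(new Date("2024-01-03T12:00:00Z")).toISOString());
```

Every recipient shares that one instant, so in January a user in Sydney (UTC+11) sees the digest at 20:00 on Monday and a user in Los Angeles (UTC-8) sees it at 01:00 on Monday.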

The task

// src/workflows/weekly-digest.ts
import { contacts, userEvents } from "@hogsend/db";
import { hatchet } from "@hogsend/engine";
import { and, eq, gte, inArray, sql } from "drizzle-orm";
import { getContainer } from "../container.js";
import { Events, Templates } from "../journeys/constants/index.js";

const WINDOW_MS = 7 * 24 * 60 * 60 * 1000;

export const weeklyDigestTask = hatchet.task({
  name: "weekly-digest",
  // Mondays at 09:00 UTC — cron expressions evaluate in UTC, not per-user time.
  onCrons: ["0 9 * * 1"],
  retries: 1,
  executionTimeout: "30m",
  fn: async () => {
    const { db, emailService, logger } = getContainer();
    const since = new Date(Date.now() - WINDOW_MS);
    // weekKey is this run's UTC date (YYYY-MM-DD). A retry or same-day re-run
    // reuses it, so the per-user idempotency keys below can't double-send.
    const weekKey = new Date().toISOString().slice(0, 10);

    // One aggregate query: per-user counts over the window. Users with no
    // qualifying events never appear — the empty digest is structurally
    // impossible, not a filter you have to remember.
    const activity = await db
      .select({
        userId: userEvents.userId,
        reportsCreated: sql<number>`count(*) filter (where ${userEvents.event} = ${Events.REPORT_CREATED})`,
        reportsShared: sql<number>`count(*) filter (where ${userEvents.event} = ${Events.REPORT_SHARED})`,
      })
      .from(userEvents)
      .where(
        and(
          gte(userEvents.occurredAt, since),
          inArray(userEvents.event, [
            Events.REPORT_CREATED,
            Events.REPORT_SHARED,
          ]),
        ),
      )
      .groupBy(userEvents.userId);

    let sent = 0;
    let skipped = 0;

    for (const row of activity) {
      // Identity is resolved server-side from the contacts row — the userId
      // on events is the external id, never an email address.
      const contact = await db.query.contacts.findFirst({
        where: eq(contacts.externalId, row.userId),
      });
      if (!contact?.email) {
        skipped++;
        continue;
      }

      const result = await emailService.send({
        template: Templates.RETENTION_WEEKLY_DIGEST, // "retention/weekly-digest"
        to: contact.email,
        userId: row.userId,
        subject: "Your week in review",
        props: {
          reportsCreated: Number(row.reportsCreated),
          reportsShared: Number(row.reportsShared),
          weekOf: weekKey,
        },
        // NO skipPreferenceCheck — a digest is exactly the mail that
        // preferences exist to control. Unsubscribed/suppressed recipients
        // come back as a status, not an error.
        idempotencyKey: `digest:${row.userId}:${weekKey}`,
      });

      if (result.status === "sent") sent++;
      else skipped++;
    }

    logger.info("weekly-digest complete", { sent, skipped, weekKey });
    return { sent, skipped, week: weekKey };
  },
});

The task resolves its services from the process-wide container, the same pattern Hogsend's production lead-alert task uses. It sends through emailService.send rather than sendEmail() because the journey-side wrapper hardcodes the journey category and takes no idempotencyKey, while the container service gives the digest both the registry's category and the retry-safe key.

Idempotency is the whole reliability story

A digest sender's classic failure is the half-completed run: the process dies at recipient 4,000 of 9,000, the retry starts from the top, and the first 4,000 people get two digests. Here retries: 1 re-runs the same fn and the loop re-issues the same sends, but every send carries idempotencyKey: "digest:<userId>:<weekKey>", so a key the mailer has already fulfilled short-circuits to the existing email_sends row instead of delivering again. The retry finishes the back half of the list and no-ops the front half. The same property makes a manual re-run (replaying the task from the Hatchet dashboard after a deploy) safe, but only on the same UTC day: weekKey is the run date, so a replay on a later day produces new keys and sends again.
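
To make the half-completed-run scenario concrete, here is a minimal in-memory model of it. FakeMailer, runDigest, and the "duplicate" status are all hypothetical stand-ins invented for this sketch; the real mailer deduplicates against email_sends, and the status it returns for an already-fulfilled key is not specified on this page.

```typescript
type SendStatus = "sent" | "duplicate"; // "duplicate" is an assumed label

// Hypothetical stand-in for a mailer that deduplicates on idempotencyKey.
class FakeMailer {
  private fulfilled = new Set<string>();
  delivered: string[] = [];

  send(userId: string, idempotencyKey: string): SendStatus {
    // A key that was already fulfilled never causes a second delivery.
    if (this.fulfilled.has(idempotencyKey)) return "duplicate";
    this.fulfilled.add(idempotencyKey);
    this.delivered.push(userId);
    return "sent";
  }
}

// One digest run. `crashAfter` simulates the process dying mid-loop.
function runDigest(
  mailer: FakeMailer,
  userIds: string[],
  weekKey: string,
  crashAfter?: number,
): void {
  userIds.forEach((userId, i) => {
    if (crashAfter !== undefined && i === crashAfter) {
      throw new Error("worker died");
    }
    mailer.send(userId, `digest:${userId}:${weekKey}`);
  });
}

const mailer = new FakeMailer();
const users = ["u1", "u2", "u3", "u4"];
try {
  runDigest(mailer, users, "2024-01-01", 2); // dies before reaching u3
} catch {
  runDigest(mailer, users, "2024-01-01"); // the retry starts from the top
}
console.log(mailer.delivered); // each user appears exactly once
```

The retry loop is deliberately naive: it does not track where the first attempt stopped. All of the safety comes from the key, which is why the task above needs no checkpointing.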

Skips are data, not errors

Three populations fall out of a digest run without special handling:

  • No activity — never selected: the aggregate's GROUP BY only returns users with at least one qualifying event in the window. The "empty digest" email cannot be constructed, because its recipient is never in the loop.
  • No email on record — an events-only identity (a userId with no contact email) is counted in skipped and passed over.
  • Unsubscribed or suppressed — the tracked mailer checks preferences on every send and returns status: "unsubscribed" / "suppressed" rather than delivering. The task counts them; it never works around them.
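
The first population is easiest to see with an in-memory equivalent of the aggregate query. This is only a sketch of the query's semantics, not how the task runs it: the event names "report.created" and "report.shared" are assumed values for Events.REPORT_CREATED and Events.REPORT_SHARED, and aggregateActivity is a hypothetical function.

```typescript
interface UserEvent {
  userId: string;
  event: string;
  occurredAt: Date;
}

interface ActivityRow {
  userId: string;
  reportsCreated: number;
  reportsShared: number;
}

// Assumed event names; the recipe only references them through constants.
const REPORT_CREATED = "report.created";
const REPORT_SHARED = "report.shared";

// In-memory equivalent of the windowed GROUP BY: filter rows, then group by user.
function aggregateActivity(events: UserEvent[], since: Date): ActivityRow[] {
  const byUser = new Map<string, ActivityRow>();
  for (const e of events) {
    if (e.occurredAt.getTime() < since.getTime()) continue; // outside the window
    if (e.event !== REPORT_CREATED && e.event !== REPORT_SHARED) continue; // not qualifying
    const row = byUser.get(e.userId) ?? { userId: e.userId, reportsCreated: 0, reportsShared: 0 };
    if (e.event === REPORT_CREATED) row.reportsCreated++;
    else row.reportsShared++;
    byUser.set(e.userId, row);
  }
  return Array.from(byUser.values());
}

const since = new Date("2024-01-01T09:00:00Z");
const rows = aggregateActivity(
  [
    { userId: "u1", event: REPORT_CREATED, occurredAt: new Date("2024-01-03T00:00:00Z") },
    { userId: "u1", event: REPORT_SHARED, occurredAt: new Date("2024-01-04T00:00:00Z") },
    { userId: "u2", event: REPORT_CREATED, occurredAt: new Date("2023-12-20T00:00:00Z") }, // too old
    { userId: "u3", event: "login", occurredAt: new Date("2024-01-05T00:00:00Z") }, // wrong event
  ],
  since,
);
console.log(rows); // only u1 has a row; u2 and u3 never reach the send loop
```

A row only exists once a qualifying event creates it, so there is no zero-count row to filter out later.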

The returned { sent, skipped, week } summary lands in the Hatchet run result, so week-over-week delivery counts are visible without building a separate dashboard.
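
If a single skipped counter turns out to be too coarse, one option is to count each outcome separately and return that instead. The sketch below assumes the three statuses named above ("sent", "unsubscribed", "suppressed") and adds a hypothetical "no_email" label for rows skipped before any send is attempted; the real mailer may return statuses that are not listed here.

```typescript
// "no_email" is an assumed label, not a status the mailer returns.
type Outcome = "sent" | "unsubscribed" | "suppressed" | "no_email";

// Count how many recipients ended in each outcome.
function tally(outcomes: Outcome[]): Record<Outcome, number> {
  const counts: Record<Outcome, number> = {
    sent: 0,
    unsubscribed: 0,
    suppressed: 0,
    no_email: 0,
  };
  for (const outcome of outcomes) {
    counts[outcome]++;
  }
  return counts;
}

const summary = tally(["sent", "sent", "unsubscribed", "no_email", "sent", "suppressed"]);
console.log(summary);
```

Returning the breakdown from fn would put the per-reason counts in the same Hatchet run result as the existing summary.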

Register it

The wiring is two files you own, plus the process-wide container helper the task imports:

// src/workflows/index.ts — list only YOUR tasks; built-ins register themselves
import { weeklyDigestTask } from "./weekly-digest.js";

export const extraWorkflows = [weeklyDigestTask];

// src/worker.ts
import { createWorker } from "@hogsend/engine";
import { getContainer } from "./container.js";
import { journeys } from "./journeys/index.js";
import { extraWorkflows } from "./workflows/index.js";

const client = getContainer();
const worker = createWorker({
  container: client,
  journeys,
  extraWorkflows, // NOT `workflows`
});
await worker.start();

// src/container.ts — lazy so importing a workflow module stays side-effect free
import { createHogsendClient, type HogsendClient } from "@hogsend/engine";
import { templates } from "./emails/registry.js";
import { journeys } from "./journeys/index.js";

let client: HogsendClient | undefined;

export function getContainer(): HogsendClient {
  client ??= createHogsendClient({ journeys, email: { templates } });
  return client;
}

The retention/weekly-digest key needs a React Email component, a registry.ts entry, and a templates.d.ts augmentation (see the Email guide); after that, the props bag above is type-checked. Restart the worker (hatchet worker dev) so the new task registers.

  • The cron is UTC. 0 9 * * 1 is Monday 09:00 UTC for everyone. Per-recipient local timing is a journey concern (ctx.when, see Timezone-aware scheduling) — a digest deliberately trades it for one sweep.
  • Never pass skipPreferenceCheck here. That flag exists for transactional mail like password resets; a digest is the canonical email a preference center exists to switch off.
  • The idempotency key encodes the period. digest:<userId>:<weekKey> makes retries and same-day re-runs no-ops while still allowing next Monday's send — drop the date and the user only ever receives one digest.
  • Reach for a journey first. For normal lifecycle messaging, journeys give you enrollment guards, durable sleeps, and exit conditions for free — a custom task is for orchestration the journey model doesn't fit, and a population-wide cron fan-out is the textbook case.
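
The task's weekKey is the run's UTC date, which is why only same-day re-runs are no-ops. If replays later in the week should also deduplicate, one possible variation is to key on the Monday that starts the week instead. weekStartKey below is a hypothetical helper sketched under that assumption, not something the recipe ships.

```typescript
// Hypothetical helper: the UTC date (YYYY-MM-DD) of the Monday that starts
// the week containing `now`. Every day of one week maps to the same key.
function weekStartKey(now: Date): string {
  // Truncate to UTC midnight so the time of day cannot affect the result.
  const d = new Date(Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()));
  // getUTCDay(): Sunday = 0 ... Saturday = 6. Remap so Monday = 0 ... Sunday = 6.
  const daysSinceMonday = (d.getUTCDay() + 6) % 7;
  d.setUTCDate(d.getUTCDate() - daysSinceMonday);
  return d.toISOString().slice(0, 10);
}

const scheduledRun = weekStartKey(new Date("2024-01-01T09:00:00Z")); // Monday cron run
const wednesdayReplay = weekStartKey(new Date("2024-01-03T15:30:00Z")); // manual replay
const nextWeekRun = weekStartKey(new Date("2024-01-08T09:00:00Z")); // following Monday

console.log(scheduledRun === wednesdayReplay); // same week, same key
console.log(scheduledRun === nextWeekRun); // a new week gets a new key
```

The trade-off is that a deliberate second send within the same week would also be suppressed, so this only fits if one digest per user per week is a hard rule.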

Related: Marketing campaigns is the broadcast alternative when every recipient gets identical content (one hs.campaigns.send to a list, no per-user query), AI-drafted sends builds on the same custom-task shape, and Win-back and sunset handles the users a digest stops reaching. Custom-task mechanics are documented in the Webhook sources guide.
