@systemoperator/sync

Sync workflow orchestration: lifecycle, locks, pagination, content hashing, retry, batch processing. Zero dependencies, works everywhere (Cloudflare Workers, Node, Deno, Bun).

install

```sh
npm install @systemoperator/sync
```

usage

lifecycle

wraps the full sync lifecycle: create run, acquire lock, execute, complete/fail, release lock, notify.

```ts
import { withSyncRun } from '@systemoperator/sync';

const result = await withSyncRun(
  {
    step,                    // Cloudflare WorkflowStep
    tracker: mySyncTracker,  // implements SyncRunTracker
    config: { runType: 'sync_stripe', caller: 'stripe-sync' },
    payload: { connectionId: 'conn-1', trigger: 'cron' },
    lockStore: myLockStore,  // optional, implements SyncLockStore
    notifier: myNotifier,    // optional, implements SyncNotifier
  },
  async (ctx) => {
    // ctx has: runId, counts, checkTimeout(), payload, config
    await paginateCursor(step, ctx, { ... });
  },
);
```

pagination

durable cursor and offset pagination with timeout checking.

```ts
import { paginateCursor, paginateOffset } from '@systemoperator/sync';

await paginateCursor(step, ctx, {
  stepPrefix: 'charges',
  fetchPage: async (cursor) => stripe.charges.list({ starting_after: cursor }),
  processPage: async (items) => processCharges(items),
  getCursor: (items) => items.at(-1)!.id,
});

await paginateOffset(step, ctx, {
  stepPrefix: 'transactions',
  limit: 100,
  fetchPage: async (offset, limit) => mercury.listTransactions({ offset, limit }),
  processPage: async (items) => processTransactions(items),
});
```
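Conceptually, the cursor loop repeats fetchPage → processPage → getCursor until an empty page; the package additionally checkpoints each page as a durable step. A minimal non-durable sketch of that loop (the in-memory data source and page size are hypothetical, for illustration only):

```ts
// Minimal, non-durable sketch of cursor pagination: fetch a page,
// process it, advance the cursor, stop on an empty page.
type Item = { id: string };

const all: Item[] = Array.from({ length: 7 }, (_, i) => ({ id: `c${i}` }));

async function fetchPage(cursor?: string): Promise<Item[]> {
  const start = cursor ? all.findIndex((x) => x.id === cursor) + 1 : 0;
  return all.slice(start, start + 3); // page size 3
}

async function paginate(): Promise<string[]> {
  const seen: string[] = [];
  let cursor: string | undefined;
  while (true) {
    const items = await fetchPage(cursor);
    if (items.length === 0) break;           // no more pages
    seen.push(...items.map((x) => x.id));    // stands in for processPage
    cursor = items[items.length - 1].id;     // stands in for getCursor
  }
  return seen;
}
```

The durable versions above wrap each page in a workflow step, so a restarted worker resumes from the last committed cursor instead of page one.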

content hashing

deterministic SHA-256 hashing with sorted keys, plus an upsert-with-hash pattern that skips writes when content is unchanged.

```ts
import { computeContentHash, upsertWithHash, determineAction } from '@systemoperator/sync';

const hash = await computeContentHash(apiRecord);

const result = await upsertWithHash({
  newHash: hash,
  insert: () => db.insert(charges).values({ ...data, contentHash: hash }),
  findExisting: () => db.select().from(charges).where(eq(charges.externalId, id)),
  update: (existingId) => db.update(charges).set({ ...data, contentHash: hash }).where(eq(charges.id, existingId)),
});
// result: 'created' | 'updated' | 'unchanged'
```
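The point of sorting keys is that two records with the same content but different key order hash identically. An illustrative reimplementation of the idea (using `node:crypto` for brevity; `computeContentHash` itself is async and its exact canonicalization may differ):

```ts
import { createHash } from "node:crypto";

// Serialize with object keys sorted recursively so key order
// never changes the resulting hash.
function canonicalize(value: unknown): string {
  if (Array.isArray(value)) return `[${value.map(canonicalize).join(",")}]`;
  if (value !== null && typeof value === "object") {
    const entries = Object.entries(value as Record<string, unknown>)
      .sort(([k1], [k2]) => (k1 < k2 ? -1 : 1))
      .map(([k, v]) => `${JSON.stringify(k)}:${canonicalize(v)}`);
    return `{${entries.join(",")}}`;
  }
  return JSON.stringify(value);
}

function sha256Hex(input: string): string {
  return createHash("sha256").update(input).digest("hex");
}

// Same content, different key order → same hash.
const hashA = sha256Hex(canonicalize({ amount: 100, currency: "usd" }));
const hashB = sha256Hex(canonicalize({ currency: "usd", amount: 100 }));
```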

retry

durable retry with step.sleep() that survives worker restarts.

```ts
import { withRetry, RETRY_PRESETS, RetryableError } from '@systemoperator/sync';

const data = await withRetry(step, 'fetch-charges', async () => {
  const res = await fetch(url);
  if (!res.ok) throw new RetryableError(`HTTP ${res.status}`);
  return res.json();
}, RETRY_PRESETS.standard);
```

batch processing

timeout-aware batch processing with stats tracking.

```ts
import { processBatchWithTimeout } from '@systemoperator/sync';

const allDone = await processBatchWithTimeout(step, items, {
  batchSize: 50,
  checkTimeout: ctx.checkTimeout,
  processItem: async (item) => processOne(item),
  counts: ctx.counts,
});
```
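The core loop is roughly: slice items into batches, check the time budget between batches, and report whether everything finished. A simplified sketch without durable steps (the `hasTimeLeft` stub and `Counts` shape are assumptions standing in for `ctx.checkTimeout` and `ctx.counts`):

```ts
type Counts = { processed: number; failed: number };

// Simplified timeout-aware batching: process items in chunks, checking
// the time budget between chunks; returns false if it stopped early.
async function processBatch<T>(
  items: T[],
  opts: {
    batchSize: number;
    hasTimeLeft: () => boolean;              // stands in for ctx.checkTimeout
    processItem: (item: T) => Promise<void>;
    counts: Counts;
  },
): Promise<boolean> {
  for (let i = 0; i < items.length; i += opts.batchSize) {
    if (!opts.hasTimeLeft()) return false;   // out of budget, resume later
    const batch = items.slice(i, i + opts.batchSize);
    for (const item of batch) {
      try {
        await opts.processItem(item);
        opts.counts.processed++;
      } catch {
        opts.counts.failed++;
      }
    }
  }
  return true; // all items done
}
```

Returning a boolean lets the caller reschedule the workflow to pick up the remaining items rather than overrunning the worker's time limit.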

store interfaces

the package never touches databases directly; products inject their own implementations:

```ts
import type { SyncRunTracker, SyncLockStore, SyncNotifier } from '@systemoperator/sync';

const tracker: SyncRunTracker = {
  async createAndStartRun(runType, trigger, input) { /* create run record, return ID */ },
  async finishRun(runId, output) { /* mark run completed */ },
  async failRun(runId, error, output) { /* mark run failed */ },
};

const lockStore: SyncLockStore = {
  async acquireLock(resourceId, lockOwner, timeoutMs) { /* pessimistic lock */ },
  async releaseLock(resourceId) { /* release lock */ },
};
```
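For local testing, the lock store can be as simple as a `Map`. A minimal in-memory sketch matching the two method signatures above (the boolean/void return types are assumptions, since the interface body isn't shown; real products would back this with a database row or Durable Object):

```ts
// In-memory pessimistic lock store: a lock is held by an owner until
// released or until timeoutMs elapses.
type Lock = { owner: string; expiresAt: number };

const locks = new Map<string, Lock>();

const memoryLockStore = {
  async acquireLock(resourceId: string, lockOwner: string, timeoutMs: number): Promise<boolean> {
    const now = Date.now();
    const existing = locks.get(resourceId);
    if (existing && existing.expiresAt > now && existing.owner !== lockOwner) {
      return false; // held by someone else and not yet expired
    }
    locks.set(resourceId, { owner: lockOwner, expiresAt: now + timeoutMs });
    return true;
  },
  async releaseLock(resourceId: string): Promise<void> {
    locks.delete(resourceId);
  },
};
```

The expiry check means a crashed worker's lock is reclaimable after `timeoutMs`, which is the usual reason to prefer leased locks over permanent ones in sync jobs.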

retry presets

| preset | maxAttempts | initialDelay | maxDelay | backoff |
| --- | --- | --- | --- | --- |
| fast | 3 | 1s | 5s | 2x |
| standard | 5 | 2s | 60s | 2x |
| rateLimit | 8 | 5s | 5min | 3x |
| dns | 4 | 10s | 2min | 2x |
| verification | 10 | 30s | 10min | 1.5x |
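Assuming the usual exponential-backoff formula these columns imply — delay before retry n is `initialDelay · backoff^(n-1)`, capped at `maxDelay` (the package's exact formula, including any jitter, may differ) — the wait schedule for a preset works out like this:

```ts
// Delay before retry n (1-based), exponential backoff capped at maxMs.
function delayMs(attempt: number, initialMs: number, factor: number, maxMs: number): number {
  return Math.min(initialMs * factor ** (attempt - 1), maxMs);
}

// `standard` preset: 5 attempts → 4 waits of 2s, 4s, 8s, 16s.
const standard = [1, 2, 3, 4].map((n) => delayMs(n, 2000, 2, 60000));

// `rateLimit` preset hits its 5min cap on later attempts.
const rateLimitLate = delayMs(7, 5000, 3, 300000);
```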

license

MIT
