> ## Documentation Index
> Fetch the complete documentation index at: https://docs.kavachos.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Event streaming

> Stream auth events, agent lifecycle changes, and anomaly alerts in real time via SSE using createEventStreamModule. Missed events replay from a stored cursor.

## What event streaming is

Event streaming gives you a persistent, real-time connection to KavachOS events via [Server-Sent Events (SSE)](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events). The moment an agent is revoked, a budget is exceeded, or an anomaly is detected, connected clients receive the event, no polling required.

Events are also persisted to the database so you can replay anything you missed.

## Streaming vs webhooks

Both webhooks and SSE carry the same events, but they serve different use cases.

|               | Webhooks                             | Event streaming                                       |
| ------------- | ------------------------------------ | ----------------------------------------------------- |
| Transport     | HTTP POST to your server             | Persistent HTTP connection to your browser or service |
| Latency       | \~100–500ms (network + retry)        | Near-instant                                          |
| Auth          | Verified by HMAC signature           | Bearer token                                          |
| Missed events | Retried up to 3 times, then dropped  | Replay via `since` cursor                             |
| Best for      | Backend integrations, data pipelines | Dashboards, SOC tooling, live monitoring              |

Use webhooks when you need durable delivery to an external service. Use event streaming when you need a live view, a security dashboard, an admin feed, or a CI script watching for `anomaly.detected`.

## Setup

```typescript theme={"system"}
import { createKavach } from 'kavachos';
import { createEventStreamModule } from 'kavachos/auth';

const kavach = await createKavach({
  database: { provider: 'sqlite', url: 'kavach.db' },
});

const stream = createEventStreamModule({
  db: kavach.db,
  requireAuth: true,
  validateToken: async (token) => {
    // Return the subscriber ID (userId or agentId) on success, null on failure.
    const session = await kavach.session.verify(token);
    return session?.userId ?? null;
  },
});

// Mount in your HTTP handler
app.get('/api/kavach/events/stream', (req, res) => {
  const response = stream.handleRequest(req);
  if (response) return response;
});

// Emit from anywhere in your application
stream.emit({
  id: crypto.randomUUID(),
  type: 'agent.created',
  timestamp: new Date(),
  data: { agentId: agent.id, name: agent.name },
});
```

<Note>
  `handleRequest` returns `null` for any request that is not a valid SSE request (wrong path, wrong `Accept` header, or non-GET method). This makes it safe to call inside a catch-all handler.
</Note>

## Connecting from a browser

The browser's built-in `EventSource` API handles reconnection automatically.

```typescript theme={"system"}
const source = new EventSource(
  '/api/kavach/events/stream?token=your-bearer-token',
);

source.addEventListener('agent.created', (event) => {
  const data = JSON.parse(event.data);
  console.log('New agent:', data.agentId);
});

source.addEventListener('anomaly.detected', (event) => {
  const data = JSON.parse(event.data);
  console.warn('Anomaly:', data);
});

source.addEventListener('error', (event) => {
  const data = JSON.parse((event as MessageEvent).data);
  console.error('Stream error:', data.code);
});

// Clean up
source.close();
```

If you pass the token via the `Authorization` header instead, use the `eventsource` package or a fetch-based polyfill, since the native `EventSource` does not support custom headers.

## Connecting from Node.js

```typescript theme={"system"}
import { EventSource } from 'eventsource';

const source = new EventSource('https://your-app.com/api/kavach/events/stream', {
  headers: {
    Authorization: 'Bearer your-bearer-token',
  },
});

source.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Event:', data.type, data);
};

source.onerror = () => {
  console.error('Connection lost, reconnecting...');
};
```

## Event types

| Type                 | When it fires                                |
| -------------------- | -------------------------------------------- |
| `audit`              | Any agent action logged to the audit trail   |
| `agent.created`      | A new agent identity is registered           |
| `agent.revoked`      | An agent is permanently revoked              |
| `agent.rotated`      | An agent token is rotated                    |
| `auth.signin`        | A user signs in                              |
| `auth.signout`       | A user signs out                             |
| `auth.failed`        | A sign-in attempt fails                      |
| `delegation.created` | A new delegation chain is created            |
| `delegation.revoked` | A delegation is revoked                      |
| `budget.exceeded`    | An agent exceeds its cost budget             |
| `anomaly.detected`   | The anomaly detector flags unusual behaviour |
| `cost.recorded`      | A cost event is attributed to an agent       |

## Filtering events

Pass a `types` query parameter with a comma-separated list to receive only the events you care about.

```
GET /api/kavach/events/stream?types=anomaly.detected,budget.exceeded
```

```typescript theme={"system"}
// From a browser
const source = new EventSource(
  '/api/kavach/events/stream?token=tok&types=agent.created,agent.revoked',
);
```

You can also restrict the types at the module level so no client can subscribe to events outside the allowed set.

```typescript theme={"system"}
const stream = createEventStreamModule({
  db: kavach.db,
  requireAuth: true,
  validateToken,
  eventTypes: ['audit', 'anomaly.detected'], // only these types are ever delivered
});
```

## Replay and cursor

Events are persisted in the `kavach_stream_events` table. If a client disconnects and reconnects, pass `since` to receive everything it missed.

```
GET /api/kavach/events/stream?since=2026-01-15T10:00:00Z
```

The browser `EventSource` can also pass the last received event ID via the `Last-Event-ID` header, which the browser manages automatically when you set the `id:` field on SSE events. KavachOS uses the `Last-Event-ID` value as the replay cursor.

```typescript theme={"system"}
// Programmatic replay without an open connection
const result = await stream.replay(new Date('2026-01-15T10:00:00Z'), ['audit', 'auth.failed']);
if (result.success) {
  for (const event of result.data) {
    console.log(event.type, event.data);
  }
}
```

`replay` returns up to 1000 events in descending order (newest first). Apply your own pagination on top if you need to page through large windows.

## Auth requirements

By default `requireAuth: true`. Every connection must present a valid Bearer token, either in the `Authorization` header or as the `token` query parameter.

```
Authorization: Bearer <token>
GET /api/kavach/events/stream?token=<token>
```

When the token is invalid, the stream sends a single `error` event and closes.

```json theme={"system"}
{ "code": "UNAUTHORIZED", "message": "Invalid token" }
```

To disable auth for local development or internal-only deployments:

```typescript theme={"system"}
const stream = createEventStreamModule({
  db: kavach.db,
  requireAuth: false,
});
```

<Warning>
  Never disable auth in production. The stream exposes audit events and agent lifecycle data.
</Warning>

## Configuration

```typescript theme={"system"}
interface EventStreamConfig {
  db: Database;

  // Maximum concurrent connections. Returns 503 when exceeded (default: 100).
  maxConnections?: number;

  // Interval between heartbeat comments in ms (default: 30000)
  heartbeatIntervalMs?: number;

  // Module-level event type allow-list. Clients cannot exceed this (default: all).
  eventTypes?: EventType[];

  // Require a Bearer token to connect (default: true)
  requireAuth?: boolean;

  // Validate the token and return subscriber ID (userId or agentId), or null
  validateToken?: (token: string) => Promise<string | null>;
}
```

## Connection limits

The stream rejects connections beyond `maxConnections` with a `503 Too many connections` response. Size this based on your deployment: a single process can comfortably handle hundreds of concurrent SSE connections; above that, consider a pub/sub layer (Redis, NATS) in front of the module.

## Heartbeat

The server sends `: heartbeat` comments on the configured interval (default 30 seconds) to keep load balancers and proxies from closing idle connections. No action is required on the client side, `EventSource` ignores comment lines.

## Emitting events from plugins

Any part of your application can emit to the stream.

```typescript theme={"system"}
// After revoking an agent
await kavach.agent.revoke(agentId);

stream.emit({
  id: crypto.randomUUID(),
  type: 'agent.revoked',
  timestamp: new Date(),
  data: { agentId, reason: 'manual-revocation' },
  agentId,
  userId: currentUser.id,
});
```

Integrate with the [webhooks](/webhooks) module to fire both a webhook and a stream event from the same action.

## Module API

```typescript theme={"system"}
interface EventStreamModule {
  // Emit an event to all connected clients and persist for replay
  emit(event: StreamEvent): void;

  // Handle an SSE connection request. Returns a Response or null.
  handleRequest(request: Request): Response | null;

  // Number of currently active connections
  getConnectionCount(): number;

  // Replay persisted events since a date, optionally filtered by type
  replay(since: Date, types?: EventType[]): Promise<Result<StreamEvent[]>>;

  // Close all connections and stop the heartbeat timer
  close(): void;
}
```
