Agent Server

The AgentServer class is a framework for building authenticated APIs with automatic OpenAPI documentation. It provides blockchain-based authentication, capability permissions, and integration with the Torus Network.

Overview

The AgentServer uses Hono for HTTP routing and Zod for input validation, making it easy to build type-safe APIs with automatic documentation generation.

Key Features

  • JWT Authentication: SR25519 signature-based authentication
  • Namespace Permissions: Blockchain-based access control
  • OpenAPI Documentation: Automatic API documentation generation
  • Type Safety: Full TypeScript support with Zod schemas
  • Blockchain Integration: Direct connection to Torus Network

Installation

npm install @torus-network/torus-ts-sdk

Basic Usage

import { AgentServer } from "@torus-network/torus-ts-sdk";
import { z } from "zod";
const agent = new AgentServer({
agentKey: "5FgfC2DY4yreEWEughz46RZYQ8oBhHVqD9fVq6gV89E6z4Ea", // Your agent's SS58 address
port: 3000,
docs: {
info: {
title: "Alice Memory Agent",
version: "1.0.0",
},
},
});
// Define a simple endpoint
agent.method(
"hello",
{
input: z.object({
name: z.string().min(1).max(50),
}),
output: {
ok: {
description: "Greeting response",
schema: z.object({
message: z.string(),
timestamp: z.number(),
}),
},
err: {
description: "Error response",
schema: z.object({
error: z.string(),
}),
},
},
},
async (input, context) => {
return {
ok: {
message: `Hello ${input.name}!`,
timestamp: Date.now(),
},
};
}
);
// Start the server
agent.run();

Configuration

AgentOptions

interface AgentOptions {
/** The SS58 address key of this agent */
agentKey: string;
/** Port number for the server */
port?: number; // Default: 3000
/** Authentication configuration */
auth?: {
headerName?: string; // Default: 'Authorization'
onAfterAuth?: (user: User) => Promise<void> | void;
jwtMaxAge?: number; // Default: 3600 (1 hour)
};
/** Documentation configuration */
docs: {
enabled?: boolean; // Default: true
path?: string; // Default: '/docs'
info: {
title: string;
version: string;
};
};
}
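
The defaults noted in the comments above can be made concrete with a small resolver. This is only a sketch: `resolveAgentOptions`, `AgentOptionsInput`, and `ResolvedAgentOptions` are hypothetical names that are not part of the SDK, and the SDK may apply its defaults differently. `onAfterAuth` is left out for brevity.

```typescript
// Hypothetical input shape: a trimmed copy of AgentOptions (onAfterAuth omitted)
interface AgentOptionsInput {
  agentKey: string;
  port?: number;
  auth?: { headerName?: string; jwtMaxAge?: number };
  docs: {
    enabled?: boolean;
    path?: string;
    info: { title: string; version: string };
  };
}

// Same shape with every optional field filled in
interface ResolvedAgentOptions {
  agentKey: string;
  port: number;
  auth: { headerName: string; jwtMaxAge: number };
  docs: { enabled: boolean; path: string; info: { title: string; version: string } };
}

// Hypothetical helper: applies the documented defaults to missing fields
function resolveAgentOptions(options: AgentOptionsInput): ResolvedAgentOptions {
  return {
    agentKey: options.agentKey,
    port: options.port ?? 3000,
    auth: {
      headerName: options.auth?.headerName ?? "Authorization",
      jwtMaxAge: options.auth?.jwtMaxAge ?? 3600, // seconds (1 hour)
    },
    docs: {
      enabled: options.docs.enabled ?? true,
      path: options.docs.path ?? "/docs",
      info: options.docs.info,
    },
  };
}

const resolved = resolveAgentOptions({
  agentKey: "5FgfC2DY4yreEWEughz46RZYQ8oBhHVqD9fVq6gV89E6z4Ea",
  docs: { info: { title: "My Agent API", version: "1.0.0" } },
});
console.log(resolved.port, resolved.auth.jwtMaxAge, resolved.docs.path);
```

Only `agentKey` and `docs.info` have to be supplied; everything else falls back to the documented default.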

Authentication

JWT Authentication

The AgentServer authenticates requests with JWTs signed using SR25519. Clients must send a valid JWT in the Authorization header.

// Method requiring authentication
agent.method(
"protected-endpoint",
{
auth: { required: true },
input: z.object({
data: z.string(),
}),
output: {
ok: {
description: "Success response",
schema: z.object({ result: z.string() }),
},
err: {
description: "Error response",
schema: z.object({ error: z.string() }),
},
},
},
async (input, context) => {
// context.user is available when auth is required
const userAddress = context.user!.walletAddress;
return {
ok: {
result: `Processing data for ${userAddress}`,
},
};
}
);
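
On the client side, a call to this endpoint might be assembled as shown below. The sketch rests on several assumptions: `buildAuthedRequest` is a hypothetical helper, the token is assumed to have been created and signed elsewhere, and the URL assumes that the method name maps directly to the route path.

```typescript
interface AuthedRequest {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
}

// Hypothetical helper: builds the pieces of an authenticated JSON request
function buildAuthedRequest(
  baseUrl: string,
  endpoint: string,
  token: string,
  input: unknown,
): AuthedRequest {
  return {
    // Strip trailing slashes so the joined URL has exactly one separator
    url: `${baseUrl.replace(/\/+$/, "")}/${endpoint}`,
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${token}`,
    },
    body: JSON.stringify(input),
  };
}

const request = buildAuthedRequest(
  "http://localhost:3000/",
  "protected-endpoint",
  "<JWT_TOKEN>", // placeholder, replace with a real signed token
  { data: "example" },
);
console.log(request.url, request.headers.Authorization);
```

The resulting object can be passed to `fetch(request.url, request)` in any runtime that provides `fetch`.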

Custom Authentication Handler

const agent = new AgentServer({
agentKey: "5FgfC2DY4yreEWEughz46RZYQ8oBhHVqD9fVq6gV89E6z4Ea",
auth: {
jwtMaxAge: 7200, // 2 hours
onAfterAuth: async (user) => {
console.log(`User ${user.walletAddress} authenticated`);
// Perform additional authentication logic
},
},
docs: {
info: {
title: "My Agent API",
version: "1.0.0",
},
},
});

Namespace Permissions

The AgentServer integrates with Torus Network’s capability permission system for fine-grained access control.

Basic Namespace Protection

agent.method(
"memory-write",
{
auth: { required: true },
namespace: {
enabled: true, // Enable namespace checking
path: "agent.alice.memory.twitter", // Custom namespace path
},
input: z.object({
content: z.string(),
}),
output: {
ok: {
description: "Memory stored successfully",
schema: z.object({ id: z.string() }),
},
err: {
description: "Error storing memory",
schema: z.object({ error: z.string() }),
},
},
},
async (input, context) => {
// Only users with permission for agent.alice.memory.twitter can access
return {
ok: {
id: "memory-123",
},
};
}
);

Automatic Path Generation

If no custom path is specified, the system checks paths using the format: agent.<agent_name>.<endpoint_name>.<http_method>

agent.method("store-data", {
auth: { required: true },
namespace: { enabled: true }, // Uses: agent.alice.store-data.post
// ... rest of configuration
});
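
The default path format can be illustrated with a small helper. `defaultNamespacePath` and the `HttpMethod` union are hypothetical names used for illustration only. The set of supported HTTP methods is an assumption, and the SDK may normalize names differently.

```typescript
// Assumed set of HTTP methods; the SDK may support a different set
type HttpMethod = "get" | "post" | "put" | "patch" | "delete";

// Hypothetical helper: builds agent.<agent_name>.<endpoint_name>.<http_method>
function defaultNamespacePath(
  agentName: string,
  endpointName: string,
  method: HttpMethod = "post", // "post" is the documented default method
): string {
  return ["agent", agentName, endpointName, method].join(".");
}

console.log(defaultNamespacePath("alice", "store-data")); // agent.alice.store-data.post
console.log(defaultNamespacePath("alice", "retrieve", "get")); // agent.alice.retrieve.get
```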

Namespace Configuration Options

interface NamespaceOptions {
/** Whether to enable capability permission checking */
enabled?: boolean; // Default: true
/** Custom namespace path */
path?: string;
/** RPC endpoints for permission verification */
rpcUrls?: string[]; // Default: ['wss://api.torus.network']
}

Method Definition

HTTP Methods

// POST /items - Create new item
agent.method("items", {
method: "post", // Default
input: z.object({
name: z.string(),
value: z.number()
}),
// ... rest of configuration
});

Input Validation

JSON Input

agent.method("process-data", {
input: z.object({
text: z.string().min(1).max(1000),
type: z.enum(["analysis", "summary", "translation"]),
options: z
.object({
language: z.string().optional(),
format: z.enum(["json", "text"]).default("json"),
})
.optional(),
}),
// ... rest of configuration
});

File Upload (Multipart Form Data)

agent.method("upload-file", {
input: {
schema: z.object({
file: z.string(), // File content
filename: z.string(),
contentType: z.string(),
}),
multipartFormData: true,
},
// ... rest of configuration
});

Output Schemas

Define both success and error response schemas:

agent.method(
"analyze-text",
{
input: z.object({
text: z.string(),
}),
output: {
ok: {
description: "Analysis completed successfully",
schema: z.object({
sentiment: z.enum(["positive", "negative", "neutral"]),
confidence: z.number().min(0).max(1),
keywords: z.array(z.string()),
wordCount: z.number(),
}),
},
err: {
description: "Analysis failed",
schema: z.object({
error: z.string(),
code: z.enum(["INVALID_INPUT", "PROCESSING_ERROR"]),
retryAfter: z.number().optional(),
}),
},
},
},
async (input, context) => {
try {
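// Process the text (analyzeText is an application-defined helper, not part of the SDK)
const result = await analyzeText(input.text);
return {
ok: {
sentiment: result.sentiment,
confidence: result.confidence,
keywords: result.keywords,
// Split on any run of whitespace so repeated spaces and empty input are not miscounted
wordCount: input.text.trim().split(/\s+/).filter(Boolean).length,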
},
};
} catch (error) {
return {
err: {
error: "Failed to analyze text",
code: "PROCESSING_ERROR",
},
};
}
}
);
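
Handlers return either an `ok` object or an `err` object, which can be modeled as a union type. The following is a sketch, not the SDK's own typing: `MethodResult`, `isOk`, and `describeResult` are hypothetical names chosen for illustration.

```typescript
// Hypothetical type mirroring the { ok } / { err } objects returned by handlers
type MethodResult<Ok, Err> = { ok: Ok } | { err: Err };

// Type guard: narrows a result to its success branch
function isOk<Ok, Err>(result: MethodResult<Ok, Err>): result is { ok: Ok } {
  return "ok" in result;
}

// Error shape taken from the err schema of the analyze-text example
interface AnalysisError {
  error: string;
  code: "INVALID_INPUT" | "PROCESSING_ERROR";
  retryAfter?: number;
}

// Example consumer: turns either branch into a log line
function describeResult(
  result: MethodResult<{ wordCount: number }, AnalysisError>,
): string {
  return isOk(result)
    ? `ok: ${result.ok.wordCount} words`
    : `err: ${result.err.code} (${result.err.error})`;
}

console.log(describeResult({ ok: { wordCount: 3 } }));
console.log(
  describeResult({ err: { error: "Failed to analyze text", code: "PROCESSING_ERROR" } }),
);
```

Because the two branches share no keys, the `"ok" in result` check is enough for TypeScript to narrow the type on both sides of the conditional.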

OpenAPI Documentation

The AgentServer automatically generates OpenAPI documentation accessible at /docs (or custom path):

const agent = new AgentServer({
agentKey: "5FgfC2DY4yreEWEughz46RZYQ8oBhHVqD9fVq6gV89E6z4Ea",
docs: {
enabled: true,
path: "/api-docs", // Custom documentation path
info: {
title: "My Agent API",
version: "2.1.0",
},
},
});

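With the configuration above, the interactive API documentation is served at http://localhost:3000/api-docs. With the default path, it is served at http://localhost:3000/docs.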

Advanced Examples

Multi-Method Agent

A multi-method agent is an agent that defines more than one method.

import { AgentServer } from "@torus-network/torus-ts-sdk";
import { z } from "zod";
const memoryAgent = new AgentServer({
agentKey: "5FgfC2DY4yreEWEughz46RZYQ8oBhHVqD9fVq6gV89E6z4Ea",
port: 3000,
docs: {
info: {
title: "Alice Memory Agent",
version: "1.0.0",
},
},
});
// Store memory
memoryAgent.method(
"store",
{
auth: { required: true },
namespace: {
enabled: true,
path: "agent.alice.memory.store",
},
input: z.object({
content: z.string().min(1).max(10000),
tags: z.array(z.string()).optional(),
metadata: z.record(z.any()).optional(),
}),
output: {
ok: {
description: "Memory stored successfully",
schema: z.object({
id: z.string(),
timestamp: z.number(),
size: z.number(),
}),
},
err: {
description: "Failed to store memory",
schema: z.object({
error: z.string(),
code: z.string(),
}),
},
},
},
async (input, context) => {
// generateUUID and storeMemory are application-defined helpers, not part of the SDK
const memoryId = generateUUID();
// Store memory logic here
await storeMemory(memoryId, input.content, input.tags, input.metadata);
return {
ok: {
id: memoryId,
timestamp: Date.now(),
size: input.content.length,
},
};
}
);
// Retrieve memory
memoryAgent.method(
"retrieve",
{
method: "get",
auth: { required: true },
namespace: {
enabled: true,
path: "agent.alice.memory.retrieve",
},
input: z.object({
id: z.string().uuid(),
}),
output: {
ok: {
description: "Memory retrieved successfully",
schema: z.object({
id: z.string(),
content: z.string(),
tags: z.array(z.string()),
metadata: z.record(z.any()),
timestamp: z.number(),
}),
},
err: {
description: "Failed to retrieve memory",
schema: z.object({
error: z.string(),
code: z.string(),
}),
},
},
},
async (input, context) => {
// getMemory is an application-defined helper, not part of the SDK
const memory = await getMemory(input.id);
if (!memory) {
return {
err: {
error: "Memory not found",
code: "NOT_FOUND",
},
};
}
return {
ok: memory,
};
}
);
// Search memories
memoryAgent.method(
"search",
{
method: "post",
auth: { required: true },
namespace: {
enabled: true,
path: "agent.alice.memory.search",
},
input: z.object({
query: z.string().min(1).max(500),
tags: z.array(z.string()).optional(),
limit: z.number().min(1).max(100).default(10),
}),
output: {
ok: {
description: "Search results",
schema: z.object({
results: z.array(
z.object({
id: z.string(),
content: z.string(),
score: z.number(),
timestamp: z.number(),
})
),
total: z.number(),
query: z.string(),
}),
},
err: {
description: "Search failed",
schema: z.object({
error: z.string(),
code: z.string(),
}),
},
},
},
async (input, context) => {
// searchMemories is an application-defined helper, not part of the SDK
const results = await searchMemories(input.query, input.tags, input.limit);
return {
ok: {
results: results.memories,
total: results.total,
query: input.query,
},
};
}
);
memoryAgent.run();

Other Languages

If you’re building an Agent Server in a language other than TypeScript, you’ll need to implement the following protocol specifications:

JWT Authentication Protocol

JWT Header Structure

{
"alg": "SR25519",
"typ": "JWT"
}

JWT Payload Structure

Every request to a protected endpoint must carry a JWT whose payload has the following structure:

{
"sub": "5FgfC2DY4yreEWEughz46RZYQ8oBhHVqD9fVq6gV89E6z4Ea",
"publicKey": "0x1234567890abcdef...",
"keyType": "sr25519",
"addressInfo": {
"addressType": "ss58",
"metadata": {
"prefix": 42
}
},
"iat": 1640995200,
"exp": 1640998800,
"nonce": "550e8400-e29b-41d4-a716-446655440000",
"_protocol_metadata": {
"version": "1.0.0"
}
}

Field Descriptions:

  • sub: The actual address/identifier
  • publicKey: Hex-encoded public key (64 characters + 0x prefix)
  • keyType: Cryptographic key type (sr25519)
  • addressInfo: Address format information
    • addressType: Address encoding type (ss58)
    • metadata: Additional address parameters (e.g., prefix: 42)
  • iat: Issued at timestamp (Unix seconds)
  • exp: Expiration timestamp (Unix seconds)
  • nonce: Unique UUID to prevent replay attacks
  • _protocol_metadata.version: Protocol version for compatibility

UUID Format:

xxxxxxxx-xxxx-Mxxx-Nxxx-xxxxxxxxxxxx

Where:

  • x is a hexadecimal digit (0-9, a-f)
  • M indicates the UUID version
  • N indicates the variant

The nonce follows the RFC 4122 standard for UUID generation and validation.
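
The field rules above can be expressed as a payload check. This is a minimal sketch and not the SDK's validation: `JwtPayload` and `validatePayload` are hypothetical names, the strictness of each check is an assumption, and the public key in the example is a placeholder. The function expects an already-parsed object and does not verify the signature.

```typescript
interface JwtPayload {
  sub: string;
  publicKey: string;
  keyType: string;
  addressInfo: { addressType: string; metadata: Record<string, unknown> };
  iat: number;
  exp: number;
  nonce: string;
  _protocol_metadata: { version: string };
}

const HEX_PUBLIC_KEY = /^0x[0-9a-fA-F]{64}$/;
// RFC 4122 layout: version digit M in 1-5, variant digit N in 8, 9, a, or b
const RFC4122_UUID =
  /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;

// Returns a list of problems; an empty list means the payload passed these checks
function validatePayload(payload: JwtPayload, nowSeconds: number): string[] {
  const problems: string[] = [];
  if (payload.keyType !== "sr25519") problems.push("keyType must be sr25519");
  if (payload.addressInfo.addressType !== "ss58") problems.push("addressType must be ss58");
  if (!HEX_PUBLIC_KEY.test(payload.publicKey)) {
    problems.push("publicKey must be 0x followed by 64 hex characters");
  }
  if (!RFC4122_UUID.test(payload.nonce)) problems.push("nonce must be an RFC 4122 UUID");
  if (payload.exp <= payload.iat) problems.push("exp must be after iat");
  if (payload.exp < nowSeconds) problems.push("token expired");
  return problems;
}

const examplePayload: JwtPayload = {
  sub: "5FgfC2DY4yreEWEughz46RZYQ8oBhHVqD9fVq6gV89E6z4Ea",
  publicKey: "0x" + "ab".repeat(32), // placeholder key, not a real one
  keyType: "sr25519",
  addressInfo: { addressType: "ss58", metadata: { prefix: 42 } },
  iat: 1640995200,
  exp: 1640998800,
  nonce: "550e8400-e29b-41d4-a716-446655440000",
  _protocol_metadata: { version: "1.0.0" },
};

console.log(validatePayload(examplePayload, 1640995300)); // inside the validity window
console.log(validatePayload(examplePayload, 1641000000)); // after exp
```

Passing the current time as a parameter keeps the check deterministic, which makes it easy to test expiry handling.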

JWT Verification Process

  1. Parse JWT: Split token by . to get header, payload, signature
  2. Verify Structure: Ensure header has alg: "SR25519" and typ: "JWT"
  3. Validate Payload: Check required fields and expiration
  4. Verify Signature: Use SR25519 to verify signature against header.payload
  5. Check Protocol Version: Ensure compatibility with _protocol_metadata.version

import base64
import json
import time

from sr25519 import verify  # Use appropriate SR25519 library


def b64url_decode(segment: str) -> bytes:
    # JWT segments omit base64 padding; restore it before decoding
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def verify_jwt(token: str) -> dict:
    try:
        # Split JWT into parts
        parts = token.split(".")
        if len(parts) != 3:
            raise ValueError("Invalid JWT format")
        header, payload, signature = parts

        # Decode header and payload
        header_data = json.loads(b64url_decode(header))
        payload_data = json.loads(b64url_decode(payload))

        # Verify header
        if header_data.get("alg") != "SR25519" or header_data.get("typ") != "JWT":
            raise ValueError("Invalid JWT header")

        # Verify expiration
        if payload_data.get("exp", 0) < time.time():
            raise ValueError("JWT expired")

        # Verify signature over "header.payload"
        signing_input = f"{header}.{payload}"
        public_key = bytes.fromhex(payload_data["publicKey"][2:])  # Remove 0x prefix
        signature_bytes = b64url_decode(signature)
        # Argument order varies between SR25519 libraries; match the one you use
        if not verify(signing_input.encode(), signature_bytes, public_key):
            raise ValueError("Invalid signature")

        return payload_data
    except Exception as e:
        raise ValueError(f"JWT verification failed: {e}") from e

Request Authentication

Authorization Header Format:

Authorization: Bearer <JWT_TOKEN>
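
A server has to pull the token out of this header before it can verify it. The sketch below shows one way to do that; `extractBearerToken` is a hypothetical helper, not an SDK function. It checks only the shape of the header and the three-part layout of the token, not its signature.

```typescript
// Hypothetical helper: returns the token from "Bearer <token>", or null if malformed
function extractBearerToken(headerValue: string | undefined): string | null {
  if (!headerValue) return null;
  // HTTP authentication scheme names are case-insensitive, hence the "i" flag
  const match = /^Bearer\s+(\S+)$/i.exec(headerValue.trim());
  const token = match?.[1];
  if (!token) return null;
  // A JWT has exactly three non-empty, dot-separated segments
  const segments = token.split(".");
  const wellFormed = segments.length === 3 && segments.every((s) => s.length > 0);
  return wellFormed ? token : null;
}

console.log(extractBearerToken("Bearer aaa.bbb.ccc")); // aaa.bbb.ccc
console.log(extractBearerToken("Basic dXNlcjpwYXNz")); // null
```

Returning `null` for every malformed case lets the caller answer with a single 401 response without distinguishing the reasons.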

Namespace Permission Checking

After JWT verification, check capability permissions:

  1. Determine Namespace Path: agent.<agent_name>.<endpoint_name>.<http_method>
  2. Query Blockchain: Check if user has permission for the namespace path
  3. Validate Permission: Ensure permission is active and not expired
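
The validation step can be sketched once the permissions have been fetched. Everything here is hypothetical: the `Permission` shape, the `isPermitted` helper, and the exact-match comparison of paths are assumptions for illustration. The real on-chain data model and matching rules may differ, and the blockchain query itself is out of scope.

```typescript
// Hypothetical shape of a permission record returned by the blockchain query
interface Permission {
  namespacePath: string;
  active: boolean;
  expiresAt: number | null; // Unix seconds; null means the permission never expires
}

// Hypothetical check: is there an active, unexpired permission for this exact path?
function isPermitted(
  permissions: Permission[],
  namespacePath: string,
  nowSeconds: number,
): boolean {
  return permissions.some(
    (p) =>
      p.namespacePath === namespacePath &&
      p.active &&
      (p.expiresAt === null || p.expiresAt > nowSeconds),
  );
}

// Example data standing in for the result of the blockchain query
const userPermissions: Permission[] = [
  { namespacePath: "agent.alice.memory.store", active: true, expiresAt: null },
  { namespacePath: "agent.alice.memory.search", active: true, expiresAt: 1640998800 },
];

console.log(isPermitted(userPermissions, "agent.alice.memory.store", 1641000000)); // true
console.log(isPermitted(userPermissions, "agent.alice.memory.search", 1641000000)); // false
```

A request that fails this check maps to the 403 Forbidden response described in the next section.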

Response Format

Success Response (200):

{
"field1": "value1",
"field2": "value2"
}

Error Responses:

  • 400 Bad Request: Invalid input or business logic error
  • 401 Unauthorized: Missing or invalid JWT token
  • 403 Forbidden: Insufficient capability permissions

Error responses share this body shape:

{
"message": "Error description",
"code": "ERROR_CODE"
}
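
One way to keep error responses consistent is to build them in a single place. The sketch below is an assumption about how an implementation might be organized: `ErrorKind`, `errorResponse`, and the `UNAUTHORIZED` code string are hypothetical, and only the status codes and the body shape come from this section.

```typescript
type ErrorKind = "bad_request" | "unauthorized" | "forbidden";

// Status codes as listed under Error Responses
const STATUS_BY_KIND: Record<ErrorKind, number> = {
  bad_request: 400,
  unauthorized: 401,
  forbidden: 403,
};

interface ErrorResponse {
  status: number;
  body: { message: string; code: string };
}

// Hypothetical helper: pairs the HTTP status with the documented error body
function errorResponse(kind: ErrorKind, message: string, code: string): ErrorResponse {
  return { status: STATUS_BY_KIND[kind], body: { message, code } };
}

const missingToken = errorResponse(
  "unauthorized",
  "Missing or invalid JWT token",
  "UNAUTHORIZED", // illustrative code value, not defined by the protocol text
);
console.log(missingToken.status, JSON.stringify(missingToken.body));
```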

OpenAPI Specification

Your server should generate OpenAPI specs with:

Security Scheme:

components:
  securitySchemes:
    Bearer:
      type: http
      scheme: bearer
      bearerFormat: JWT

Authentication Required Endpoints:

security:
- Bearer: []

Best Practices

Security

  • Always validate input using Zod schemas
  • Use capability permissions for sensitive operations
  • Set appropriate JWT expiration times

Performance

  • Cache blockchain queries when possible
  • Use appropriate timeout values for RPC calls
  • Implement pagination for large result sets
  • Consider connection pooling for database operations
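
Caching blockchain queries could look like the time-based cache below. This is a generic sketch rather than anything the SDK provides: `TtlCache` is a hypothetical class, and the key format and the 30-second lifetime are arbitrary choices for illustration.

```typescript
// Minimal time-to-live cache; entries expire ttlMs milliseconds after being set
class TtlCache<V> {
  private readonly entries = new Map<string, { value: V; expiresAt: number }>();
  private readonly ttlMs: number;
  private readonly now: () => number;

  // The clock is injectable so expiry can be tested without waiting
  constructor(ttlMs: number, now: () => number = Date.now) {
    this.ttlMs = ttlMs;
    this.now = now;
  }

  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // drop stale entries lazily on read
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: V): void {
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }
}

// Usage with a fake clock: cache a permission result for 30 seconds
let fakeNowMs = 0;
const permissionCache = new TtlCache<boolean>(30000, () => fakeNowMs);
permissionCache.set("user-address:agent.alice.memory.store", true);
const cachedHit = permissionCache.get("user-address:agent.alice.memory.store");
fakeNowMs = 30000;
const cachedAfterExpiry = permissionCache.get("user-address:agent.alice.memory.store");
console.log(cachedHit, cachedAfterExpiry); // true undefined
```

The trade-off is staleness: a revoked permission keeps working until its cached entry expires, so the lifetime should stay short for sensitive operations.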

Documentation

  • Provide clear descriptions for all endpoints
  • Use descriptive error codes and messages
  • Include examples in your API documentation
  • Document capability permission requirements