Skip to main content

Cyberstrike is now open source! AI-powered penetration testing for security professionals. Star on GitHub

Creating MCP Servers

Create custom MCP servers to extend Cyberstrike with specialized tools and integrations. This is useful when you need capabilities beyond what Bolt provides.

🎞️ MARP SLIDE: custom-server-architecture.md

Building custom MCP servers for Cyberstrike

📊 DIAGRAM: mcp-server-architecture.mmd

MCP server internal architecture

Overview

MCP servers:

  • Expose tools to Cyberstrike
  • Handle tool execution
  • Manage resources
  • Support multiple transports
  • Enable custom integrations

Quick Start

TypeScript Server

src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
const server = new Server({
name: "my-security-tools",
version: "1.0.0"
}, {
capabilities: {
tools: {}
}
});
// Define tools
server.setRequestHandler("tools/list", async () => ({
tools: [{
name: "port-scan",
description: "Scan ports on a target host",
inputSchema: {
type: "object",
properties: {
host: { type: "string", description: "Target hostname or IP" },
ports: { type: "string", description: "Port range (e.g., 1-1000)" }
},
required: ["host"]
}
}]
}));
// Handle tool calls
server.setRequestHandler("tools/call", async (request) => {
const { name, arguments: args } = request.params;
if (name === "port-scan") {
const result = await scanPorts(args.host, args.ports);
return { content: [{ type: "text", text: result }] };
}
throw new Error(`Unknown tool: ${name}`);
});
// Start server
const transport = new StdioServerTransport();
await server.connect(transport);

Python Server

server.py
import asyncio
from mcp.server import Server
from mcp.server.stdio import stdio_server
server = Server("my-security-tools")
@server.list_tools()
async def list_tools():
return [{
"name": "port-scan",
"description": "Scan ports on a target host",
"inputSchema": {
"type": "object",
"properties": {
"host": {"type": "string", "description": "Target hostname"},
"ports": {"type": "string", "description": "Port range"}
},
"required": ["host"]
}
}]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "port-scan":
result = await scan_ports(arguments["host"], arguments.get("ports"))
return [{"type": "text", "text": result}]
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as streams:
await server.run(streams[0], streams[1])
asyncio.run(main())

Project Structure

TypeScript Project

my-mcp-server/
├── src/
│ ├── index.ts
│ ├── tools/
│ │ ├── port-scan.ts
│ │ ├── vuln-scan.ts
│ │ └── index.ts
│ └── utils/
│ └── helpers.ts
├── package.json
└── tsconfig.json

package.json

{
"name": "@myorg/mcp-security",
"version": "1.0.0",
"type": "module",
"bin": {
"mcp-security": "./dist/index.js"
},
"scripts": {
"build": "tsc",
"start": "node dist/index.js"
},
"dependencies": {
"@modelcontextprotocol/sdk": "^1.0.0"
}
}

Tool Definition

Input Schema

const tool = {
name: "sql-injection-test",
description: "Test URL parameter for SQL injection",
inputSchema: {
type: "object",
properties: {
url: {
type: "string",
description: "Target URL with parameter"
},
parameter: {
type: "string",
description: "Parameter name to test"
},
technique: {
type: "string",
enum: ["boolean", "time", "error", "union"],
description: "Injection technique"
}
},
required: ["url", "parameter"]
}
};

Tool Handler

async function handleSqlInjectionTest(args: {
url: string;
parameter: string;
technique?: string;
}): Promise<string> {
const { url, parameter, technique = "boolean" } = args;
// Implement SQL injection testing
const result = await testSqlInjection(url, parameter, technique);
return JSON.stringify({
vulnerable: result.vulnerable,
technique: technique,
evidence: result.evidence,
payloads: result.successfulPayloads
}, null, 2);
}

Resources

Expose Resources

server.setRequestHandler("resources/list", async () => ({
resources: [{
uri: "scan://results/latest",
name: "Latest Scan Results",
mimeType: "application/json"
}]
}));
server.setRequestHandler("resources/read", async (request) => {
const { uri } = request.params;
if (uri === "scan://results/latest") {
const results = await getLatestScanResults();
return {
contents: [{
uri,
mimeType: "application/json",
text: JSON.stringify(results)
}]
};
}
});

Transport Options

Stdio (Default)

import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
const transport = new StdioServerTransport();
await server.connect(transport);

HTTP/SSE

import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import express from "express";
const app = express();
const transport = new SSEServerTransport("/sse", app);
app.listen(3000);
await server.connect(transport);

WebSocket

import { WebSocketServerTransport } from "@modelcontextprotocol/sdk/server/websocket.js";
const transport = new WebSocketServerTransport({ port: 8080 });
await server.connect(transport);

Error Handling

Tool Errors

server.setRequestHandler("tools/call", async (request) => {
try {
const result = await executeTool(request.params);
return { content: [{ type: "text", text: result }] };
} catch (error) {
return {
content: [{
type: "text",
text: `Error: ${error.message}`
}],
isError: true
};
}
});

Validation Errors

function validateInput(args: unknown, schema: object): void {
// Validate against JSON schema
const errors = validate(args, schema);
if (errors.length > 0) {
throw new Error(`Invalid input: ${errors.join(", ")}`);
}
}

Security Considerations

Input Validation

function sanitizeHost(host: string): string {
// Validate host format
if (!/^[\w.-]+$/.test(host)) {
throw new Error("Invalid host format");
}
return host;
}

Command Injection Prevention

import { execFile } from "child_process";
// Safe: Uses execFile with array arguments
async function safeScan(host: string): Promise<string> {
return new Promise((resolve, reject) => {
execFile("nmap", ["-sV", host], (error, stdout) => {
if (error) reject(error);
else resolve(stdout);
});
});
}
// Unsafe: Don't do this
// exec(`nmap -sV ${host}`) // Command injection vulnerability!

Rate Limiting

import { RateLimiter } from "limiter";
const limiter = new RateLimiter({
tokensPerInterval: 10,
interval: "minute"
});
async function handleToolCall(request) {
await limiter.removeTokens(1);
// Process request
}

Testing

Unit Tests

import { describe, it, expect } from "vitest";
describe("port-scan tool", () => {
it("should scan specified ports", async () => {
const result = await handlePortScan({
host: "localhost",
ports: "80,443"
});
expect(result).toContain("80");
expect(result).toContain("443");
});
});

Integration Tests

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
describe("MCP Server", () => {
it("should list tools", async () => {
const client = await createTestClient();
const tools = await client.request("tools/list", {});
expect(tools.tools).toContainEqual(
expect.objectContaining({ name: "port-scan" })
);
});
});

Distribution

npm Package

Terminal window
npm publish --access public

Docker Image

FROM node:20-slim
WORKDIR /app
COPY package*.json ./
RUN npm ci --production
COPY dist ./dist
ENTRYPOINT ["node", "dist/index.js"]

Binary Distribution

Terminal window
# Using pkg
npx pkg . --targets node20-linux-x64,node20-macos-x64,node20-win-x64

Best Practices

  1. Clear descriptions - Document tools thoroughly
  2. Input validation - Validate all inputs
  3. Error handling - Return meaningful errors
  4. Logging - Log operations for debugging
  5. Testing - Test all tools thoroughly
  6. Security - Prevent injection attacks

Tip

Start with stdio transport for simplicity, then add HTTP/SSE for remote access.