Simple examples to get started with asyncmcp
#!/usr/bin/env python3
import asyncio
import boto3
from mcp.server.lowlevel import Server
from mcp import types
from asyncmcp.sqs import sqs_server
from asyncmcp import SqsServerConfig
# Create the low-level MCP server instance; "hello-world" is the name handed
# to Server, and the tool handlers below register themselves on this object.
app = Server("hello-world")
@app.list_tools()
async def list_tools():
    """Advertise the tools this server exposes.

    Returns:
        A one-element list holding the ``greet`` tool, whose input schema is
        an object with a single required string property ``name``.
    """
    greet_schema = {
        "type": "object",
        "properties": {
            "name": {"type": "string"},
        },
        "required": ["name"],
    }
    greet_tool = types.Tool(
        name="greet",
        description="Say hello",
        inputSchema=greet_schema,
    )
    return [greet_tool]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
    """Execute the tool called *name* with the supplied *arguments*.

    Args:
        name: Tool identifier requested by the client; only ``"greet"`` is
            implemented here.
        arguments: Tool arguments; ``greet`` reads the ``"name"`` key.

    Returns:
        A list with one ``types.TextContent`` carrying the greeting text.

    Raises:
        ValueError: If *name* is not a tool implemented by this handler.
        KeyError: If *arguments* has no ``"name"`` entry.
    """
    # Reject unknown tools explicitly; previously the function fell off the
    # end and returned None, which is not a valid tool result.
    if name != "greet":
        raise ValueError(f"Unknown tool: {name}")

    greeting = f"Hello, {arguments['name']}!"
    return [types.TextContent(type="text", text=greeting)]
async def main():
    """Serve the ``hello-world`` MCP server over the SQS transport.

    Builds an ``SqsServerConfig`` pointing at the ``mcp-requests`` queue and a
    boto3 SQS client aimed at ``http://localhost:4566`` with dummy ``test``
    credentials (presumably the LocalStack instance started in the setup
    steps of this document), then runs ``app`` on the streams yielded by
    ``sqs_server``.
    """
    config = SqsServerConfig(
        read_queue_url="http://localhost:4566/000000000000/mcp-requests"
    )
    sqs_client = boto3.client(
        "sqs",
        endpoint_url="http://localhost:4566",
        region_name="us-east-1",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
    async with sqs_server(config, sqs_client) as (read, write):
        # Let the server derive its own initialization options (name, version,
        # capabilities) instead of constructing an options object by hand.
        await app.run(read, write, app.create_initialization_options())


if __name__ == "__main__":
    asyncio.run(main())
#!/usr/bin/env python3
import asyncio
import boto3
from mcp.client import ClientSession
from asyncmcp.sqs import sqs_client
from asyncmcp import SqsClientConfig
async def main():
    """Call the ``greet`` tool over SQS and print the text of the reply.

    Opens the SQS client transport with a boto3 client aimed at
    ``http://localhost:4566``, initializes a ``ClientSession`` on the yielded
    streams, invokes ``greet`` with ``{"name": "asyncmcp"}`` and prints the
    ``text`` of the first content item in the result.
    """
    client_config = SqsClientConfig(
        read_queue_url="http://localhost:4566/000000000000/mcp-requests",
        response_queue_url="http://localhost:4566/000000000000/mcp-responses",
    )
    boto_sqs = boto3.client(
        "sqs",
        endpoint_url="http://localhost:4566",
        region_name="us-east-1",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
    transport = sqs_client(client_config, boto_sqs)
    # Enter the transport first, then the session that rides on its streams.
    async with transport as (read, write), ClientSession(read, write) as session:
        await session.initialize()
        greeting = await session.call_tool("greet", arguments={"name": "asyncmcp"})
        first_item = greeting.content[0]
        print(first_item.text)
        # Output: Hello, asyncmcp!


if __name__ == "__main__":
    asyncio.run(main())
#!/usr/bin/env python3
from fastapi import FastAPI
import uvicorn
from mcp.server.lowlevel import Server
from mcp import types
from asyncmcp.webhook.manager import WebhookSessionManager
from asyncmcp import WebhookServerConfig
# FastAPI application; the MCP session manager's ASGI app is mounted onto it
# further down under the "/mcp" path.
app = FastAPI()
# Low-level MCP server named "webhook-example"; the tool handlers below
# register themselves on this object.
mcp_server = Server("webhook-example")
@mcp_server.list_tools()
async def list_tools():
    """Advertise the tools this server exposes.

    Returns:
        A one-element list holding the ``echo`` tool, whose input schema is
        an object with a single required string property ``message``.
    """
    echo_schema = {
        "type": "object",
        "properties": {
            "message": {"type": "string"},
        },
        "required": ["message"],
    }
    echo_tool = types.Tool(
        name="echo",
        description="Echo back the input",
        inputSchema=echo_schema,
    )
    return [echo_tool]
@mcp_server.call_tool()
async def call_tool(name: str, arguments: dict):
    """Execute the tool called *name* with the supplied *arguments*.

    Args:
        name: Tool identifier requested by the client; only ``"echo"`` is
            implemented here.
        arguments: Tool arguments; ``echo`` reads the ``"message"`` key.

    Returns:
        A list with one ``types.TextContent`` whose text is the message
        prefixed with ``"Echo: "``.

    Raises:
        ValueError: If *name* is not a tool implemented by this handler.
        KeyError: If *arguments* has no ``"message"`` entry.
    """
    # Reject unknown tools explicitly; previously the function fell off the
    # end and returned None, which is not a valid tool result.
    if name != "echo":
        raise ValueError(f"Unknown tool: {name}")

    echoed = f"Echo: {arguments['message']}"
    return [types.TextContent(type="text", text=echoed)]
# Wire the MCP server into the FastAPI app: a default webhook configuration,
# a session manager bound to the MCP server, and its ASGI app mounted at /mcp.
config = WebhookServerConfig()
session_manager = WebhookSessionManager(mcp_server, config)
app.mount("/mcp", session_manager.asgi_app())

if __name__ == "__main__":
    # Serve on every interface, port 8000, when executed as a script.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )
#!/usr/bin/env python3
import asyncio
from fastapi import FastAPI, Request
import uvicorn
from mcp.client import ClientSession
from asyncmcp import webhook_client, WebhookClientConfig
# FastAPI application that receives webhook deliveries for the client.
webhook_app = FastAPI()


@webhook_app.post("/webhook")
async def receive_webhook(request: Request):
    """Print the JSON body of a webhook delivery and acknowledge it.

    Args:
        request: Incoming POST request; its body is parsed as JSON.

    Returns:
        The acknowledgement dict ``{"status": "ok"}``.
    """
    # Handle webhook response
    payload = await request.json()
    print(f"Received: {payload}")
    acknowledgement = {"status": "ok"}
    return acknowledgement
async def run_client():
    """Call the ``echo`` tool through the webhook transport and print the reply.

    Configures the client with the server's request URL and the local webhook
    URL, initializes a ``ClientSession`` on the streams yielded by
    ``webhook_client``, invokes ``echo`` with ``{"message": "Hello Webhooks!"}``
    and prints the ``text`` of the first content item in the result.
    """
    client_config = WebhookClientConfig(
        server_url="http://localhost:8000/mcp/request",
        webhook_url="http://localhost:8001/webhook",
    )
    transport = webhook_client(client_config)
    # The third yielded value (the transport's client object) is not used here.
    async with transport as (read, write, _client), ClientSession(read, write) as session:
        await session.initialize()
        echo_result = await session.call_tool(
            "echo", arguments={"message": "Hello Webhooks!"}
        )
        first_item = echo_result.content[0]
        print(first_item.text)
        # Output: Echo: Hello Webhooks!
async def main():
    """Run the webhook receiver in the background while the client executes.

    Starts ``webhook_app`` under uvicorn on port 8001 as a background task,
    waits one second so the listener can come up, runs ``run_client()``, and
    then shuts the receiver down.

    The shutdown happens in a ``finally`` block so the background server is
    stopped and awaited even when the client raises; the original cancelled
    the task without awaiting it and skipped the cancel entirely on error.
    """
    # Start webhook server
    webhook_config = uvicorn.Config(
        webhook_app, host="0.0.0.0", port=8001, log_level="error"
    )
    webhook_server = uvicorn.Server(webhook_config)
    webhook_task = asyncio.create_task(webhook_server.serve())
    try:
        # Fixed startup delay: give the receiver time to bind its port before
        # the client advertises the webhook URL to the MCP server.
        await asyncio.sleep(1)
        # Run client
        await run_client()
    finally:
        # Ask uvicorn to exit gracefully, then wait for serve() to return so
        # the listening socket is released before the event loop closes.
        webhook_server.should_exit = True
        await webhook_task


if __name__ == "__main__":
    asyncio.run(main())
#!/usr/bin/env python3
import asyncio
import boto3
from asyncmcp.proxy import create_proxy_server
from asyncmcp.sqs.utils import SqsClientConfig
async def main():
    """Start an asyncmcp proxy on port 8080 that uses SQS as its backend.

    Builds the SQS backend configuration and a boto3 SQS client aimed at
    ``http://localhost:4566``, creates the proxy with ``create_proxy_server``,
    prints the proxy URL and then awaits ``proxy.run()``.
    """
    # Configure SQS backend
    sqs_backend = SqsClientConfig(
        read_queue_url="http://localhost:4566/000000000000/mcp-requests",
        response_queue_url="http://localhost:4566/000000000000/mcp-responses",
    )
    boto_sqs = boto3.client(
        "sqs",
        endpoint_url="http://localhost:4566",
        region_name="us-east-1",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    )
    # Clients handed to the proxy, keyed by the name the SQS backend expects.
    clients_by_name = {"sqs_client": boto_sqs}

    # Create proxy server
    proxy_server = create_proxy_server(
        backend_transport="sqs",
        backend_config=sqs_backend,
        backend_clients=clients_by_name,
        port=8080,
    )
    print("Proxy running on http://localhost:8080/mcp")
    await proxy_server.run()


if __name__ == "__main__":
    asyncio.run(main())
Start LocalStack
localstack start
Set Up Resources
uv run examples/setup.py
Run Server
uv run server.py
Run Client
uv run client.py