Knowledge Bank Documentation
Build intelligent applications with the GuruCloud Knowledge Bank. Semantic search, multi-dimensional schemas, batch ingestion, and MCP server integration — all through a clean REST API and Python SDK.
https://www.gurucloudai.com/api/v1/kb
• All responses use a {"data": ...} envelope.
Authentication
All API requests require a KB API key passed as a Bearer token.
API keys start with kb_ and can be created from the
KB Dashboard → API Keys section.
Authorization: Bearer kb_your_api_key_here
Scopes
API keys can have one or more scopes that control access:
| Scope | Access |
|---|---|
| read | Search, list entries, get schema, get stats, MCP definition |
| write | Add/update/delete entries, batch ingest |
| admin | Modify schema, delete KB, manage dimensions, generate PATs |
MCP Authentication
MCP endpoints accept both KB API keys (kb_...) and
Personal Access Tokens (PATs) as Bearer tokens. You can use your existing KB API key
directly — no need to generate a separate token. If you prefer a dedicated MCP-only
token, use the generate-pat endpoint.
Installation
pip install gurucloud-kb
Requires Python 3.10+. The only dependency is httpx.
Quick Start
from gurucloud_kb import GuruCloudClient
client = GuruCloudClient(api_key="kb_your_api_key")
# List your Knowledge Banks
kbs = client.list_kbs()
# Work with a specific KB
kb = client.get_kb("your-kb-uuid")
# Search
results = kb.search("how does authentication work?")
# Add an entry
kb.add_entry({
"dimensions": {
"content": "Auth uses JWT tokens with RS256 signing.",
"useful_for": "Understanding auth architecture",
}
})
# Batch ingest
kb.ingest([
{"dimensions": {"content": "Entry 1"}},
{"dimensions": {"content": "Entry 2"}},
])
# Get MCP server definition for agent injection
mcp_def = kb.get_mcp_server_definition()
from gurucloud_kb import AsyncGuruCloudClient
async with AsyncGuruCloudClient(api_key="kb_your_api_key") as client:
# All methods are awaitable
kb = await client.get_kb("your-kb-uuid")
results = await kb.search("how does auth work?")
await kb.ingest([
{"dimensions": {"content": "Entry 1"}},
{"dimensions": {"content": "Entry 2"}},
])
mcp_def = await kb.get_mcp_server_definition()
Sync Client
GuruCloudClient
The main entry point. Supports context manager for automatic cleanup.
| Method | Returns | Description |
|---|---|---|
| list_kbs() | list[KBInfo] | List all KBs owned by the user |
| get_kb(kb_id) | KnowledgeBank | Get a KB object with pre-bound methods |
| create_kb(name, ...) | KnowledgeBank | Create a new KB |
| update_kb(kb_id, ...) | KBInfo | Update name/description (also refreshes the agent initialize.instructions & server-def description) |
| delete_kb(kb_id) | dict | Delete a KB (admin scope) |
| get_mcp_server_definition(kb_id) | MCPServerDefinition | Get MCP server URL and tools |
| create_api_key(name, ...) | APIKeyInfo | Create a new API key |
| list_api_keys() | list[APIKeyInfo] | List all API keys |
| delete_api_key(key_id) | dict | Delete an API key |
Async Client
AsyncGuruCloudClient
Identical API to GuruCloudClient, but all methods are
async. Uses httpx.AsyncClient under the hood.
Supports async with for automatic cleanup.
import asyncio
from gurucloud_kb import AsyncGuruCloudClient
async def main():
async with AsyncGuruCloudClient(api_key="kb_...") as client:
kb = await client.get_kb("my-kb")
results = await kb.search("query")
print(results)
asyncio.run(main())
KnowledgeBank Object
Returned by client.get_kb() or client.create_kb().
All methods are pre-bound to the KB's UUID.
Properties
| Property | Type | Description |
|---|---|---|
| id | str | KB UUID |
| name | str | Human-readable name |
| description | str | KB description — also the agent initialize.instructions & server-def description; set via update() |
| entry_count | int | Number of entries |
| total_queries | int | Total search queries |
| info | KBInfo | Full metadata dict |
Methods
| Method | Scope | Description |
|---|---|---|
| search(query, k=10, threshold=0.5) | read | Semantic search (string or multi-dim) |
| cluster(fields=["content"], method="auto", ...) | read | Group entries by field — vector (embeddings) or fuzzy (text/metadata) |
| add_entry(entry) | write | Add a single entry |
| ingest(entries, deduplicate=True) | write | Batch ingest (max 100) |
| list_entries(limit, offset) | read | List entries with pagination |
| get_entry(entry_id) | read | Get single entry |
| update_entry(entry_id, updates) | write | Update entry dimensions |
| delete_entry(entry_id) | write | Delete an entry |
| get_schema() | read | Get dimension schema |
| update_schema(schema) | admin | Replace full schema |
| validate_schema(schema) | read | Validate without applying |
| add_dimension(dimension) | admin | Add a dimension |
| remove_dimension(name) | admin | Remove a dimension |
| get_mcp_server_definition() | read | Get MCP server URL and tools |
| generate_pat(token_name) | admin | Generate a PAT for MCP auth |
| get_mcp_config() | read | Get .mcp.json snippet |
| get_mcp_tools() | read | Get MCP tool definitions |
| get_stats() | read | Performance statistics |
| update(name=None, description=None) | write | Update name/description in place (also refreshes the agent initialize.instructions & server-def description) |
| refresh() | read | Re-fetch KB info |
Search
The search() method accepts either a simple string or a
full multi-dimensional search request.
Simple string search
results = kb.search("how does auth work?", k=5, threshold=0.7)
for r in results:
print(r["content"], r["combined_score"])
Multi-dimensional search
results = kb.search({
"dimensions": {
"content": {"query": "JWT tokens", "weight": 1.0},
"useful_for": {"query": "debugging auth", "weight": 1.5},
},
"k": 5,
"threshold": 0.6,
})
Filter by time
Restrict results to a time window with a hard filter
on entry timestamps (UTC). It removes out-of-window entries without
affecting the ranking. Bounds accept an ISO-8601 string or a
datetime; available keys are created_after,
created_before, updated_after, and
updated_before. Each result carries
created_at / updated_at.
from datetime import datetime, timedelta, timezone
# String query: only knowledge created in the last 30 days
recent = kb.search(
"deployment pipeline",
created_after=datetime.now(timezone.utc) - timedelta(days=30),
)
# Dict query: set the same keys inline
results = kb.search({
"dimensions": {"content": {"query": "deployment pipeline"}},
"created_after": "2026-05-01T00:00:00Z",
"created_before": "2026-06-01",
})
Exact filtering
metadata_filters is an exact (non-semantic) JSONB
containment filter applied on top of the
semantic ranking — only entries whose metadata
contains all the given key/values survive. It narrows
the ranked results; it does not rank on its own, so pass at least
one semantic dimension alongside it.
results = kb.search({
"dimensions": {"observation": {"query": "late delivery"}},
"metadata_filters": {"order_id": "SO-1234", "type": "quality_issue"},
})
The same filter is available to agents via the
query_knowledge_bank MCP tool
(metadata_filters={"order_id": "SO-1234"}) and to the
REST API under metadata_filters in the search body.
Schema Management
Each KB has a dimension schema that defines what search dimensions exist and how they're combined.
# Get the current schema
schema = kb.get_schema()
print(schema["dimensions"]) # list of dimension configs
# Add a new dimension
kb.add_dimension({
"name": "priority",
"dimension_type": "single",
"description": "Priority level",
"searchable": True,
})
# Validate before applying
warnings = kb.validate_schema(new_schema)
if not warnings:
kb.update_schema(new_schema)
Ingestion
Single entry
kb.add_entry({
"dimensions": {
"content": "The API uses rate limiting at 1000 req/hr per key.",
"useful_for": "Understanding API limits",
"relevant_systems": ["api", "rate-limiting"],
},
"source": "docs",
"relevant_file_paths": ["routes/api/kb_api.py"],
})
Batch ingest
result = kb.ingest(
entries=[
{"dimensions": {"content": "Entry 1", "useful_for": "..."}},
{"dimensions": {"content": "Entry 2"}},
# ... up to 100 entries per call
],
deduplicate=True, # default: skip near-duplicates
)
print(f"Ingested: {result['ingested']}, Errors: {len(result['errors'])}")
result["errors"] for details.
Events & Conflicts
The Knowledge Bank tracks every deduplication decision made during ingestion. Use events to audit how entries are being deduplicated, view merge reasoning, and inspect conflicts.
Deduplication Events
Each time an entry is ingested, the system compares it against existing entries. The result is one of five actions:
| Action | Meaning |
|---|---|
new | No duplicates found — entry added as-is |
redundant | Near-exact duplicate — entry skipped |
update | Partial overlap — existing entry updated with merged content |
conflict | Contradicting information — requires review |
error | Processing error occurred |
# List all dedup events (paginated)
events = kb.list_events(limit=50, offset=0)
print(f"Total events: {events['total']}")
print(f"Action breakdown: {events['action_counts']}")
# Filter by action type
conflicts = kb.list_events(action="conflict")
for evt in conflicts["events"]:
print(f" {evt['content_preview']} (score: {evt['max_similarity_score']})")
# Get full details of a specific event
detail = kb.get_event(conflicts["events"][0]["id"])
print(f"Reasoning: {detail['reasoning']}")
print(f"Merged content: {detail['merged_content']}")
print(f"Similar entries: {detail['similar_entries']}")
Entry Event Logs
For deeper debugging, entry event logs capture the full processing lifecycle of each entry: queuing, hash checks, deduplication search, LLM decisions, and action execution.
# List all event logs for a KB
logs = kb.list_event_logs(limit=100)
# Filter by event type
dedup_logs = kb.list_event_logs(event_type="dedup")
# Filter by specific entry
entry_logs = kb.list_event_logs(entry_id="pending-entry-uuid")
for log in entry_logs["logs"]:
status = "OK" if log["success"] else "FAIL"
print(f" [{status}] {log['event_type']}/{log['event_name']} ({log['duration_ms']}ms)")
created_at descending.
Use entry_id to reconstruct the complete processing history of a single entry.
MCP Integration
Get everything needed to inject your Knowledge Bank's MCP server into an AI agent. Returns the MCP URL, server name, and available tools. Use your KB API key directly as the Bearer token for MCP requests.
mcp_def = kb.get_mcp_server_definition()
# Returns:
# {
# "type": "http",
# "url": "https://www.gurucloudai.com/mcp/srv-uuid/mcp",
# "server_name": "my-kb",
# "available_tools": ["query_knowledge_bank", "report_learning",
# "get_kb_entry", "edit_kb_entry", "delete_kb_entry"],
# "auth": {"type": "bearer", "note": "Use your KB API key..."},
# }
# Inject into your agent's MCP config using your API key:
agent_config = {
"mcpServers": {
mcp_def["server_name"]: {
"type": mcp_def["type"],
"url": mcp_def["url"],
"headers": {
"Authorization": f"Bearer {api_key}"
}
}
}
}
# Or generate a dedicated PAT (requires admin scope):
pat_info = kb.generate_pat(token_name="My Agent")
# pat_info["token"] is a never-expiring PAT for this MCP server
Write semantics differ by surface. The SDK/REST
add_entry is synchronous — it returns the
stored entry or raises, so the call tells you it landed. The
agent-facing MCP report_learning tool
queues the write (embedding + deduplication run in the
background) and returns a pending_entry_id; the
entry becomes searchable shortly after. An optional
check_learning_status tool to confirm a queued write
durably landed is configurable per deployment (off by default —
coming soon as a standard option).
Error Handling
from gurucloud_kb import (
GuruCloudClient,
AuthenticationError,
NotFoundError,
RateLimitError,
APIError,
)
try:
kb = client.get_kb("nonexistent")
except AuthenticationError:
print("Invalid API key")
except NotFoundError:
print("KB not found")
except RateLimitError:
print("Rate limit exceeded - slow down")
except APIError as e:
print(f"API error {e.status_code}: {e.message}")
| Exception | HTTP Status | When |
|---|---|---|
| AuthenticationError | 401 | Invalid or missing API key |
| PermissionError | 403 | Insufficient scope |
| NotFoundError | 404 | Resource not found |
| RateLimitError | 429 | Rate limit exceeded |
| APIError | * | Any other API error |
| ConnectionError | — | Network/timeout error |
REST API: Knowledge Banks
List all Knowledge Banks. read
Get a specific Knowledge Bank. read
Create a new Knowledge Bank. write
{
"name": "My KB",
"description": "Optional description",
"dimension_schema": { ... } // optional
}Update KB name/description. write
The description is the canonical agent-facing text: updating it also refreshes the MCP handshake initialize.instructions and the description returned by the mcp-server-definition endpoint. Send {"description": ""} to clear it.
Delete a KB and all resources. admin
REST API: Entries
List entries with pagination. read
Add a single entry. write
{
"dimensions": {
"content": "The main knowledge content",
"useful_for": "What this is useful for"
},
"source": "optional-source-label",
"relevant_file_paths": ["path/to/file.py"]
}Batch ingest up to 100 entries. write
{
"entries": [
{"dimensions": {"content": "..."}},
{"dimensions": {"content": "..."}}
],
"deduplicate": true
}Get a single entry. read
Update an entry's dimensions. write
Delete an entry. write
REST API: Search
Multi-dimensional semantic search. read
{
"dimensions": {
"content": {"query": "search text", "weight": 1.0},
"useful_for": {"query": "context", "weight": 0.5}
},
"metadata_filters": {"status": "resolved"},
"created_after": "2026-05-01T00:00:00Z",
"created_before": "2026-06-01",
"k": 10,
"threshold": 0.5
}
Time-window filter (optional). The
created_after, created_before,
updated_after, and updated_before keys
are a hard filter on entry timestamps (UTC, ISO-8601) — they
remove out-of-window entries without affecting the ranking. A
bare date such as "2026-06-01" is treated as
00:00:00Z. Each result includes created_at
and updated_at.
REST API: Cluster
Group entries by one or more fields. read
Each field is clustered independently. A single embedding
dimension (e.g. content) clusters by
vector similarity (KMeans / Agglomerative /
HDBSCAN); any other field (metadata.customer,
source, a text_only dimension) clusters
by fuzzy string match so near-duplicate values
merge. Pass an optional search block (same shape as
Search) to cluster only the matching entries; otherwise the whole
KB is clustered up to scope_limit.
{
"fields": ["content", "metadata.customer"],
"method": "auto",
"algorithm": "auto",
"k": null,
"similarity_threshold": 0.85,
"label": false
}
Returns results keyed per field; each
FieldClusterResult carries clusters
(with size, representative ids and members), plus
silhouette_score for vector results or
key / values for fuzzy results.
REST API: Schema
Get the dimension schema. read
Replace the full schema. admin
Validate a schema without applying. read
Add a dimension. admin
Remove a dimension. admin
REST API: MCP
Get .mcp.json config snippet. read
Get MCP tool definitions. read
Get MCP server URL, name, and available tools. read
The description field is the KB's own description (the same text agents receive as initialize.instructions), falling back to the KB name when unset. Change it with PATCH /banks/{kb_id}.
Generate a Personal Access Token for this KB's MCP server. admin
Body (optional): {"token_name": "My Agent"}.
Returns token, server_url, token_name, note.
Tokens do not expire. Store securely.
Get performance statistics. read
REST API: Events
List deduplication events. read
Query params: limit (default 50, max 200), offset (default 0),
action (filter: new, redundant, update, conflict, error)
// Response
{
"events": [
{
"id": "uuid",
"source": "mcp_tools",
"content_preview": "Auth uses JWT...",
"max_similarity_score": 0.95,
"llm_invoked": true,
"action": "update",
"created_at": "2026-03-01T00:00:00"
}
],
"total": 42,
"action_counts": {"new": 30, "update": 10, "conflict": 2}
}Get full details of a specific event. read
Returns the complete event including reasoning, merged_content,
similar_entries, and execution details.
List entry processing event logs. read
Query params: limit, offset,
event_type (lifecycle, hash_check, dedup, action),
entry_id (pending entry UUID)
REST API: API Keys
Create a new API key. Returns the raw key once.
{
"name": "My Key",
"scopes": ["read", "write"],
"rate_limit_per_hour": 1000,
"expires_at": "2027-01-01T00:00:00Z" // optional
}List all API keys (masked).
Delete an API key.