# Data Ingestion Guide
Complete guide to ingesting documents into the NorthBuilt RAG System.
## Overview

The system supports multiple ingestion methods:
- **Webhook Integration** - Automatic ingestion from Fathom, HelpScout, and Linear
- **Manual Upload** - Direct API calls to ingest custom documents
- **Bulk Import** - Batch processing for large document sets
## Webhook Integrations

### Fathom Video Transcripts

Automatically ingest video meeting transcripts from Fathom.
#### Setup

1. **Configure Webhook in Fathom**:
   - Log in to https://app.fathom.video
   - Navigate to Settings → Integrations → Webhooks
   - Click "Add Webhook"
   - Webhook URL: `https://[your-api-gateway-url]/webhooks/fathom`
   - Select events: `video.processed`
   - Add custom header:
     - Key: `x-api-key`
     - Value: `[your-fathom-api-key]`
   - Save the webhook
2. **Verify Configuration**:
```bash
# Check webhook Lambda
aws lambda get-function --function-name nb-rag-sys-webhook-fathom

# Test webhook endpoint
curl -X POST https://[api-url]/webhooks/fathom \
  -H "x-api-key: [api-key]" \
  -H "Content-Type: application/json" \
  -d '{
    "event": "video.processed",
    "data": {
      "video_id": "test-123",
      "title": "Test Video",
      "transcript": "This is a test transcript."
    }
  }'
```
#### What Gets Ingested
- **Video Title**: Meeting title
- **Transcript**: Full transcript with timestamps
- **Metadata**:
  - Video ID
  - Duration
  - Participants
  - Recording date
  - Meeting URL
#### Processing Flow
1. Fathom video completes processing
2. Fathom sends webhook to API Gateway
3. API Gateway routes to the Fathom webhook Lambda
4. Lambda validates the API key
5. Lambda fetches full video details via the Fathom API
6. Lambda classifies content (client, project) via the Classify Lambda
7. Lambda writes the document and `.metadata.json` sidecar to S3
8. Lambda returns `200 OK`
9. Bedrock Knowledge Base sync triggers (scheduled)
10. Knowledge Base chunks the document and generates embeddings with metadata
11. Video content becomes searchable after the next sync cycle
**Note**: The `.metadata.json` sidecar file enables multi-tenant filtering in RAG queries. Each document has a companion metadata file (e.g., `meeting.md` → `meeting.md.metadata.json`).
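A minimal sketch of the document-plus-sidecar write, assuming the `nb-rag-sys-documents` bucket and illustrative metadata keys (your deployment may filter on different attributes):

```python
import json
import boto3

s3 = boto3.client("s3")
BUCKET = "nb-rag-sys-documents"  # documents bucket from this guide

def write_document_with_sidecar(doc_key: str, content: str, metadata: dict) -> None:
    """Write a document plus its .metadata.json sidecar so the Knowledge Base
    can attach filterable metadata to every chunk."""
    # The document itself (e.g. meeting.md)
    s3.put_object(Bucket=BUCKET, Key=doc_key, Body=content.encode("utf-8"))
    # Companion sidecar (e.g. meeting.md.metadata.json); Bedrock Knowledge Bases
    # read filterable attributes from the "metadataAttributes" object
    sidecar = {"metadataAttributes": metadata}
    s3.put_object(
        Bucket=BUCKET,
        Key=f"{doc_key}.metadata.json",
        Body=json.dumps(sidecar).encode("utf-8"),
    )

# Illustrative call for a Fathom transcript classified to a client/project
write_document_with_sidecar(
    "fathom/meeting-test-123.md",
    "# Test Video\n\nThis is a test transcript.",
    {"source": "fathom", "client_id": "client-uuid", "project_id": "project-uuid"},
)
```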
#### Troubleshooting

**Issue: Webhook not received**

```bash
# Check API Gateway logs
aws logs tail /aws/apigateway/nb-rag-sys --follow

# Check Lambda logs
aws logs tail /aws/lambda/nb-rag-sys-webhook-fathom --follow

# Verify webhook configuration in Fathom
# Ensure URL and API key are correct
```
**Issue: Transcript not searchable**

```bash
# Check S3 documents bucket
aws s3 ls s3://nb-rag-sys-documents/ --recursive

# Check Bedrock Knowledge Base sync status
aws bedrock-agent get-data-source \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]

# Trigger manual sync if needed
aws bedrock-agent start-ingestion-job \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]
```
### HelpScout Support Tickets

Automatically ingest support tickets from HelpScout.

#### Setup

1. **Configure Webhook in HelpScout**:
   - Log in to https://www.helpscout.com
   - Navigate to Manage → Apps → Webhooks
   - Click "Create Webhook"
   - Webhook URL: `https://[your-api-gateway-url]/webhooks/helpscout`
   - Select events: `conversation.created`, `conversation.updated`
   - Secret Key: `[your-helpscout-api-key]`
   - Save the webhook
2. **Verify Configuration**:

```bash
# Test webhook endpoint
curl -X POST https://[api-url]/webhooks/helpscout \
  -H "x-api-key: [api-key]" \
  -H "Content-Type: application/json" \
  -d '{
    "event": "conversation.created",
    "data": {
      "conversation_id": "123",
      "subject": "Test ticket",
      "preview": "This is a test ticket."
    }
  }'
```
#### What Gets Ingested

- **Ticket Subject**: Conversation subject
- **Messages**: All customer and agent messages
- **Metadata**:
  - Conversation ID
  - Customer email
  - Tags
  - Status (open, closed)
  - Created/updated dates
  - Assigned agent
#### Processing Flow
1. Ticket created/updated in HelpScout
2. HelpScout sends webhook
3. Lambda validates API key
4. Lambda fetches full conversation via HelpScout API
5. Lambda calls Classify Lambda for categorization
6. Lambda writes document and .metadata.json sidecar to S3
7. Classification stored in DynamoDB
8. Bedrock Knowledge Base syncs document with metadata
9. Ticket content searchable after sync
**Note**: The `.metadata.json` sidecar file enables multi-tenant filtering in RAG queries.
#### Classification

The system automatically classifies tickets into:

- **Categories**: technical, billing, feature-request, bug-report, etc.
- **Sentiment**: positive, neutral, negative
- **Priority**: low, medium, high, urgent
View classifications:

```bash
aws dynamodb query \
  --table-name nb-rag-sys-classify \
  --key-condition-expression "document_id = :id" \
  --expression-attribute-values '{":id": {"S": "helpscout-123"}}'
```
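The Classify Lambda can also be exercised directly. A hedged sketch of invoking it from Python; the function name `nb-rag-sys-classify` and the payload/response shape are assumptions, not the documented interface:

```python
import json
import boto3

lambda_client = boto3.client("lambda")

# Assumed function name and payload shape -- adjust to your deployment
response = lambda_client.invoke(
    FunctionName="nb-rag-sys-classify",
    Payload=json.dumps({
        "document_id": "helpscout-123",
        "subject": "Billing question about last invoice",
        "body": "Hi, I was charged twice this month...",
    }),
)
result = json.loads(response["Payload"].read())
print(result)  # e.g. {"category": "billing", "sentiment": "negative", "priority": "high"}
```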
### Linear Entity Sync

Linear is used as the source of truth for Clients (teams) and Projects. The system syncs Linear data to DynamoDB for the multi-tenant classification system.

#### Two Sync Methods

- **Real-time Webhooks**: Instant updates when teams/projects change in Linear
- **Full Sync Lambda**: On-demand sync of all teams and projects
#### Webhook Setup

1. **Configure Webhook in Linear**:
   - Log in to https://linear.app
   - Navigate to Settings → Workspace → Webhooks
   - Click "New Webhook"
   - Webhook URL: `https://[your-api-gateway-url]/webhooks/linear`
   - Select resources: `Team`, `Project`
   - Select events: `create`, `update`, `remove`
   - Add signing secret (store in AWS Secrets Manager)
   - Save the webhook
2. **Verify Configuration**:

```bash
# Check webhook logs
aws logs tail /aws/lambda/nb-rag-sys-linear-webhook --follow
```
#### Full Sync (On-Demand)

Run a full sync to populate all teams and projects:

```bash
# Invoke the Linear sync Lambda
aws lambda invoke \
  --function-name nb-rag-sys-linear-sync \
  --payload '{}' \
  --cli-binary-format raw-in-base64-out \
  /tmp/response.json

# Check response
cat /tmp/response.json | jq

# Monitor worker logs
aws logs tail /aws/lambda/nb-rag-sys-linear-sync-worker --follow
```
#### What Gets Synced

**Teams → CLIENT records:**

- Team ID, Name, Key, Description
- EntityType: `CLIENT` (for GSI queries)
- CreatedAt/UpdatedAt timestamps

**Projects → PROJECT records:**

- Project ID, Name, Description, State
- Parent ClientId (team relationship)
- EntityType: `PROJECT` (for GSI queries)
- Start/Target dates
#### Verify Sync Results

```bash
# Count entities by type
aws dynamodb scan --table-name nb-rag-sys-classify --output json | \
  jq '[.Items[] | .EntityType.S] | group_by(.) | map({type: .[0], count: length})'

# List all clients
aws dynamodb query \
  --table-name nb-rag-sys-classify \
  --index-name EntityTypeIndex \
  --key-condition-expression "EntityType = :type" \
  --expression-attribute-values '{":type": {"S": "CLIENT"}}' \
  --output json | jq '.Items[] | .Name.S'
```
#### Domain Mappings

After syncing, add domain mappings to enable document classification:

```bash
# Run the domain mapping script
python3 scripts/add_domain_mappings.py
```

Edit `scripts/add_domain_mappings.py` to add new domain-to-client mappings:

```python
DOMAIN_MAPPINGS = {
    "clientdomain.com": ("client-uuid", None),        # Domain -> Client
    "project.com": ("client-uuid", "project-uuid"),   # Domain -> Client + Project
}
```
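For illustration, a small sketch of how such a mapping could be applied at classification time to resolve a sender's email address; the helper below is hypothetical and not part of the script:

```python
# Hypothetical lookup using the DOMAIN_MAPPINGS structure shown above
DOMAIN_MAPPINGS = {
    "clientdomain.com": ("client-uuid", None),        # Domain -> Client
    "project.com": ("client-uuid", "project-uuid"),   # Domain -> Client + Project
}

def resolve_email(email: str):
    """Return (client_id, project_id) for a sender, or (None, None) if unmapped."""
    domain = email.split("@")[-1].lower()
    return DOMAIN_MAPPINGS.get(domain, (None, None))

print(resolve_email("jane@clientdomain.com"))  # ('client-uuid', None)
print(resolve_email("bob@unknown.org"))        # (None, None)
```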
## Manual Ingestion

### Upload Custom Documents

For documents not covered by webhooks, use the manual ingestion API.

#### API Endpoint

```
POST /ingest
Authorization: Bearer <jwt-token>
Content-Type: application/json
```
#### Request Format

```json
{
  "document_id": "doc-123",
  "title": "Product Requirements Document",
  "content": "Full document content here...",
  "metadata": {
    "source": "confluence",
    "author": "john@company.com",
    "created_at": "2025-11-08T12:00:00Z",
    "url": "https://confluence.company.com/doc-123",
    "tags": ["product", "requirements"]
  }
}
```
#### Response Format

```json
{
  "document_id": "doc-123",
  "chunks_created": 12,
  "vectors_upserted": 12,
  "status": "success"
}
```
#### Example

```bash
# Get JWT token
TOKEN=$(curl -X POST https://[cognito-domain]/oauth2/token \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials" \
  -d "client_id=[client-id]" \
  -d "client_secret=[client-secret]" \
  | jq -r '.access_token')

# Upload document
curl -X POST https://[api-url]/ingest \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "document_id": "doc-123",
    "title": "Product Requirements Document",
    "content": "The system should support user authentication via OAuth 2.0. Users should be able to log in using Google accounts. The system should store user preferences in DynamoDB.",
    "metadata": {
      "source": "confluence",
      "author": "john@company.com",
      "url": "https://confluence.company.com/doc-123"
    }
  }'
```
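The same flow from Python, sketched with the `requests` library; the Cognito domain, client credentials, and API URL are placeholders matching the example above:

```python
import requests

TOKEN_URL = "https://[cognito-domain]/oauth2/token"  # replace placeholders
API_URL = "https://[api-url]"

# 1. Get a JWT via the client-credentials grant
token = requests.post(
    TOKEN_URL,
    data={
        "grant_type": "client_credentials",
        "client_id": "[client-id]",
        "client_secret": "[client-secret]",
    },
).json()["access_token"]

# 2. Ingest a document
resp = requests.post(
    f"{API_URL}/ingest",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "document_id": "doc-123",
        "title": "Product Requirements Document",
        "content": "The system should support user authentication via OAuth 2.0...",
        "metadata": {"source": "confluence", "author": "john@company.com"},
    },
)
print(resp.json())  # e.g. {"document_id": "doc-123", "chunks_created": 12, ...}
```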
### Chunking Strategy

The system automatically chunks documents via Bedrock Knowledge Base:

- **Chunk size**: 512 tokens (`FIXED_SIZE` strategy)
- **Overlap**: 20% (~100 tokens to preserve context)
- **LLM parsing**: Disabled (sidecar `.metadata.json` files provide metadata)
- **Metadata**: Extracted from sidecar files, not parsed from document content
Example:

```
Document (2000 tokens)
  → Chunk 1: tokens 0-512
  → Chunk 2: tokens 410-922 (~100-token overlap)
  → Chunk 3: tokens 820-1332
  → Chunk 4: tokens 1230-1742
  → Chunk 5: tokens 1640-2000
```
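For illustration only, a simplified fixed-size chunker with percentage overlap; the real chunking happens inside Bedrock Knowledge Base, and this sketch merely mimics the 512-token / 20% parameters using whitespace tokens:

```python
def chunk_tokens(tokens, chunk_size=512, overlap_pct=20):
    """Split a token list into fixed-size chunks with a percentage overlap."""
    step = chunk_size - (chunk_size * overlap_pct // 100)  # 512 - 102 = 410
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break
    return chunks

# Crude whitespace "tokens", just for demonstration
tokens = ("word " * 2000).split()
for i, chunk in enumerate(chunk_tokens(tokens), 1):
    print(f"Chunk {i}: {len(chunk)} tokens")  # 512, 512, 512, 512, 360
```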
## Bulk Import

### Import from S3

For large document sets, upload to S3 and trigger batch processing.

#### Setup

1. **Upload documents to S3**:

```bash
# Create S3 bucket for staging
aws s3 mb s3://nb-rag-sys-ingest-staging

# Upload documents
aws s3 cp documents/ s3://nb-rag-sys-ingest-staging/ --recursive
```
2. **Trigger batch import Lambda**:

```bash
# Invoke bulk import Lambda
aws lambda invoke \
  --function-name nb-rag-sys-bulk-import \
  --payload '{
    "bucket": "nb-rag-sys-ingest-staging",
    "prefix": "documents/"
  }' \
  --cli-binary-format raw-in-base64-out \
  /tmp/response.json

cat /tmp/response.json
```
#### Supported Formats

- **Text**: `.txt`, `.md`
- **Documents**: `.pdf`, `.docx`
- **Code**: `.py`, `.js`, `.java`, etc.
- **Data**: `.json`, `.csv`
#### Processing Flow

1. Documents uploaded to the S3 documents bucket (with `.metadata.json` sidecar files)
2. Bulk import Lambda triggered (or scheduled sync every 5 minutes)
3. Bedrock Knowledge Base syncs bucket contents
4. For each document:
   a. Read document and sidecar metadata from S3
   b. Chunk content (512 tokens, 20% overlap)
   c. Generate embeddings via Titan Embeddings v2
   d. Store vectors in S3 Vectors with metadata
5. Sync job returns summary (scanned, indexed, deleted, failed counts)
**Note**: LLM parsing is disabled. Metadata is provided via sidecar `.metadata.json` files created by the sync Lambdas. This ensures 100% ingestion success by avoiding the S3 Vectors 2 KB filterable metadata limit.
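As a sketch of what a bulk upload with sidecars might look like, including a guard for the 2 KB filterable-metadata limit; the bucket name, key prefix, and metadata fields are assumptions:

```python
import json
import pathlib
import boto3

s3 = boto3.client("s3")
BUCKET = "nb-rag-sys-documents"   # assumed destination bucket
METADATA_LIMIT_BYTES = 2048       # S3 Vectors filterable-metadata limit

def bulk_upload(local_dir: str, metadata: dict) -> None:
    """Upload every file in local_dir along with a shared .metadata.json sidecar."""
    sidecar_body = json.dumps({"metadataAttributes": metadata}).encode("utf-8")
    if len(sidecar_body) > METADATA_LIMIT_BYTES:
        raise ValueError("Sidecar metadata exceeds the 2 KB filterable limit")
    for path in pathlib.Path(local_dir).glob("*"):
        if path.is_file():
            key = f"bulk/{path.name}"  # assumed key prefix
            s3.upload_file(str(path), BUCKET, key)
            s3.put_object(Bucket=BUCKET, Key=f"{key}.metadata.json", Body=sidecar_body)

bulk_upload("./documents", {"source": "bulk-import", "client_id": "client-uuid"})
```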
### Import from Database

Extract data from existing databases.

#### Example: Import from PostgreSQL
```python
import psycopg2
import boto3
import json

# Connect to database
conn = psycopg2.connect(
    host="db.company.com",
    database="knowledge",
    user="readonly",
    password="..."
)

# Fetch documents
cursor = conn.cursor()
cursor.execute("SELECT id, title, content, created_at FROM documents")

# Lambda client
lambda_client = boto3.client('lambda')

# Process each document
for row in cursor:
    doc_id, title, content, created_at = row

    # Invoke ingest Lambda
    payload = {
        'document_id': f'db-{doc_id}',
        'title': title,
        'content': content,
        'metadata': {
            'source': 'postgresql',
            'created_at': created_at.isoformat()
        }
    }

    response = lambda_client.invoke(
        FunctionName='nb-rag-sys-ingest',
        InvocationType='Event',  # Async
        Payload=json.dumps(payload)
    )

    print(f"Ingested: {doc_id} - {title}")

conn.close()
```
## Data Management

### Update Documents

To update an existing document, re-ingest it with the same `document_id`:
```bash
curl -X POST https://[api-url]/ingest \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "document_id": "doc-123",
    "title": "Updated Product Requirements Document",
    "content": "Updated content...",
    "metadata": {
      "version": "2.0",
      "updated_at": "2025-11-08T14:00:00Z"
    }
  }'
```
The system will:

- Delete old vectors for the `document_id`
- Create new vectors with the updated content
- Preserve metadata (merged with the new metadata)
### Delete Documents

Remove documents from the index:

```bash
# Delete from S3 documents bucket
aws s3 rm s3://nb-rag-sys-documents/doc-123.md

# Vectors are automatically removed during the next ingestion job
# (runs every 5 minutes via scheduled Lambda)

# To trigger immediate cleanup:
aws bedrock-agent start-ingestion-job \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]
```
**Note**: The system uses `data_deletion_policy = "DELETE"`, which automatically removes vectors when their source documents are deleted from S3. The cleanup happens during each ingestion job and is reflected in the `numberOfDocumentsDeleted` statistic.
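The same cleanup from Python, as a sketch that removes the document and its sidecar and then starts an ingestion job right away; the Knowledge Base and data source IDs are placeholders:

```python
import boto3

s3 = boto3.client("s3")
bedrock_agent = boto3.client("bedrock-agent")

BUCKET = "nb-rag-sys-documents"
DOC_KEY = "doc-123.md"

# Remove the document and its metadata sidecar
s3.delete_object(Bucket=BUCKET, Key=DOC_KEY)
s3.delete_object(Bucket=BUCKET, Key=f"{DOC_KEY}.metadata.json")

# Start an ingestion job so the DELETE data-deletion policy removes the vectors now
job = bedrock_agent.start_ingestion_job(
    knowledgeBaseId="[kb-id]",
    dataSourceId="[ds-id]",
)
print(job["ingestionJob"]["status"])
```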
### List Documents

View all ingested documents:

```bash
# List all documents in S3
aws s3 ls s3://nb-rag-sys-documents/ --recursive

# Check Knowledge Base data source statistics
aws bedrock-agent get-data-source \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]
```
## Monitoring Ingestion

### Check Ingestion Status

```bash
# Check webhook Lambda invocations (last hour)
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name Invocations \
  --dimensions Name=FunctionName,Value=nb-rag-sys-webhook-fathom \
  --start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 3600 \
  --statistics Sum

# Check errors
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name Errors \
  --dimensions Name=FunctionName,Value=nb-rag-sys-webhook-fathom \
  --start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 3600 \
  --statistics Sum
```
### View Ingestion Logs

```bash
# Tail webhook logs
aws logs tail /aws/lambda/nb-rag-sys-webhook-fathom --follow

# Search for a specific document
aws logs filter-log-events \
  --log-group-name /aws/lambda/nb-rag-sys-webhook-fathom \
  --filter-pattern "doc-123" \
  --start-time $(date -u -d '24 hours ago' +%s)000
```
### Check Knowledge Base Health

```bash
# Get Knowledge Base status
aws bedrock-agent get-knowledge-base \
  --knowledge-base-id [kb-id]

# Get data source sync status
aws bedrock-agent list-ingestion-jobs \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]

# Check S3 documents bucket contents
aws s3 ls s3://nb-rag-sys-documents/ --summarize
```
## Best Practices

### Document Preparation

- **Clean Text**: Remove HTML tags, special characters, and excessive whitespace (see the sketch after this list)
- **Structure**: Use clear headings and sections
- **Metadata**: Include relevant metadata (author, date, source, etc.)
- **Length**: Aim for 500-2000 tokens per document (too short = low context, too long = excessive chunks)
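A small sketch of the "Clean Text" step referenced above, stripping HTML tags and collapsing whitespace before ingestion; this is a basic regex approach, and heavily formatted documents may warrant a real HTML parser:

```python
import html
import re

def clean_text(raw: str) -> str:
    """Strip HTML tags, decode entities, and collapse excess whitespace."""
    text = re.sub(r"<[^>]+>", " ", raw)        # drop tags
    text = html.unescape(text)                 # &amp; -> &, etc.
    return re.sub(r"\s+", " ", text).strip()   # collapse whitespace

print(clean_text("<p>Hello&nbsp;&amp; welcome<br/>to   the  docs</p>"))
# Hello & welcome to the docs
```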
### Chunking Strategy

- **Preserve Context**: Ensure chunks have enough context to be meaningful
- **Overlap**: Use 50-100 token overlap to avoid losing context at boundaries
- **Semantic Boundaries**: Chunk at sentence or paragraph boundaries
- **Metadata**: Include the document title and section in each chunk's metadata
### Metadata Design

Good metadata improves search quality:

```json
{
  "document_id": "unique-id",
  "title": "Document Title",
  "source": "fathom|helpscout|linear|manual",
  "source_url": "https://...",
  "author": "john@company.com",
  "created_at": "2025-11-08T12:00:00Z",
  "updated_at": "2025-11-08T14:00:00Z",
  "tags": ["product", "engineering"],
  "category": "documentation",
  "version": "1.0",
  "language": "en"
}
```
### Incremental Updates

For frequently updated documents:

- **Use a consistent `document_id`**: Same ID = update, new ID = new document
- **Include version metadata**: Track document versions
- **Set up scheduled re-ingestion**: For sources without webhooks (see the sketch after this list)
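One possible way to wire up the scheduled re-ingestion mentioned above, sketched with an EventBridge rule that invokes the bulk import Lambda daily; the rule name, ARN placeholders, and payload are assumptions:

```python
import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

RULE = "nb-rag-sys-daily-reingest"  # assumed rule name
TARGET_ARN = "arn:aws:lambda:[region]:[account-id]:function:nb-rag-sys-bulk-import"

# Re-ingest webhook-less sources once a day
events.put_rule(Name=RULE, ScheduleExpression="rate(1 day)", State="ENABLED")
events.put_targets(
    Rule=RULE,
    Targets=[{
        "Id": "bulk-import",
        "Arn": TARGET_ARN,
        "Input": '{"bucket": "nb-rag-sys-ingest-staging", "prefix": "documents/"}',
    }],
)

# Allow EventBridge to invoke the function
lambda_client.add_permission(
    FunctionName="nb-rag-sys-bulk-import",
    StatementId="allow-eventbridge-reingest",
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
)
```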
### Error Handling

- **Retry Logic**: Implement exponential backoff for transient errors (see the sketch after this list)
- **Dead Letter Queue**: Send failed ingestions to an SQS DLQ for later processing
- **Alerting**: Set up CloudWatch alarms for high error rates
- **Logging**: Log all ingestion attempts with document IDs for debugging
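A compact sketch combining the retry and logging points; `ingest_fn` stands in for whichever ingestion call you use (the `/ingest` request or a Lambda invoke):

```python
import logging
import random
import time

logger = logging.getLogger("ingestion")

def ingest_with_retry(ingest_fn, payload: dict, max_attempts: int = 5):
    """Call ingest_fn(payload), retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            result = ingest_fn(payload)
            logger.info("Ingested %s on attempt %d", payload["document_id"], attempt)
            return result
        except Exception as exc:  # in practice, catch only transient error types
            if attempt == max_attempts:
                logger.error("Giving up on %s: %s", payload["document_id"], exc)
                raise  # or hand the payload to an SQS dead-letter queue here
            delay = (2 ** attempt) + random.random()  # exponential backoff with jitter
            logger.warning("Attempt %d failed for %s, retrying in %.1fs",
                           attempt, payload["document_id"], delay)
            time.sleep(delay)
```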
## Performance Optimization

### Batch Processing

For large imports, upload documents to S3 and trigger a single sync:

```bash
# Upload all documents to S3
aws s3 sync ./documents/ s3://nb-rag-sys-documents/

# Trigger single Knowledge Base sync
aws bedrock-agent start-ingestion-job \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]

# Monitor sync progress
aws bedrock-agent get-ingestion-job \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id] \
  --ingestion-job-id [job-id]
```
### Async Processing

Use async invocations for non-blocking operations:

```python
# Async Lambda invocation (lambda_client and payload as in the PostgreSQL example above)
lambda_client.invoke(
    FunctionName='nb-rag-sys-ingest',
    InvocationType='Event',  # Don't wait for response
    Payload=json.dumps(payload)
)
```
### Caching

Cache embeddings for frequently updated documents:

```python
import hashlib
import json

import redis

r = redis.Redis()  # assumes a reachable Redis instance

# Check cache before generating an embedding (stable hash of the content)
cache_key = f"embedding:{hashlib.sha256(content.encode()).hexdigest()}"
cached_embedding = r.get(cache_key)

if cached_embedding:
    embedding = json.loads(cached_embedding)
else:
    embedding = generate_embedding(content)  # your embedding call (e.g. Titan v2)
    r.set(cache_key, json.dumps(embedding), ex=3600)
```
Last updated: 2025-12-31