Data Ingestion Guide

Complete guide to ingesting documents into the NorthBuilt RAG System.

Overview

The system supports multiple ingestion methods:

  1. Webhook Integrations - Automatic ingestion from Fathom, HelpScout, and Linear
  2. Manual Upload - Direct API calls to ingest custom documents
  3. Bulk Import - Batch processing for large document sets

Webhook Integrations

Fathom Video Transcripts

Automatically ingest video meeting transcripts from Fathom.

Setup

  1. Configure Webhook in Fathom:
    • Log in to https://app.fathom.video
    • Navigate to Settings → Integrations → Webhooks
    • Click “Add Webhook”
    • Webhook URL: https://[your-api-gateway-url]/webhooks/fathom
    • Select events: video.processed
    • Add custom header:
      • Key: x-api-key
      • Value: [your-fathom-api-key]
    • Save webhook
  2. Verify Configuration:
    # Check webhook Lambda
    aws lambda get-function --function-name nb-rag-sys-webhook-fathom

    # Test webhook endpoint
    curl -X POST https://[api-url]/webhooks/fathom \
      -H "x-api-key: [api-key]" \
      -H "Content-Type: application/json" \
      -d '{
        "event": "video.processed",
        "data": {
          "video_id": "test-123",
          "title": "Test Video",
          "transcript": "This is a test transcript."
        }
      }'


What Gets Ingested

  • Video Title: Meeting title
  • Transcript: Full transcript with timestamps
  • Metadata:
    • Video ID
    • Duration
    • Participants
    • Recording date
    • Meeting URL

Processing Flow

1. Fathom video completes processing
2. Fathom sends webhook to API Gateway
3. API Gateway routes to Fathom Webhook Lambda
4. Lambda validates API key
5. Lambda fetches full video details via Fathom API
6. Lambda classifies content (client, project) via the Classify Lambda
7. Lambda writes document and .metadata.json sidecar to S3
8. Lambda returns 200 OK
9. Bedrock Knowledge Base sync triggers (scheduled)
10. Knowledge Base chunks and generates embeddings with metadata
11. Video content searchable after next sync cycle

Note: The .metadata.json sidecar file enables multi-tenant filtering in RAG queries. Each document has a companion metadata file (e.g., meeting.md → meeting.md.metadata.json).
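
For illustration, here is a minimal sketch of how a sync Lambda might write a document alongside its sidecar, built from the Fathom test payload above. The bucket name comes from this guide; the attribute names inside metadataAttributes (source, client_id, project_id) are assumptions about the classification fields, so check the actual sync Lambda for the real schema.

import json

import boto3

s3 = boto3.client("s3")
bucket = "nb-rag-sys-documents"
document_key = "fathom/test-123.md"

# Document body built from the test webhook payload above
body = "# Test Video\n\nThis is a test transcript.\n"

# Sidecar in the Bedrock Knowledge Base metadata format (metadataAttributes)
sidecar = {
    "metadataAttributes": {
        "source": "fathom",           # illustrative attribute names
        "client_id": "client-uuid",
        "project_id": "project-uuid",
    }
}

# Write the document and its companion <key>.metadata.json side by side
s3.put_object(Bucket=bucket, Key=document_key, Body=body.encode("utf-8"))
s3.put_object(
    Bucket=bucket,
    Key=f"{document_key}.metadata.json",
    Body=json.dumps(sidecar).encode("utf-8"),
)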

Troubleshooting

Issue: Webhook not received

# Check API Gateway logs
aws logs tail /aws/apigateway/nb-rag-sys --follow

# Check Lambda logs
aws logs tail /aws/lambda/nb-rag-sys-webhook-fathom --follow

# Verify webhook configuration in Fathom
# Ensure URL and API key are correct

Issue: Transcript not searchable

# Check S3 documents bucket
aws s3 ls s3://nb-rag-sys-documents/ --recursive

# Check Bedrock Knowledge Base sync status
aws bedrock-agent get-data-source \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]

# Trigger manual sync if needed
aws bedrock-agent start-ingestion-job \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]

HelpScout Support Tickets

Automatically ingest support tickets from HelpScout.

Setup

  1. Configure Webhook in HelpScout:
    • Log in to https://www.helpscout.com
    • Navigate to Manage → Apps → Webhooks
    • Click “Create Webhook”
    • Webhook URL: https://[your-api-gateway-url]/webhooks/helpscout
    • Select events: conversation.created, conversation.updated
    • Secret Key: [your-helpscout-api-key]
    • Save webhook
  2. Verify Configuration:
    # Test webhook endpoint
    curl -X POST https://[api-url]/webhooks/helpscout \
      -H "x-api-key: [api-key]" \
      -H "Content-Type: application/json" \
      -d '{
     "event": "conversation.created",
     "data": {
       "conversation_id": "123",
       "subject": "Test ticket",
       "preview": "This is a test ticket."
     }
      }'
    

What Gets Ingested

  • Ticket Subject: Conversation subject
  • Messages: All customer and agent messages
  • Metadata:
    • Conversation ID
    • Customer email
    • Tags
    • Status (open, closed)
    • Created/updated dates
    • Assigned agent

Processing Flow

1. Ticket created/updated in HelpScout
2. HelpScout sends webhook
3. Lambda validates API key
4. Lambda fetches full conversation via HelpScout API
5. Lambda calls Classify Lambda for categorization
6. Lambda writes document and .metadata.json sidecar to S3
7. Classification stored in DynamoDB
8. Bedrock Knowledge Base syncs document with metadata
9. Ticket content searchable after sync

Note: The .metadata.json sidecar file enables multi-tenant filtering in RAG queries.

Classification

The system automatically classifies tickets into:

  • Categories: technical, billing, feature-request, bug-report, etc.
  • Sentiment: positive, neutral, negative
  • Priority: low, medium, high, urgent
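
Putting those dimensions together, a classification record stored for a ticket might look roughly like the sketch below; field names beyond category, sentiment, and priority are assumptions, and the Classify Lambda defines the real attributes.

# Illustrative shape only
classification_item = {
    "document_id": "helpscout-123",   # key used in the query below
    "category": "billing",
    "sentiment": "negative",
    "priority": "high",
    "classified_at": "2025-11-08T12:00:00Z",
}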

View classifications:

aws dynamodb query \
  --table-name nb-rag-sys-classify \
  --key-condition-expression "document_id = :id" \
  --expression-attribute-values '{":id": {"S": "helpscout-123"}}'

Linear Entity Sync

Linear is used as the source of truth for Clients (teams) and Projects. The system syncs Linear data to DynamoDB for the multi-tenant classification system.

Two Sync Methods

  1. Real-time Webhooks: Instant updates when teams/projects change in Linear
  2. Full Sync Lambda: On-demand sync of all teams and projects

Webhook Setup

  1. Configure Webhook in Linear:
    • Log in to https://linear.app
    • Navigate to Settings → Workspace → Webhooks
    • Click “New Webhook”
    • Webhook URL: https://[your-api-gateway-url]/webhooks/linear
    • Select resources: Team, Project
    • Select events: create, update, remove
    • Add signing secret (store in AWS Secrets Manager; see the verification sketch below)
    • Save webhook
  2. Verify Configuration:
    # Check webhook logs
    aws logs tail /aws/lambda/nb-rag-sys-linear-webhook --follow
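
The signing secret from step 1 can be checked inside the webhook Lambda before an event is processed. A minimal sketch, assuming Linear's HMAC-SHA256 signature of the raw request body in a Linear-Signature header and a Secrets Manager secret named nb-rag-sys/linear-webhook-secret (both assumptions worth confirming against Linear's webhook docs and your deployment):

import hashlib
import hmac

import boto3

def verify_linear_signature(raw_body: bytes, signature_header: str) -> bool:
    """Recompute the HMAC-SHA256 of the raw body and compare to Linear's header."""
    secret = boto3.client("secretsmanager").get_secret_value(
        SecretId="nb-rag-sys/linear-webhook-secret"  # assumed secret name
    )["SecretString"]
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)

# In the API Gateway proxy handler:
# if not verify_linear_signature(event["body"].encode(), event["headers"].get("linear-signature", "")):
#     return {"statusCode": 401, "body": "invalid signature"}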
    

Full Sync (On-Demand)

Run a full sync to populate all teams and projects:

# Invoke the Linear sync Lambda
aws lambda invoke \
  --function-name nb-rag-sys-linear-sync \
  --payload '{}' \
  --cli-binary-format raw-in-base64-out \
  /tmp/response.json

# Check response
cat /tmp/response.json | jq

# Monitor worker logs
aws logs tail /aws/lambda/nb-rag-sys-linear-sync-worker --follow

What Gets Synced

Teams → CLIENT records:

  • Team ID, Name, Key, Description
  • EntityType: “CLIENT” (for GSI queries)
  • CreatedAt/UpdatedAt timestamps

Projects → PROJECT records:

  • Project ID, Name, Description, State
  • Parent ClientId (team relationship)
  • EntityType: “PROJECT” (for GSI queries)
  • Start/Target dates

Verify Sync Results

# Count entities by type
aws dynamodb scan --table-name nb-rag-sys-classify --output json | \
  jq '[.Items[] | .EntityType.S] | group_by(.) | map({type: .[0], count: length})'

# List all clients
aws dynamodb query \
  --table-name nb-rag-sys-classify \
  --index-name EntityTypeIndex \
  --key-condition-expression "EntityType = :type" \
  --expression-attribute-values '{":type": {"S": "CLIENT"}}' \
  --output json | jq '.Items[] | .Name.S'

Domain Mappings

After syncing, add domain mappings to enable document classification:

# Run the domain mapping script
python3 scripts/add_domain_mappings.py

Edit scripts/add_domain_mappings.py to add new domain-to-client mappings:

DOMAIN_MAPPINGS = {
    "clientdomain.com": ("client-uuid", None),  # Domain -> Client
    "project.com": ("client-uuid", "project-uuid"),  # Domain -> Client + Project
}
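
To illustrate how such a mapping gets applied during classification, here is a small resolver that turns a sender's email domain into a (client_id, project_id) pair. The deployed Classify Lambda reads these mappings from DynamoDB rather than a local dict, so treat this as a sketch of the lookup logic only.

DOMAIN_MAPPINGS = {
    "clientdomain.com": ("client-uuid", None),        # Domain -> Client
    "project.com": ("client-uuid", "project-uuid"),   # Domain -> Client + Project
}

def resolve_client(email: str) -> tuple:
    """Map a sender email to (client_id, project_id), or (None, None) if unmapped."""
    domain = email.rsplit("@", 1)[-1].lower()
    return DOMAIN_MAPPINGS.get(domain, (None, None))

print(resolve_client("jane@clientdomain.com"))  # ('client-uuid', None)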

Manual Ingestion

Upload Custom Documents

For documents not covered by webhooks, use the manual ingestion API.

API Endpoint

POST /ingest
Authorization: Bearer <jwt-token>
Content-Type: application/json

Request Format

{
  "document_id": "doc-123",
  "title": "Product Requirements Document",
  "content": "Full document content here...",
  "metadata": {
    "source": "confluence",
    "author": "john@company.com",
    "created_at": "2025-11-08T12:00:00Z",
    "url": "https://confluence.company.com/doc-123",
    "tags": ["product", "requirements"]
  }
}

Response Format

{
  "document_id": "doc-123",
  "chunks_created": 12,
  "vectors_upserted": 12,
  "status": "success"
}

Example

# Get JWT token
TOKEN=$(curl -X POST https://[cognito-domain]/oauth2/token \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials" \
  -d "client_id=[client-id]" \
  -d "client_secret=[client-secret]" \
  | jq -r '.access_token')

# Upload document
curl -X POST https://[api-url]/ingest \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "document_id": "doc-123",
    "title": "Product Requirements Document",
    "content": "The system should support user authentication via OAuth 2.0. Users should be able to log in using Google accounts. The system should store user preferences in DynamoDB.",
    "metadata": {
      "source": "confluence",
      "author": "john@company.com",
      "url": "https://confluence.company.com/doc-123"
    }
  }'

Chunking Strategy

The system automatically chunks documents via Bedrock Knowledge Base:

  • Chunk size: 512 tokens (FIXED_SIZE strategy)
  • Overlap: 20% (~100 tokens to preserve context)
  • LLM parsing: Disabled (sidecar .metadata.json files provide metadata)
  • Metadata: Extracted from sidecar files, not parsed from document content

Example:

Document (2000 tokens)
  → Chunk 1: tokens 0-512
  → Chunk 2: tokens 412-924 (~100 token overlap)
  → Chunk 3: tokens 824-1336
  → Chunk 4: tokens 1236-1748
  → Chunk 5: tokens 1648-2000
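
As a rough sketch of the same fixed-size strategy (not Bedrock's actual implementation), the boundaries above can be reproduced with a simple sliding window; placeholder tokens stand in for the Knowledge Base's real tokenizer:

def chunk_with_overlap(tokens, chunk_size=512, overlap=100):
    """Fixed-size chunks with overlap; the last chunk absorbs any remainder."""
    step = chunk_size - overlap
    chunks, start = [], 0
    while start < len(tokens):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break
        start += step
    return chunks

tokens = ["tok"] * 2000
for i, chunk in enumerate(chunk_with_overlap(tokens), start=1):
    print(f"Chunk {i}: {len(chunk)} tokens")   # 512, 512, 512, 512, 352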

Bulk Import

Import from S3

For large document sets, upload to S3 and trigger batch processing.

Setup

  1. Upload documents to S3:
    # Create S3 bucket for staging
    aws s3 mb s3://nb-rag-sys-ingest-staging

    # Upload documents
    aws s3 cp documents/ s3://nb-rag-sys-ingest-staging/ --recursive

  2. Trigger batch import Lambda:
    # Invoke bulk import Lambda
    aws lambda invoke \
      --function-name nb-rag-sys-bulk-import \
      --payload '{
        "bucket": "nb-rag-sys-ingest-staging",
        "prefix": "documents/"
      }' \
      --cli-binary-format raw-in-base64-out \
      /tmp/response.json

    cat /tmp/response.json

Supported Formats

  • Text: .txt, .md
  • Documents: .pdf, .docx
  • Code: .py, .js, .java, etc.
  • Data: .json, .csv

Processing Flow

1. Documents uploaded to S3 documents bucket (with .metadata.json sidecar files)
2. Bulk import Lambda triggered (or scheduled sync every 5 minutes)
3. Bedrock Knowledge Base syncs bucket contents
4. For each document:
   a. Read document and sidecar metadata from S3
   b. Chunk content (512 tokens, 20% overlap)
   c. Generate embeddings via Titan Embeddings v2
   d. Store vectors in S3 Vectors with metadata
5. Sync job returns summary (scanned, indexed, deleted, failed counts)

Note: LLM parsing is disabled. Metadata is provided via sidecar .metadata.json files created by the sync Lambdas. This ensures 100% ingestion success by avoiding the S3 Vectors 2KB filterable metadata limit.
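
As a rough pre-flight check for that limit, a sync Lambda could measure the serialized attributes before writing the sidecar. The exact accounting S3 Vectors applies (per attribute vs. total, filterable vs. non-filterable) should be confirmed against its documentation, so the 2048-byte budget below is an assumption.

import json

MAX_FILTERABLE_BYTES = 2048  # assumed budget for filterable metadata

def fits_filterable_limit(attributes: dict) -> bool:
    """Check that the serialized metadata attributes stay within the 2KB budget."""
    return len(json.dumps(attributes).encode("utf-8")) <= MAX_FILTERABLE_BYTES

attrs = {"source": "helpscout", "client_id": "client-uuid", "tags": "billing,urgent"}
if not fits_filterable_limit(attrs):
    attrs.pop("tags", None)   # drop non-essential attributes before writing the sidecar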

Import from Database

Extract data from existing databases.

Example: Import from PostgreSQL

import psycopg2
import boto3
import json

# Connect to database
conn = psycopg2.connect(
    host="db.company.com",
    database="knowledge",
    user="readonly",
    password="..."
)

# Fetch documents
cursor = conn.cursor()
cursor.execute("SELECT id, title, content, created_at FROM documents")

# Lambda client
lambda_client = boto3.client('lambda')

# Process each document
for row in cursor:
    doc_id, title, content, created_at = row

    # Invoke ingest Lambda
    payload = {
        'document_id': f'db-{doc_id}',
        'title': title,
        'content': content,
        'metadata': {
            'source': 'postgresql',
            'created_at': created_at.isoformat()
        }
    }

    response = lambda_client.invoke(
        FunctionName='nb-rag-sys-ingest',
        InvocationType='Event',  # Async
        Payload=json.dumps(payload)
    )

    print(f"Ingested: {doc_id} - {title}")

conn.close()

Data Management

Update Documents

To update existing documents, simply re-ingest with the same document_id:

curl -X POST https://[api-url]/ingest \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "document_id": "doc-123",
    "title": "Updated Product Requirements Document",
    "content": "Updated content...",
    "metadata": {
      "version": "2.0",
      "updated_at": "2025-11-08T14:00:00Z"
    }
  }'

The system will:

  1. Delete old vectors for document_id
  2. Create new vectors with updated content
  3. Preserve metadata (merged with new metadata)

Delete Documents

Remove documents from the index:

# Delete from S3 documents bucket (remove the .metadata.json sidecar as well)
aws s3 rm s3://nb-rag-sys-documents/doc-123.md
aws s3 rm s3://nb-rag-sys-documents/doc-123.md.metadata.json

# Vectors are automatically removed during the next ingestion job
# (runs every 5 minutes via scheduled Lambda)
# To trigger immediate cleanup:
aws bedrock-agent start-ingestion-job \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]

Note: The system uses data_deletion_policy = "DELETE", which automatically removes vectors when their source documents are deleted from S3. The cleanup happens during each ingestion job and is reflected in the numberOfDocumentsDeleted statistic.

List Documents

View all ingested documents:

# List all documents in S3
aws s3 ls s3://nb-rag-sys-documents/ --recursive

# Check Knowledge Base data source statistics
aws bedrock-agent get-data-source \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]

Monitoring Ingestion

Check Ingestion Status

# Check webhook Lambda invocations (last hour)
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name Invocations \
  --dimensions Name=FunctionName,Value=nb-rag-sys-webhook-fathom \
  --start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 3600 \
  --statistics Sum

# Check errors
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda \
  --metric-name Errors \
  --dimensions Name=FunctionName,Value=nb-rag-sys-webhook-fathom \
  --start-time $(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%S) \
  --end-time $(date -u +%Y-%m-%dT%H:%M:%S) \
  --period 3600 \
  --statistics Sum

View Ingestion Logs

# Tail webhook logs
aws logs tail /aws/lambda/nb-rag-sys-webhook-fathom --follow

# Search for specific document
aws logs filter-log-events \
  --log-group-name /aws/lambda/nb-rag-sys-webhook-fathom \
  --filter-pattern "doc-123" \
  --start-time $(date -u -d '24 hours ago' +%s)000

Check Knowledge Base Health

# Get Knowledge Base status
aws bedrock-agent get-knowledge-base \
  --knowledge-base-id [kb-id]

# Get data source sync status
aws bedrock-agent list-ingestion-jobs \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]

# Check S3 documents bucket contents
aws s3 ls s3://nb-rag-sys-documents/ --summarize

Best Practices

Document Preparation

  1. Clean Text: Remove HTML tags, special characters, and excessive whitespace (see the cleanup sketch after this list)
  2. Structure: Use clear headings and sections
  3. Metadata: Include relevant metadata (author, date, source, etc.)
  4. Length: Aim for 500-2000 tokens per document (too short = low context, too long = excessive chunks)
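
A starting point for the cleanup in step 1, assuming simple regex handling is enough; heavily formatted HTML may warrant a real parser such as BeautifulSoup instead.

import html
import re

def clean_text(raw: str) -> str:
    """Strip HTML tags, decode entities, and collapse excessive whitespace."""
    text = html.unescape(raw)                      # &nbsp;, &amp;, etc.
    text = re.sub(r"<[^>]+>", " ", text)           # drop HTML tags
    text = re.sub(r"[ \t\u00a0]+", " ", text)      # collapse spaces, tabs, nbsp
    text = re.sub(r"\s*\n\s*\n\s*", "\n\n", text)  # cap consecutive blank lines
    return text.strip()

print(clean_text("<p>Hello&nbsp;world</p>\n\n\n\nNext   section"))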

Chunking Strategy

  1. Preserve Context: Ensure chunks have enough context to be meaningful
  2. Overlap: Use 50-100 token overlap to avoid losing context at boundaries
  3. Semantic Boundaries: Chunk at sentence or paragraph boundaries
  4. Metadata: Include document title and section in each chunk’s metadata

Metadata Design

Good metadata improves search quality:

{
  "document_id": "unique-id",
  "title": "Document Title",
  "source": "fathom|helpscout|linear|manual",
  "source_url": "https://...",
  "author": "john@company.com",
  "created_at": "2025-11-08T12:00:00Z",
  "updated_at": "2025-11-08T14:00:00Z",
  "tags": ["product", "engineering"],
  "category": "documentation",
  "version": "1.0",
  "language": "en"
}

Incremental Updates

For frequently updated documents:

  1. Use consistent document_id: Same ID = update, new ID = new document
  2. Include version metadata: Track document versions
  3. Set up scheduled re-ingestion: For sources without webhooks

Error Handling

  1. Retry Logic: Implement exponential backoff for transient errors (a sketch follows this list)
  2. Dead Letter Queue: Send failed ingestions to SQS DLQ for later processing
  3. Alerting: Set up CloudWatch alarms for high error rates
  4. Logging: Log all ingestion attempts with document IDs for debugging
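
A minimal sketch of point 1, wrapping the async invoke used elsewhere in this guide with exponential backoff and jitter; attempt counts and delays are illustrative.

import json
import random
import time

import boto3
from botocore.exceptions import ClientError

lambda_client = boto3.client("lambda")

def ingest_with_retry(payload: dict, max_attempts: int = 5) -> None:
    """Invoke the ingest Lambda, backing off exponentially on transient errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            lambda_client.invoke(
                FunctionName="nb-rag-sys-ingest",
                InvocationType="Event",
                Payload=json.dumps(payload),
            )
            return
        except ClientError:
            if attempt == max_attempts:
                raise  # hand off to a DLQ / alerting path (points 2 and 3)
            # 1s, 2s, 4s, 8s (capped), plus jitter
            time.sleep(min(2 ** (attempt - 1), 8) + random.random())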

Performance Optimization

Batch Processing

For large imports, upload documents to S3 and trigger a single sync:

# Upload all documents to S3
aws s3 sync ./documents/ s3://nb-rag-sys-documents/

# Trigger single Knowledge Base sync
aws bedrock-agent start-ingestion-job \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id]

# Monitor sync progress
aws bedrock-agent get-ingestion-job \
  --knowledge-base-id [kb-id] \
  --data-source-id [ds-id] \
  --ingestion-job-id [job-id]

Async Processing

Use async invocations for non-blocking operations:

import json
import boto3

lambda_client = boto3.client('lambda')
payload = {'document_id': 'doc-123', 'content': '...'}  # as in the Request Format above

# Async Lambda invocation
lambda_client.invoke(
    FunctionName='nb-rag-sys-ingest',
    InvocationType='Event',  # Don't wait for response
    Payload=json.dumps(payload)
)

Caching

Cache embeddings for frequently updated documents:

import hashlib
import json
import redis

r = redis.Redis(host='cache.company.com', port=6379)  # adjust to your cache endpoint

# Check cache before generating embedding (stable key via SHA-256, not Python's hash())
cache_key = f"embedding:{hashlib.sha256(content.encode()).hexdigest()}"
cached_embedding = r.get(cache_key)

if cached_embedding:
    embedding = json.loads(cached_embedding)
else:
    embedding = generate_embedding(content)  # your Titan Embeddings call
    r.set(cache_key, json.dumps(embedding), ex=3600)

Last updated: 2025-12-31