The ingestion API is a lightweight way to index documents. If you need more control, consider creating a Connector in the Admin Dashboard or following the Create a Connector API guide.

When to use the Ingestion API

Use the Ingestion API for:
  • Unsupported sources: Add content from systems that don’t have built-in Connectors
  • Supplemental data: Add extra context to existing Connector data (like README files for GitLab)
  • Editing documents: Modify documents in Onyx when the Admin cannot update the original document in the source (see the sketch after this list)
  • Programmatic workflows: Integrate document indexing into your existing data pipelines
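
For example, editing a document amounts to re-ingesting a payload that carries the same document id: the API updates the existing document in place and reports this via the already_existed field in its response (see the Full Code section). A minimal sketch; the id and content values are placeholders:

import requests

# Re-posting a payload with the same document "id" updates that document in place
updated_payload = {
    "document": {
        "id": "my-doc-1",  # hypothetical id; must match the originally ingested document
        "semantic_identifier": "my-doc-1.txt",
        "sections": [{"text": "Corrected content for this document."}],
        "source": "file",
    },
    "cc_pair_id": 243,
}

response = requests.post(
    "https://cloud.onyx.app/onyx-api/ingestion",
    headers={"Authorization": "Bearer YOUR_KEY_HERE", "Content-Type": "application/json"},
    json=updated_payload,
)
print(response.json().get("already_existed"))  # True if an existing document was updated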

Guide

Skip to the Full Code section if you don’t want the step-by-step guide. In this example, we’ll index local files with the Ingestion API.
The Ingestion API endpoint is https://cloud.onyx.app/onyx-api/ingestion (substitute your own domain if you are self-hosting).
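
Before walking through the steps, here is the smallest request the guide builds up to (a minimal sketch; the payload fields are explained in step 3):

import requests

API_BASE_URL = "https://cloud.onyx.app/onyx-api/ingestion"  # or your own domain

payload = {
    "document": {
        "semantic_identifier": "hello.txt",
        "sections": [{"text": "Hello from the Ingestion API."}],
        "source": "file",
    },
    "cc_pair_id": 243,  # ties the document to a Connector (see step 1)
}

response = requests.post(
    API_BASE_URL,
    headers={"Authorization": "Bearer YOUR_KEY_HERE", "Content-Type": "application/json"},
    json=payload,
)
print(response.status_code)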
1. Prepare your Connector

Since we want these files to appear in the Connectors page, we'll need to first create a Connector. For this example, we'll create a File Connector with a dummy text file.

If you already have a Connector you would like to associate your Ingestion files with, click on the Connector and copy the cc_pair_id from the URL:

https://cloud.onyx.app/admin/connector/243 -> cc_pair_id is 243
2. Prepare your request

import requests

API_BASE_URL = "https://cloud.onyx.app/onyx-api/ingestion" # or your own domain
API_KEY = "YOUR_KEY_HERE"

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}
3. Process your documents into the proper JSON object

If you want your documents to show up in the Connectors page, you must specify a cc_pair_id in the payload!
See Core Concepts: Documents for details on the IngestionDocument object.
from pathlib import Path

import PyPDF2  # pip install PyPDF2
import pandas as pd  # pip install pandas openpyxl (openpyxl handles .xlsx)
from docx import Document as DocxDocument  # pip install python-docx

def read_files_from_folder(folder_path, cc_pair_id=243):
    """
    Read PDF, TXT, DOCX, CSV, and XLSX files and create minimal ingestion payloads
    """
    documents = []
    folder = Path(folder_path)
    
    # Supported file extensions
    supported_extensions = {'.pdf', '.txt', '.docx', '.csv', '.xlsx'}
    
    for file_path in folder.rglob('*'):
        if file_path.is_file() and file_path.suffix.lower() in supported_extensions:
            try:
                # Extract text based on file type
                if file_path.suffix.lower() == '.txt':
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                
                elif file_path.suffix.lower() == '.pdf':
                    with open(file_path, 'rb') as f:
                        reader = PyPDF2.PdfReader(f)
                        content = ""
                        for page in reader.pages:
                            content += (page.extract_text() or "") + "\n"  # guard against pages with no extractable text
                
                elif file_path.suffix.lower() == '.docx':
                    doc = DocxDocument(file_path)
                    content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
                
                elif file_path.suffix.lower() in ['.csv', '.xlsx']:
                    df = pd.read_csv(file_path) if file_path.suffix.lower() == '.csv' else pd.read_excel(file_path)
                    content = df.to_string(index=False)
                
                # Skip empty files
                if not content.strip():
                    continue
                
                # Create minimal viable payload
                document_payload = {
                    "document": {
                        "semantic_identifier": file_path.name,
                        "sections": [
                            {"text": content}
                        ],
                        "source": "file",
                        "metadata": {
                            "file_type": file_path.suffix.lower()
                        }
                    },
                    "cc_pair_id": cc_pair_id
                }
                
                documents.append(document_payload)
                print(f"Processed: {file_path.name}")
                
            except Exception as e:
                print(f"Error reading {file_path.name}: {e}")
                continue
    
    return documents

# Example usage
LOCAL_FOLDER = "./documents"  # Change to your folder path
CC_PAIR_ID = 243  # Use your actual cc_pair_id

# Read and process files
documents_to_ingest = read_files_from_folder(LOCAL_FOLDER, CC_PAIR_ID)
print(f"\nFound {len(documents_to_ingest)} documents ready for ingestion")
4. Make the request to the /ingestion endpoint

successful_ingestions = 0
failed_ingestions = 0

for i, document_data in enumerate(documents_to_ingest):
    print(f"Ingesting document {i+1}/{len(documents_to_ingest)}: {document_data['document']['semantic_identifier']}")
    
    response = requests.post(
        API_BASE_URL,  # this URL is already the full /ingestion endpoint
        headers=headers,
        json=document_data
    )

    if response.status_code == 200:
        print(f"Successfully ingested: {document_data['document']['semantic_identifier']}")
        successful_ingestions += 1
    else:
        print(f"Failed to ingest {document_data['document']['semantic_identifier']}: {response.status_code}")
        print(f"   Error: {response.text}")
        failed_ingestions += 1

print(f"\nIngestion complete: {successful_ingestions} successful, {failed_ingestions} failed")
The API will return a success response if the document is accepted for processing. The actual indexing happens asynchronously.
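
Transient failures (network hiccups, rate limits) are common when ingesting in bulk, so it can be worth wrapping the POST in a small retry helper. This is a sketch, not part of the API; the attempt count and backoff values are arbitrary:

import time

import requests

def post_with_retry(url, headers, payload, attempts=3, backoff_seconds=2.0):
    """POST a payload, retrying failures with increasing backoff."""
    response = None
    for attempt in range(1, attempts + 1):
        try:
            response = requests.post(url, headers=headers, json=payload)
            if response.status_code == 200:
                return response
        except requests.RequestException as e:
            print(f"Attempt {attempt} failed: {e}")
        if attempt < attempts:
            time.sleep(backoff_seconds * attempt)  # wait 2s, 4s, ... between attempts
    return response  # may be None if every attempt raised

# Drop-in replacement for the requests.post call in the loop above:
# response = post_with_retry(API_BASE_URL, headers, document_data)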

Full Code

Python
import requests
from pathlib import Path

import PyPDF2  # pip install PyPDF2
import pandas as pd  # pip install pandas openpyxl (openpyxl handles .xlsx)
from docx import Document as DocxDocument  # pip install python-docx

API_BASE_URL = "https://cloud.onyx.app/onyx-api/ingestion"  # or your own domain
API_KEY = "YOUR_KEY_HERE"
CC_PAIR_ID = 243  # Replace with your actual cc_pair_id
LOCAL_FOLDER = "./documents"  # Change this to your folder path

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

def read_files_from_folder(folder_path, cc_pair_id=243):
    """
    Read PDF, TXT, DOCX, CSV, and XLSX files and create minimal ingestion payloads
    """
    documents = []
    folder = Path(folder_path)
    
    if not folder.exists():
        print(f"Folder does not exist: {folder_path}")
        return documents
    
    supported_extensions = {'.pdf', '.txt', '.docx', '.csv', '.xlsx'}
    
    print(f"Reading files from: {folder.absolute()}")
    
    for file_path in folder.rglob('*'):
        if file_path.is_file() and file_path.suffix.lower() in supported_extensions:
            try:
                # Extract text based on file type
                if file_path.suffix.lower() == '.txt':
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                
                elif file_path.suffix.lower() == '.pdf':
                    with open(file_path, 'rb') as f:
                        reader = PyPDF2.PdfReader(f)
                        content = ""
                        for page in reader.pages:
                            content += (page.extract_text() or "") + "\n"  # guard against pages with no extractable text
                
                elif file_path.suffix.lower() == '.docx':
                    doc = DocxDocument(file_path)
                    content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
                
                elif file_path.suffix.lower() in ['.csv', '.xlsx']:
                    df = pd.read_csv(file_path) if file_path.suffix.lower() == '.csv' else pd.read_excel(file_path)
                    content = df.to_string(index=False)
                
                # Skip empty files
                if not content.strip():
                    print(f"Skipping empty file: {file_path.name}")
                    continue
                
                # Create minimal viable payload
                document_payload = {
                    "document": {
                        "semantic_identifier": file_path.name,
                        "sections": [
                            {"text": content}
                        ],
                        "source": "file",
                        "metadata": {
                            "file_type": file_path.suffix.lower()
                        }
                    },
                    "cc_pair_id": cc_pair_id
                }
                
                documents.append(document_payload)
                print(f"Processed: {file_path.name}")
                
            except Exception as e:
                print(f"Error reading {file_path.name}: {e}")
                continue
    return documents

# Read all documents from folder
print("Starting file ingestion process...")
documents_to_ingest = read_files_from_folder(LOCAL_FOLDER, CC_PAIR_ID)

if not documents_to_ingest:
    print("No documents found to ingest. Check your folder path and file types.")
    raise SystemExit(1)

print(f"Found {len(documents_to_ingest)} documents to ingest")

# Make the ingestion requests for all documents
successful_ingestions = 0
failed_ingestions = 0

print("\nStarting ingestion...")
for i, document_data in enumerate(documents_to_ingest):
    print(f"[{i+1}/{len(documents_to_ingest)}] Ingesting: {document_data['document']['semantic_identifier']}")
    
    response = requests.post(
        API_BASE_URL,  # this URL is already the full /ingestion endpoint
        headers=headers,
        json=document_data
    )

    if response.status_code == 200:
        result = response.json()
        print(f"Success: {document_data['document']['semantic_identifier']}")
        if result.get('already_existed'):
            print("   Document was updated (already existed)")
        else:
            print("   New document created")
        successful_ingestions += 1
    else:
        print(f"Failed: {document_data['document']['semantic_identifier']}")
        print(f"   Status: {response.status_code}")
        print(f"   Error: {response.text}")
        failed_ingestions += 1

print(f"\nIngestion Summary:")
print(f"   Successful: {successful_ingestions}")
print(f"   Failed: {failed_ingestions}")
print(f"   Total: {len(documents_to_ingest)}")

Extra Example

This script crawls a specified directory for JSON files and sends the contents to the Ingestion API.
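Each input file is expected to contain a single JSON object with the fields the transform function reads: id, title, page_content, source, and metadata.tags. An illustrative input file (all values are placeholders):

JSON
{
  "id": "doc-123",
  "title": "Getting Started",
  "page_content": "Full text of the page goes here...",
  "source": "https://example.com/getting-started",
  "metadata": {
    "tags": ["guide", "onboarding"]
  }
}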
Python
#!/usr/bin/env python3
"""
Ingestion Script
"""

import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Any, List, Optional

import requests
from requests.exceptions import RequestException
from tqdm import tqdm  # type: ignore


def setup_logging() -> None:
    """Configure logging for the application."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[logging.StreamHandler()],
    )


def parse_arguments() -> argparse.Namespace:
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(
        description="Crawl a directory for JSON files and send them to an API endpoint."
    )
    parser.add_argument(
        "-d", "--directory",
        required=True,
        type=str,
        help="Directory to crawl for JSON files"
    )
    parser.add_argument(
        "-t", "--tracking-file",
        required=True,
        type=str,
        help="File to track processed document IDs (one per line)"
    )
    parser.add_argument(
        "-u", "--url",
        type=str,
        help="API endpoint URL to send JSON data to",
        default="http://localhost:8080/onyx-api/ingestion"
    )
    parser.add_argument(
        "--headers",
        type=str,
        help="JSON string of headers to include in the request"
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=30,
        help="Request timeout in seconds (default: 30)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Don't actually send requests, just print what would be sent"
    )
    parser.add_argument(
        "--cc-pair-id",
        type=int,
        help="Connector credential pair ID to associate the documents with"
    )
    
    return parser.parse_args()


def find_json_files(directory: str) -> List[Path]:
    """
    Find all JSON files in the specified directory and its subdirectories.
    
    Args:
        directory: The directory to search in
        
    Returns:
        A list of Path objects for all JSON files found
    """
    directory_path = Path(directory)
    if not directory_path.exists() or not directory_path.is_dir():
        raise ValueError(f"The specified path '{directory}' is not a valid directory")
    
    json_files = list(directory_path.glob("**/*.json"))
    return json_files


def load_processed_ids(tracking_file: str) -> set:
    """
    Load already processed document IDs from the tracking file.
    
    Args:
        tracking_file: Path to the tracking file
        
    Returns:
        A set of document IDs that have already been processed
    """
    tracking_path = Path(tracking_file)
    
    if not tracking_path.exists():
        print(f"Tracking file '{tracking_file}' not found. Creating new file.")
        tracking_path.parent.mkdir(parents=True, exist_ok=True)
        tracking_path.touch()
        return set()
    
    try:
        with open(tracking_path, "r", encoding="utf-8") as f:
            processed_ids = {line.strip() for line in f if line.strip()}
        return processed_ids
    except Exception as e:
        logging.getLogger(__name__).warning(f"Error reading tracking file: {e}. Starting with empty set.")
        return set()


def save_processed_id(tracking_file: str, document_id: str) -> None:
    """
    Save a processed document ID to the tracking file.
    
    Args:
        tracking_file: Path to the tracking file
        document_id: The document ID to save
    """
    try:
        with open(tracking_file, "a", encoding="utf-8") as f:
            f.write(f"{document_id}\n")
    except Exception as e:
        logging.getLogger(__name__).error(f"Error saving document ID to tracking file: {e}")


def transform_json_to_document_format(file_path: Path, cc_pair_id: Optional[int] = None) -> dict:
    """
    Transform a JSON file into the document format required by the API.
    
    Args:
        file_path: Path to the JSON file
        cc_pair_id: Optional connector credential pair ID
        
    Returns:
        A dictionary in the required document format
    """
    logger = logging.getLogger(__name__)
    
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        
        # Create the document structure
        document: dict[str, Any] = {
            "document": {
                "id": data["id"],
                "sections": [
                    {
                        "text": data["page_content"],
                        "link": data["source"]
                    }
                ],
                "semantic_identifier": data["title"],
                "metadata": {
                    "tags": data["metadata"]["tags"]
                },
                "source": "web"
            }
        }
        
        # Add cc_pair_id if provided
        if cc_pair_id is not None:
            document["cc_pair_id"] = cc_pair_id
            logger.debug(f"Added cc_pair_id {cc_pair_id} to document from {file_path}")
        
        return document
        
    except Exception as e:
        logger.error(f"Error transforming {file_path}: {str(e)}")
        return {}


def send_json_to_api(
    file_path: Path, 
    api_url: str, 
    tracking_file: str,
    document_id: str,
    headers: Optional[dict] = None, 
    timeout: int = 30,
    dry_run: bool = False,
    cc_pair_id: Optional[int] = None
) -> bool:
    """
    Read a JSON file, transform it to the required format, and send to the API.
    
    Args:
        file_path: Path to the JSON file
        api_url: URL of the API endpoint
        tracking_file: Path to the tracking file
        document_id: The document ID to track
        headers: Optional headers to include in the request
        timeout: Request timeout in seconds
        dry_run: If True, don't actually send the request
        cc_pair_id: Optional connector credential pair ID
        
    Returns:
        True if successful, False otherwise
    """
    logger = logging.getLogger(__name__)
    
    try:
        # Transform the JSON to the required format
        transformed_data = transform_json_to_document_format(file_path, cc_pair_id)
        
        if not transformed_data:
            logger.error(f"Failed to transform data from {file_path}")
            return False
        
        if dry_run:
            logger.info(f"DRY RUN: Would send transformed data from {file_path} to {api_url}")
            save_processed_id(tracking_file, document_id)
            return True
            
        response = requests.post(
            api_url,
            json=transformed_data,
            headers=headers or {},
            timeout=timeout
        )
        
        response.raise_for_status()
        logger.info(f"Successfully sent transformed data from {file_path} to API")
        
        # Save the document ID to tracking file on successful processing
        save_processed_id(tracking_file, document_id)
        return True
        
    except json.JSONDecodeError:
        logger.error(f"Failed to parse JSON from {file_path}")
    except RequestException as e:
        logger.error(f"API request failed for {file_path}: {str(e)}")
    except Exception as e:
        logger.error(f"Unexpected error processing {file_path}: {str(e)}")
    
    return False


def main() -> int:
    """Main function to run the script."""
    setup_logging()
    logger = logging.getLogger(__name__)
    
    try:
        args = parse_arguments()
        
        # Parse headers if provided
        headers = None
        if args.headers:
            try:
                headers = json.loads(args.headers)
            except json.JSONDecodeError:
                logger.error("Failed to parse headers JSON string")
                return 1
        
        # Log cc_pair_id if provided
        if args.cc_pair_id is not None:
            logger.info(f"Using connector credential pair ID: {args.cc_pair_id}")
        
        # Load already processed document IDs
        processed_ids = load_processed_ids(args.tracking_file)
        logger.info(f"Loaded {len(processed_ids)} already processed document IDs")
        
        # Find all JSON files
        logger.info(f"Searching for JSON files in {args.directory}")
        json_files = find_json_files(args.directory)
        logger.info(f"Found {len(json_files)} JSON files")
        
        if not json_files:
            logger.warning("No JSON files found. Exiting.")
            return 0
        
        # Filter out already processed files
        unprocessed_files = []
        for file_path in json_files:
            try:
                # Extract document ID from the JSON file
                with open(file_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                document_id = data.get("id")
                
                if not document_id:
                    logger.warning(f"No 'id' field found in {file_path}, skipping")
                    continue
                    
                if document_id not in processed_ids:
                    unprocessed_files.append((file_path, document_id))
                else:
                    logger.debug(f"Skipping already processed document: {document_id}")
            except Exception as e:
                logger.warning(f"Error reading document ID from {file_path}: {e}, skipping")
                continue
        
        logger.info(f"Found {len(unprocessed_files)} unprocessed files out of {len(json_files)} total files")
        
        if not unprocessed_files:
            logger.info("All files have already been processed. Exiting.")
            return 0
        
        # Process each unprocessed JSON file
        success_count = 0
        failure_count = 0
        
        for file_path, document_id in tqdm(unprocessed_files, desc="Processing files"):
            success = send_json_to_api(
                file_path=file_path,
                api_url=args.url,
                tracking_file=args.tracking_file,
                document_id=document_id,
                headers=headers,
                timeout=args.timeout,
                dry_run=args.dry_run,
                cc_pair_id=args.cc_pair_id
            )
            
            if success:
                success_count += 1
            else:
                failure_count += 1
        
        # Report results
        logger.info(f"Processing complete. Success: {success_count}, Failures: {failure_count}")
        
        return 0 if failure_count == 0 else 1
        
    except KeyboardInterrupt:
        logger.info("Process interrupted by user")
        return 130
    except Exception as e:
        logger.exception(f"Unhandled exception: {str(e)}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
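
Assuming the script is saved as ingest_json.py (the filename is arbitrary), a typical invocation looks like the following; pass --dry-run first to preview what would be sent:

Shell
python ingest_json.py \
    --directory ./exported_docs \
    --tracking-file processed_ids.txt \
    --url https://cloud.onyx.app/onyx-api/ingestion \
    --headers '{"Authorization": "Bearer YOUR_KEY_HERE", "Content-Type": "application/json"}' \
    --cc-pair-id 243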