Learn how to programmatically index documents using Onyx’s ingestion API
The ingestion endpoint is https://cloud.onyx.app/onyx-api/ingestion, or the equivalent path on your own domain.
Prepare your Connector
Pick the connector that ingested documents should be attached to and grab its cc_pair_id from the connector page URL, e.g. https://cloud.onyx.app/admin/connector/308. The examples in this guide use a cc_pair_id of 243 as a placeholder; substitute your own value.
Prepare your request
import requests
API_BASE_URL = "https://cloud.onyx.app/onyx-api"  # or your own domain; the /ingestion path is appended when making the request
API_KEY = "YOUR_KEY_HERE"
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
Process your documents into the proper JSON object
Remember to include the cc_pair_id in the payload! The request body follows the IngestionDocument object.
Minimum valid payload
{
"document": {
"id": "my_unique_id_1",
"semantic_identifier": "Onyx FAQ v1",
"sections": [
{ "text": "What is Onyx?\nOnyx is..." }
],
"source": "file",
"metadata": {
"category": "faq"
}
},
"cc_pair_id": 243
}
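As a quick check, here is a minimal sketch that posts the payload above and prints the API's response. It reuses API_BASE_URL and headers from the "Prepare your request" step, and the cc_pair_id of 243 is a placeholder.
# Minimal sketch: POST the minimum valid payload to the ingestion endpoint.
# Assumes API_BASE_URL and headers are defined as in "Prepare your request".
minimal_payload = {
    "document": {
        "id": "my_unique_id_1",
        "semantic_identifier": "Onyx FAQ v1",
        "sections": [{"text": "What is Onyx?\nOnyx is..."}],
        "source": "file",
        "metadata": {"category": "faq"},
    },
    "cc_pair_id": 243,  # replace with your own cc_pair_id
}

response = requests.post(f"{API_BASE_URL}/ingestion", headers=headers, json=minimal_payload)
response.raise_for_status()
print(response.json())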
Full payload with all optional fields
{
"document": {
"id": "my_unique_id_1",
"semantic_identifier": "Onyx FAQ - Title shown in UI",
"title": "Onyx FAQ v1 - Title for Search",
"sections": [
{
"text": "What is Onyx?\nOnyx is...",
"link": "https://docs.onyx.app/faq#what-is-onyx"
},
{
"text": "How do I get started?\nTo get started...",
"link": "https://docs.onyx.app/faq#getting-started"
},
{
"image_file_id": "uuid_generated_by_onyx_file_store",
"text": "Advanced - Must upload image first! POST /user/file/upload",
"link": "https://docs.onyx.app/faq#about"
}
],
"source": "file",
"metadata": {
"category": "faq",
"tags": ["frequently-asked", "help"]
},
"doc_updated_at": "2025-09-19T08:20:00Z",
"chunk_count": 15,
"primary_owners": [
{
"display_name": "Alex Chen",
"first_name": "Alex",
"middle_initial": null,
"last_name": "Chen",
"email": "alex@onyx.app"
}
],
"secondary_owners": [
{
"display_name": "Sarah Johnson",
"first_name": "Sarah",
"middle_initial": "M",
"last_name": "Johnson",
"email": "sarah@onyx.app"
}
],
"from_ingestion_api": true
},
"cc_pair_id": 243
}
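The helper below walks a local folder and turns supported files into minimal ingestion payloads. It relies on the third-party packages PyPDF2, python-docx, and pandas (plus openpyxl for .xlsx files); install those before running it.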
from pathlib import Path
import PyPDF2
import pandas as pd
from docx import Document as DocxDocument
def read_files_from_folder(folder_path, cc_pair_id=243):
"""
Read PDF, TXT, DOCX, CSV, and XLSX files and create minimal ingestion payloads
"""
documents = []
folder = Path(folder_path)
# Supported file extensions
supported_extensions = {'.pdf', '.txt', '.docx', '.csv', '.xlsx'}
for file_path in folder.rglob('*'):
if file_path.is_file() and file_path.suffix.lower() in supported_extensions:
try:
# Extract text based on file type
if file_path.suffix.lower() == '.txt':
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
elif file_path.suffix.lower() == '.pdf':
with open(file_path, 'rb') as f:
reader = PyPDF2.PdfReader(f)
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
elif file_path.suffix.lower() == '.docx':
doc = DocxDocument(file_path)
content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
elif file_path.suffix.lower() in ['.csv', '.xlsx']:
df = pd.read_csv(file_path) if file_path.suffix.lower() == '.csv' else pd.read_excel(file_path)
content = df.to_string(index=False)
# Skip empty files
if not content.strip():
continue
# Create minimal viable payload
document_payload = {
"document": {
"semantic_identifier": file_path.name,
"sections": [
{"text": content}
],
"source": "file",
"metadata": {
"file_type": file_path.suffix.lower()
}
},
"cc_pair_id": cc_pair_id
}
documents.append(document_payload)
print(f"Processed: {file_path.name}")
except Exception as e:
print(f"Error reading {file_path.name}: {e}")
continue
return documents
# Example usage
LOCAL_FOLDER = "./documents" # Change to your folder path
CC_PAIR_ID = 243 # Use your actual cc_pair_id
# Read and process files
documents_to_ingest = read_files_from_folder(LOCAL_FOLDER, CC_PAIR_ID)
print(f"\nFound {len(documents_to_ingest)} documents ready for ingestion")
Make the request to the /ingestion endpoint
successful_ingestions = 0
failed_ingestions = 0
for i, document_data in enumerate(documents_to_ingest):
print(f"Ingesting document {i+1}/{len(documents_to_ingest)}: {document_data['document']['semantic_identifier']}")
response = requests.post(
f"{API_BASE_URL}/ingestion",
headers=headers,
json=document_data
)
if response.status_code == 200:
print(f"Successfully ingested: {document_data['document']['semantic_identifier']}")
successful_ingestions += 1
else:
print(f"Failed to ingest {document_data['document']['semantic_identifier']}: {response.status_code}")
print(f" Error: {response.text}")
failed_ingestions += 1
print(f"\nIngestion complete: {successful_ingestions} successful, {failed_ingestions} failed")
import requests
from pathlib import Path
import PyPDF2
import pandas as pd
from docx import Document as DocxDocument
API_BASE_URL = "https://cloud.onyx.app/onyx-api"  # or your own domain; the /ingestion path is appended below
API_KEY = "YOUR_KEY_HERE"
CC_PAIR_ID = 243 # Replace with your actual cc_pair_id
LOCAL_FOLDER = "./documents" # Change this to your folder path
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
def read_files_from_folder(folder_path, cc_pair_id=243):
"""
Read PDF, TXT, DOCX, CSV, and XLSX files and create minimal ingestion payloads
"""
documents = []
folder = Path(folder_path)
if not folder.exists():
print(f"Folder does not exist: {folder_path}")
return documents
supported_extensions = {'.pdf', '.txt', '.docx', '.csv', '.xlsx'}
print(f"Reading files from: {folder.absolute()}")
for file_path in folder.rglob('*'):
if file_path.is_file() and file_path.suffix.lower() in supported_extensions:
try:
# Extract text based on file type
if file_path.suffix.lower() == '.txt':
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
elif file_path.suffix.lower() == '.pdf':
with open(file_path, 'rb') as f:
reader = PyPDF2.PdfReader(f)
content = ""
for page in reader.pages:
content += page.extract_text() + "\n"
elif file_path.suffix.lower() == '.docx':
doc = DocxDocument(file_path)
content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
elif file_path.suffix.lower() in ['.csv', '.xlsx']:
df = pd.read_csv(file_path) if file_path.suffix.lower() == '.csv' else pd.read_excel(file_path)
content = df.to_string(index=False)
# Skip empty files
if not content.strip():
print(f"Skipping empty file: {file_path.name}")
continue
# Create minimal viable payload
document_payload = {
"document": {
"semantic_identifier": file_path.name,
"sections": [
{"text": content}
],
"source": "file",
"metadata": {
"file_type": file_path.suffix.lower()
}
},
"cc_pair_id": cc_pair_id
}
documents.append(document_payload)
print(f"Processed: {file_path.name}")
except Exception as e:
print(f"Error reading {file_path.name}: {e}")
continue
return documents
# Read all documents from folder
print("Starting file ingestion process...")
documents_to_ingest = read_files_from_folder(LOCAL_FOLDER, CC_PAIR_ID)
if not documents_to_ingest:
print("No documents found to ingest. Check your folder path and file types.")
exit(1)
print(f"Found {len(documents_to_ingest)} documents to ingest")
# Make the ingestion requests for all documents
successful_ingestions = 0
failed_ingestions = 0
print("\nStarting ingestion...")
for i, document_data in enumerate(documents_to_ingest):
print(f"[{i+1}/{len(documents_to_ingest)}] Ingesting: {document_data['document']['semantic_identifier']}")
response = requests.post(
f"{API_BASE_URL}/ingestion",
headers=headers,
json=document_data
)
if response.status_code == 200:
result = response.json()
print(f"Success: {document_data['document']['semantic_identifier']}")
if result.get('already_existed'):
print(" Document was updated (already existed)")
else:
print(" New document created")
successful_ingestions += 1
else:
print(f"Failed: {document_data['document']['semantic_identifier']}")
print(f" Status: {response.status_code}")
print(f" Error: {response.text}")
failed_ingestions += 1
print(f"\nIngestion Summary:")
print(f" Successful: {successful_ingestions}")
print(f" Failed: {failed_ingestions}")
print(f" Total: {len(documents_to_ingest)}")
#!/usr/bin/env python3
"""
Ingestion Script
"""
import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Any, List, Optional
import requests
from requests.exceptions import RequestException
from tqdm import tqdm # type: ignore
def setup_logging() -> None:
"""Configure logging for the application."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()],
)
def parse_arguments() -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="Crawl a directory for JSON files and send them to an API endpoint."
)
parser.add_argument(
"-d", "--directory",
required=True,
type=str,
help="Directory to crawl for JSON files"
)
parser.add_argument(
"-t", "--tracking-file",
required=True,
type=str,
help="File to track processed document IDs (one per line)"
)
parser.add_argument(
"-u", "--url",
type=str,
help="API endpoint URL to send JSON data to",
default="http://localhost:8080/onyx-api/ingestion"
)
parser.add_argument(
"--headers",
type=str,
help="JSON string of headers to include in the request"
)
parser.add_argument(
"--timeout",
type=int,
default=30,
help="Request timeout in seconds (default: 30)"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Don't actually send requests, just print what would be sent"
)
parser.add_argument(
"--cc-pair-id",
type=int,
help="Connector credential pair ID to associate the documents with"
)
return parser.parse_args()
def find_json_files(directory: str) -> List[Path]:
"""
Find all JSON files in the specified directory and its subdirectories.
Args:
directory: The directory to search in
Returns:
A list of Path objects for all JSON files found
"""
directory_path = Path(directory)
if not directory_path.exists() or not directory_path.is_dir():
raise ValueError(f"The specified path '{directory}' is not a valid directory")
json_files = list(directory_path.glob("**/*.json"))
return json_files
def load_processed_ids(tracking_file: str) -> set:
"""
Load already processed document IDs from the tracking file.
Args:
tracking_file: Path to the tracking file
Returns:
A set of document IDs that have already been processed
"""
tracking_path = Path(tracking_file)
if not tracking_path.exists():
print(f"Tracking file '{tracking_file}' not found. Creating new file.")
tracking_path.parent.mkdir(parents=True, exist_ok=True)
tracking_path.touch()
return set()
try:
with open(tracking_path, "r", encoding="utf-8") as f:
processed_ids = {line.strip() for line in f if line.strip()}
return processed_ids
except Exception as e:
logging.getLogger(__name__).warning(f"Error reading tracking file: {e}. Starting with empty set.")
return set()
def save_processed_id(tracking_file: str, document_id: str) -> None:
"""
Save a processed document ID to the tracking file.
Args:
tracking_file: Path to the tracking file
document_id: The document ID to save
"""
try:
with open(tracking_file, "a", encoding="utf-8") as f:
f.write(f"{document_id}\n")
except Exception as e:
logging.getLogger(__name__).error(f"Error saving document ID to tracking file: {e}")
def transform_json_to_document_format(file_path: Path, cc_pair_id: int | None = None) -> dict:
"""
Transform a JSON file into the document format required by the API.
Args:
file_path: Path to the JSON file
cc_pair_id: Optional connector credential pair ID
Returns:
A dictionary in the required document format
"""
logger = logging.getLogger(__name__)
try:
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
# Create the document structure
document: dict[str, Any] = {
"document": {
"id": data["id"],
"sections": [
{
"text": data["page_content"],
"link": data["source"]
}
],
"semantic_identifier": data["title"],
"metadata": {
"tags": data["metadata"]["tags"]
},
"source": "web"
}
}
# Add cc_pair_id if provided
if cc_pair_id is not None:
document["cc_pair_id"] = cc_pair_id
logger.debug(f"Added cc_pair_id {cc_pair_id} to document from {file_path}")
return document
except Exception as e:
logger.error(f"Error transforming {file_path}: {str(e)}")
return {}
def send_json_to_api(
file_path: Path,
api_url: str,
tracking_file: str,
document_id: str,
headers: Optional[dict] = None,
timeout: int = 30,
dry_run: bool = False,
cc_pair_id: Optional[int] = None
) -> bool:
"""
Read a JSON file, transform it to the required format, and send to the API.
Args:
file_path: Path to the JSON file
api_url: URL of the API endpoint
tracking_file: Path to the tracking file
document_id: The document ID to track
headers: Optional headers to include in the request
timeout: Request timeout in seconds
dry_run: If True, don't actually send the request
cc_pair_id: Optional connector credential pair ID
Returns:
True if successful, False otherwise
"""
logger = logging.getLogger(__name__)
try:
# Transform the JSON to the required format
transformed_data = transform_json_to_document_format(file_path, cc_pair_id)
if not transformed_data:
logger.error(f"Failed to transform data from {file_path}")
return False
if dry_run:
logger.info(f"DRY RUN: Would send transformed data from {file_path} to {api_url}")
save_processed_id(tracking_file, document_id)
return True
response = requests.post(
api_url,
json=transformed_data,
headers=headers or {},
timeout=timeout
)
response.raise_for_status()
logger.info(f"Successfully sent transformed data from {file_path} to API")
# Save the document ID to tracking file on successful processing
save_processed_id(tracking_file, document_id)
return True
except json.JSONDecodeError:
logger.error(f"Failed to parse JSON from {file_path}")
except RequestException as e:
logger.error(f"API request failed for {file_path}: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error processing {file_path}: {str(e)}")
return False
def main() -> int:
"""Main function to run the script."""
setup_logging()
logger = logging.getLogger(__name__)
try:
args = parse_arguments()
# Parse headers if provided
headers = None
if args.headers:
try:
headers = json.loads(args.headers)
except json.JSONDecodeError:
logger.error("Failed to parse headers JSON string")
return 1
# Log cc_pair_id if provided
if args.cc_pair_id is not None:
logger.info(f"Using connector credential pair ID: {args.cc_pair_id}")
# Load already processed document IDs
processed_ids = load_processed_ids(args.tracking_file)
logger.info(f"Loaded {len(processed_ids)} already processed document IDs")
# Find all JSON files
logger.info(f"Searching for JSON files in {args.directory}")
json_files = find_json_files(args.directory)
logger.info(f"Found {len(json_files)} JSON files")
if not json_files:
logger.warning("No JSON files found. Exiting.")
return 0
# Filter out already processed files
unprocessed_files = []
for file_path in json_files:
try:
# Extract document ID from the JSON file
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
document_id = data.get("id")
if not document_id:
logger.warning(f"No 'id' field found in {file_path}, skipping")
continue
if document_id not in processed_ids:
unprocessed_files.append((file_path, document_id))
else:
logger.debug(f"Skipping already processed document: {document_id}")
except Exception as e:
logger.warning(f"Error reading document ID from {file_path}: {e}, skipping")
continue
logger.info(f"Found {len(unprocessed_files)} unprocessed files out of {len(json_files)} total files")
if not unprocessed_files:
logger.info("All files have already been processed. Exiting.")
return 0
# Process each unprocessed JSON file
success_count = 0
failure_count = 0
for file_path, document_id in tqdm(unprocessed_files, desc="Processing files"):
success = send_json_to_api(
file_path=file_path,
api_url=args.url,
tracking_file=args.tracking_file,
document_id=document_id,
headers=headers,
timeout=args.timeout,
dry_run=args.dry_run,
cc_pair_id=args.cc_pair_id
)
if success:
success_count += 1
else:
failure_count += 1
# Report results
logger.info(f"Processing complete. Success: {success_count}, Failures: {failure_count}")
return 0 if failure_count == 0 else 1
except KeyboardInterrupt:
logger.info("Process interrupted by user")
return 130
except Exception as e:
logger.exception(f"Unhandled exception: {str(e)}")
return 1
if __name__ == "__main__":
    sys.exit(main())
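Assuming the script is saved as ingest_json.py (a hypothetical filename), a typical invocation looks like:
python ingest_json.py -d ./exports -t processed_ids.txt -u https://cloud.onyx.app/onyx-api/ingestion --headers '{"Authorization": "Bearer YOUR_KEY_HERE"}' --cc-pair-id 243 --dry-run
Drop --dry-run once the logged output looks right.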