Upload Entity Catalog
The Entities Service manages entities and collections: it handles data uploads, metadata, and embedding job triggering. It supports multiple file formats and integrates with pgvector for vector storage.
Quick Overview
Entities are candidate products (for retail industry) or candidate articles (for news/media industry) you want to recommend. To make recommendations, you must ingest your entity catalog first.
You can ingest entities from files (CSV) or a database (MongoDB). You must provide PROJECT_ID, PRIMARY_KEY (the unique identifier for entities), and MODEL_KEY (model name to embed candidates).
Core Features
- File Upload Processing: Support for CSV files and MongoDB databases
- Collection Management: Create and manage data collections with metadata
- Background Processing: Async file processing with job tracking
- Embedding Integration: Automatic embedding job triggering for ML workflows
- pgvector Integration: Vector storage support for similarity search
- Data Preview: Preview data before ingestion
API Endpoints
Entity Ingestion
POST /api/entities/ingest
Upload and process entity data from files or database.
Authentication: Required (JWT token)
Query Parameters:
- project_id: Project ID (from step 2 of the quickstart)
- primary_key: Unique identifier for entities (e.g., product_id)
- model_key: Model name used to embed candidates
- source: Data source type ("files" or "mongodb")
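The query string above can be assembled safely with `urllib.parse` rather than by string concatenation. A small sketch (the base URL is taken from the examples below; the helper name is ours):

```python
from urllib.parse import urlencode

def build_ingest_url(project_id, primary_key, model_key, source,
                     base_url="https://api.trydodo.xyz"):
    """Build the /api/entities/ingest URL with its required query parameters."""
    params = {
        "project_id": project_id,
        "primary_key": primary_key,
        "model_key": model_key,
        "source": source,  # "files" or "mongodb"
    }
    return f"{base_url}/api/entities/ingest?{urlencode(params)}"

url = build_ingest_url("proj_123", "product_id", "embedding_model", "files")
```

`urlencode` also percent-escapes any characters that are not URL-safe, which matters if your primary key or model name contains spaces or symbols.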
Ingest from Files
Form Data:
- files: CSV file(s)
- template_file: Recommendation template JSON file
Example:
curl -X POST "https://api.trydodo.xyz/api/entities/ingest?project_id=${PROJECT_ID}&primary_key=${PRIMARY_KEY}&model_key=${MODEL_KEY}&source=files" \
-H "Authorization: Bearer ${ACCESS_TOKEN}" \
-F "files=@product_catalog.csv;type=text/csv" \
-F "template_file=@recommendation_template.json;type=application/json"
Ingest from Database
The connection details and the template file travel together as multipart form data: a single request cannot combine a raw JSON body with file parts, so the connection details are sent as a JSON-encoded form part (the part name request is assumed here and may differ in your deployment).
Form Data:
- request: JSON with the connection details:
{
  "connection_string": "your-mongodb-connection-string",
  "database_name": "your-database",
  "collection_name": "products",
  "filters": {}
}
- template_file: Recommendation template JSON file
Example:
curl -X POST "https://api.trydodo.xyz/api/entities/ingest?project_id=${PROJECT_ID}&primary_key=${PRIMARY_KEY}&model_key=${MODEL_KEY}&source=mongodb" \
  -H "Authorization: Bearer ${ACCESS_TOKEN}" \
  -F 'request={"connection_string":"your-mongodb-connection-string","database_name":"your-database","collection_name":"products","filters":{}};type=application/json' \
  -F "template_file=@recommendation_template.json;type=application/json"
Data Models
Entity
{
  "user_id": "string",
  "project_id": "string",
  "name": "string",
  "docs": [
    {
      "doc_id": "string",
      "title": "string",
      "content": "string",
      "doc_type": "string",
      "price": "number",
      "metadata": {},
      "tags": ["string"]
    }
  ]
}
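A concrete instance of the Entity model helps when wiring up a client. The catalog entry below is hypothetical; the only requirement is that the payload round-trips cleanly through JSON:

```python
import json

# A hypothetical catalog entry following the Entity schema above.
entity = {
    "user_id": "user_123",
    "project_id": "proj_123",
    "name": "spring_catalog",
    "docs": [
        {
            "doc_id": "sku-001",
            "title": "Trail Running Shoes",
            "content": "Lightweight shoes with a grippy outsole.",
            "doc_type": "product",
            "price": 89.99,
            "metadata": {"brand": "Acme"},
            "tags": ["footwear", "running"],
        }
    ],
}

# Entities must serialize to JSON for upload.
payload = json.dumps(entity)
```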
EntityCreateResponse
{
  "entity_id": "string"
}
EntityResponse
{
  "entity_id": "string",
  "user_id": "string",
  "project_id": "string",
  "name": "string",
  "description": "string",
  "docs": [],
  "created_at": "string"
}
IngestJobResponse
{
  "job_id": "string",
  "status": "string",
  "message": "string"
}
DataPreviewResponse
{
  "columns": ["string"],
  "rows": [{}],
  "total_rows": "number"
}
File Processing
Supported File Formats
CSV Files
- Format: Comma-separated values
- Encoding: UTF-8
- Size Limit: 500MB per file
- Processing: Chunked reading for memory efficiency
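A client-side pre-check against the 500MB limit avoids starting an upload the service will reject. A minimal sketch (the helper name is ours):

```python
import os

MAX_UPLOAD_BYTES = 500 * 1024 * 1024  # 500MB limit from above

def check_upload_size(path):
    """Raise before uploading a CSV that the service would reject."""
    size = os.path.getsize(path)
    if size > MAX_UPLOAD_BYTES:
        raise ValueError(
            f"File size {size / (1024 * 1024):.2f}MB exceeds limit of 500MB"
        )
    return size
```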
MongoDB Databases
- Connection: Standard MongoDB connection strings
- Collections: Any collection with supported schema
- Filters: Optional MongoDB query filters
- Processing: Batched document processing
Data Schema
The service expects data with the following schema (flexible mapping):
| Field | Type | Required | Description |
|---|---|---|---|
| id | string | ❌ | Unique document identifier |
| title | string | ❌ | Document title (fallback: name) |
| content | string | ❌ | Document content (fallback: description) |
| description | string | ❌ | Document description |
| doc_type | string | ❌ | Document type (default: "document") |
| price | number | ❌ | Price information |
| metadata | object | ❌ | Additional metadata |
| tags | string | ❌ | Comma-separated tags |
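The fallbacks and defaults in the table can be applied client-side to preview how a raw record will be interpreted. This is a sketch of the mapping rules, not the service's actual mapper:

```python
def normalize_doc(row):
    """Apply the schema's fallbacks and defaults to one raw record."""
    return {
        "doc_id": row.get("id"),
        "title": row.get("title") or row.get("name"),             # fallback: name
        "content": row.get("content") or row.get("description"),  # fallback: description
        "description": row.get("description"),
        "doc_type": row.get("doc_type", "document"),              # default: "document"
        "price": row.get("price"),
        "metadata": row.get("metadata", {}),
        # comma-separated tags become a list
        "tags": [t.strip() for t in row.get("tags", "").split(",") if t.strip()],
    }

doc = normalize_doc({"name": "Desk Lamp", "description": "Warm LED lamp",
                     "tags": "home, lighting"})
```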
Configuration
Environment Variables
| Variable | Description | Required | Default |
|---|---|---|---|
| MONGO_URL | MongoDB connection string | ✅ | - |
| MONGO_DATABASE_NAME | Database name | ✅ | - |
| PORT | Service port | ❌ | 5051 |
| MODEL_MAPPING_PATH | Path to model mapping file | ❌ | - |
Model Mapping
The service integrates with ML models through a model mapping configuration:
# model_mapping.yaml
model_name: "embedding_model"
pgvector_endpoint: "http://localhost:8080"
Usage Examples
Quick Start Workflow
Follow these steps to get started with entity ingestion:
1. Get your API key from /auth
2. Create a project to organize your recommendation ideas
3. Ingest your entity catalog using the examples below
4. Get recommendations using the recommend API
5. Track feedback to improve performance
Python Client
import requests
import json
# Base URL
BASE_URL = "https://api.trydodo.xyz"
# Authentication token
ACCESS_TOKEN = "your_access_token"
PROJECT_ID = "your_project_id"
PRIMARY_KEY = "product_id" # Your entity unique identifier
MODEL_KEY = "embedding_model_name" # From models list
# Headers
headers = {
    "Authorization": f"Bearer {ACCESS_TOKEN}"
}

# Ingest from CSV file
with open('product_catalog.csv', 'rb') as f, \
     open('recommendation_template.json', 'rb') as template:
    files = {
        'files': f,
        'template_file': template
    }
    response = requests.post(
        f"{BASE_URL}/api/entities/ingest?project_id={PROJECT_ID}&primary_key={PRIMARY_KEY}&model_key={MODEL_KEY}&source=files",
        headers=headers,
        files=files
    )

if response.status_code == 200:
    result = response.json()
    print(f"Ingestion started: {result}")
# Ingest from MongoDB
# The connection details go in a JSON-encoded form part next to the template
# file: requests cannot send a raw JSON body together with multipart files,
# and the multipart Content-Type (with boundary) must be set by requests
# itself. The part name "request" is an assumption; check your deployment.
mongo_data = {
    "connection_string": "mongodb://localhost:27017",
    "database_name": "ecommerce",
    "collection_name": "products",
    "filters": {"category": "electronics"}
}
with open('recommendation_template.json', 'rb') as template:
    response = requests.post(
        f"{BASE_URL}/api/entities/ingest?project_id={PROJECT_ID}&primary_key={PRIMARY_KEY}&model_key={MODEL_KEY}&source=mongodb",
        headers=headers,
        data={"request": json.dumps(mongo_data)},
        files={"template_file": template}
    )

if response.status_code == 200:
    result = response.json()
    print(f"MongoDB ingestion started: {result}")
JavaScript Client
// Ingest from CSV file
const ingestFromCSV = async (projectId, primaryKey, modelKey, csvFile, templateFile) => {
  const formData = new FormData();
  formData.append('files', csvFile);
  formData.append('template_file', templateFile);
  const response = await fetch(
    `https://api.trydodo.xyz/api/entities/ingest?project_id=${projectId}&primary_key=${primaryKey}&model_key=${modelKey}&source=files`,
    {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${ACCESS_TOKEN}`
      },
      body: formData
    }
  );
  if (response.ok) {
    return await response.json();
  }
  throw new Error(`Ingestion failed: ${response.statusText}`);
};
// Ingest from MongoDB
// The connection details travel as a JSON-encoded form part alongside the
// template file (the part name "request" is an assumption). Do not set
// Content-Type manually; the browser adds the multipart boundary.
const ingestFromMongoDB = async (projectId, primaryKey, modelKey, mongoConfig, templateFile) => {
  const formData = new FormData();
  formData.append('request', JSON.stringify(mongoConfig));
  formData.append('template_file', templateFile);
  const response = await fetch(
    `https://api.trydodo.xyz/api/entities/ingest?project_id=${projectId}&primary_key=${primaryKey}&model_key=${modelKey}&source=mongodb`,
    {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${ACCESS_TOKEN}`
      },
      body: formData
    }
  );
  if (response.ok) {
    return await response.json();
  }
  throw new Error(`MongoDB ingestion failed: ${response.statusText}`);
};
Background Processing
Job Lifecycle
- Queued: Job is created and queued for processing
- Processing: Files are being processed
- Completed: Processing finished successfully
- Failed: Processing failed with error
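Only the last two states are terminal, which is the one fact a polling loop needs. A small helper reflecting the lifecycle above (status strings match the lowercase values returned by the API):

```python
# States after which a job can no longer change.
TERMINAL_STATES = {"completed", "failed"}

def is_terminal(status):
    """True once a job has reached a final state."""
    return status in TERMINAL_STATES
```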
Job Status Tracking
# Poll job status until completion
import time

def wait_for_job_completion(job_id, max_wait_time=300):
    start_time = time.time()
    while time.time() - start_time < max_wait_time:
        response = requests.get(
            f"{BASE_URL}/ingest/ingest/{job_id}/status",
            headers=headers
        )
        if response.status_code == 200:
            job = response.json()
            status = job['status']
            if status == 'completed':
                print("Job completed successfully!")
                return True
            elif status == 'failed':
                print(f"Job failed: {job['message']}")
                return False
            else:
                print(f"Job status: {status}")
                time.sleep(5)
        else:
            print(f"Error checking status: {response.status_code}")
            break
    return False
Error Handling
Common Error Scenarios
File Upload Errors
{
  "detail": "File size 600.00MB exceeds limit of 500MB"
}
Unsupported File Types
{
  "detail": "Unsupported file type: document.txt"
}
SQL Query Errors
{
  "detail": "Error processing SQL data: no such table: products"
}
Job Not Found
{
  "detail": "Job not found"
}
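All of these payloads share a single detail field, so one helper can surface them uniformly. A sketch assuming the shapes shown above (the status code in the usage line is illustrative):

```python
def describe_error(status_code, body):
    """Turn an error response body into a readable message."""
    if isinstance(body, dict):
        detail = body.get("detail", "Unknown error")
    else:
        detail = str(body)
    return f"Ingestion error ({status_code}): {detail}"

msg = describe_error(413, {"detail": "File size 600.00MB exceeds limit of 500MB"})
```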
Best Practices
- File Size: Keep files under 500MB for optimal performance
- Data Validation: Preview data before full ingestion
- Error Handling: Implement proper error handling for background jobs
- Job Monitoring: Monitor job status for long-running operations
Integration with Other Services
Embedding Integration
The Entities service automatically triggers embedding jobs when collections are created:
# Embedding job is triggered automatically
# when a collection is uploaded
await EntityService._trigger_embedding_job(
    collection_id=collection_id,
    docs=processed_docs
)
pgvector Integration
Processed embeddings are automatically uploaded to pgvector:
# Upload embeddings to vector database
await EntityService._upload_embeddings_to_pgvector(
    collection_id=collection_id,
    embeddings=result.get("embeddings", []),
    docs=docs
)
Shared Services
- Authentication: Uses shared JWT authentication
- Database: Utilizes shared MongoDB connection
- Models: Shares common data models
Performance Considerations
Memory Management
- Chunked Processing: Large files processed in 10,000-row chunks
- Background Tasks: Async processing prevents blocking
- File Cleanup: Temporary files are automatically cleaned up
Scalability
- Horizontal Scaling: Multiple service instances supported
- Database Indexing: Optimized queries for large datasets
- Caching: Model mappings cached for performance
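The model-mapping cache can be as simple as functools.lru_cache around the loader. A sketch with a stubbed loader (the real service would read the file at MODEL_MAPPING_PATH; the values below come from the model_mapping.yaml example):

```python
from functools import lru_cache

@lru_cache(maxsize=1)
def load_model_mapping():
    """Load the model mapping once and reuse it on every later call."""
    # Stub: in the service this would parse the file at MODEL_MAPPING_PATH.
    return {
        "model_name": "embedding_model",
        "pgvector_endpoint": "http://localhost:8080",
    }

mapping = load_model_mapping()
```

Because the result is cached, repeated calls return the same object without touching the filesystem again.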
Deployment
The Entities service is deployed and accessible at: https://api.trydodo.xyz
Quick Start Integration
To quickly integrate with the Entities service:
1. Sign up at /auth for instant API access
2. Get your tokens using the login endpoint
3. Create a project to organize your entities
4. Ingest entities using the /api/entities/ingest endpoint
5. Start making recommendations with the recommend API
For complete workflow examples, see the Quick Start guide.
Docker Deployment
FROM python:3.12-slim
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
# Default service port (see the PORT environment variable, default 5051)
EXPOSE 5051
CMD ["server"]
Monitoring and Logging
Health Monitoring
- Endpoint: /health
- Metrics: Service status and basic health checks
- Alerting: Integration with monitoring systems
Logging
- Structured Logging: JSON-formatted logs
- Job Tracking: Detailed job lifecycle logging
- Error Tracking: Comprehensive error reporting
- Performance Metrics: Processing time and throughput
Security
Authentication
- JWT Tokens: Secure token-based authentication
- User Authorization: User-scoped data access
- Project Isolation: Project-level data segregation
Data Security
- File Validation: Comprehensive file type and size validation
- SQL Injection Prevention: Parameterized queries for SQL processing
- Temporary File Security: Secure handling of temporary files
Best Practices
- Input Validation: All inputs validated and sanitized
- Error Handling: Secure error messages without information leakage
- Access Control: Proper authorization checks on all endpoints