Batch Inference Processing
Process thousands of inference requests efficiently using batch processing and parallel execution on GPU infrastructure.
Production Use Case: Content moderation, document classification, bulk translation, sentiment analysis at scale.
What You’ll Build
- Input: CSV/JSONL file with thousands of requests
- Processing: Parallel batch inference on multiple GPUs
- Output: Results written to file with status tracking
- Cost: well under $0.01 in GPU time per 10,000 requests on an RTX 4090 (see Cost Analysis below)
Use Cases
- Content Moderation: Classify 100K posts/day
- Document Processing: Extract insights from PDFs
- Sentiment Analysis: Analyze customer reviews
- Translation: Bulk translate product descriptions
Architecture
Input File (JSONL)
↓
Batch Job Manager
↓
Split into chunks (256 requests each)
↓
Parallel processing on GPUs (up to 10 workers)
↓
Aggregate results
↓
Output File (JSONL) + Progress tracking
Step 1: Prepare Input Data
Create input.jsonl:
{"id": "1", "text": "This product is amazing!"}
{"id": "2", "text": "Terrible customer service"}
{"id": "3", "text": "Average quality, okay price"}Generate Test Data
# Generate 10,000 test records
node -e "
const fs = require('fs');
const stream = fs.createWriteStream('input.jsonl');
for (let i = 1; i <= 10000; i++) {
stream.write(JSON.stringify({
id: String(i),
text: \`Sample text for classification \${i}\`
}) + '\n');
}
stream.end();
"Step 2: Deploy Classification Model
Using a template for instant deployment:
# Use pre-configured template for sentiment analysis
syaala templates deploy \
--template sentiment-classification \
--name batch-classifier \
--min-replicas 2 \
--max-replicas 10
Or deploy custom model:
syaala deployments create \
--name batch-classifier \
--model cardiffnlp/twitter-roberta-base-sentiment-latest \
--runtime vllm \
--gpu-type RTX_4090 \
--gpu-count 1 \
--min-replicas 2 \
--max-replicas 10 \
--env MAX_MODEL_LEN=512 \
--env MAX_BATCH_SIZE=256
Step 3: Create Batch Processor
Create batch-processor.ts:
import { createClient } from '@syaala/sdk'
import fs from 'fs'
import readline from 'readline'
// Configuration
const DEPLOYMENT_ID = process.env.DEPLOYMENT_ID!
const BATCH_SIZE = 256 // Requests per batch
const MAX_CONCURRENT = 5 // Parallel batches
const RETRY_ATTEMPTS = 3
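// Total in-flight requests = BATCH_SIZE × MAX_CONCURRENT (here 1,280);
// keep this in line with the deployment's MAX_BATCH_SIZE and replica count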
const client = createClient(process.env.SYAALA_API_KEY!)
interface InputRecord {
id: string
text: string
}
interface OutputRecord {
id: string
text: string
sentiment: 'positive' | 'negative' | 'neutral'
confidence: number
processingTime: number
error?: string
}
class BatchProcessor {
private processed = 0
private errors = 0
private startTime = Date.now()
async processFile(inputPath: string, outputPath: string) {
console.log(`Starting batch processing: ${inputPath}`)
console.log(`Deployment: ${DEPLOYMENT_ID}`)
console.log(`Batch size: ${BATCH_SIZE}, Concurrency: ${MAX_CONCURRENT}\n`)
const inputStream = fs.createReadStream(inputPath)
const outputStream = fs.createWriteStream(outputPath)
const rl = readline.createInterface({ input: inputStream })
let batch: InputRecord[] = []
const batchPromises: Promise<void>[] = []
for await (const line of rl) {
if (!line.trim()) continue
const record = JSON.parse(line) as InputRecord
batch.push(record)
if (batch.length >= BATCH_SIZE) {
const currentBatch = [...batch]
batch = []
// Wait if we have too many concurrent batches. Promise.race does not
// report which promise settled, so each batch removes itself from the
// pool when it finishes instead of blindly dropping the first entry
if (batchPromises.length >= MAX_CONCURRENT) {
await Promise.race(batchPromises)
}
// Process batch; the promise prunes itself from the pool on completion
let promise: Promise<void>
promise = this.processBatch(currentBatch, outputStream).finally(() => {
const idx = batchPromises.indexOf(promise)
if (idx !== -1) batchPromises.splice(idx, 1)
})
batchPromises.push(promise)
}
}
// Process remaining records
if (batch.length > 0) {
await this.processBatch(batch, outputStream)
}
// Wait for all batches to complete
await Promise.all(batchPromises)
outputStream.end()
const duration = (Date.now() - this.startTime) / 1000
console.log(`\n✅ Batch processing complete!`)
console.log(` Total: ${this.processed} records`)
console.log(` Errors: ${this.errors}`)
console.log(` Duration: ${duration.toFixed(2)}s`)
console.log(` Throughput: ${(this.processed / duration).toFixed(2)} records/sec`)
}
private async processBatch(
batch: InputRecord[],
outputStream: fs.WriteStream
): Promise<void> {
const batchStartTime = Date.now()
try {
// Create parallel inference requests
const requests = batch.map(record => ({
prompt: `Classify sentiment: "${record.text}"\nSentiment:`,
maxTokens: 10,
temperature: 0.1,
stop: ['\n']
}))
// Use SDK batch inference with retries
const result = await this.retryOperation(() =>
client.inference.batch(DEPLOYMENT_ID, requests, {
parallel: true,
maxConcurrency: BATCH_SIZE
})
)
if (!result.success) {
throw new Error(result.error)
}
// Process results
const processingTime = Date.now() - batchStartTime
for (let i = 0; i < batch.length; i++) {
const record = batch[i]
const response = result.data[i]
const output: OutputRecord = {
id: record.id,
text: record.text,
sentiment: this.parseSentiment(response.text),
confidence: 0.95, // Would come from model in production
processingTime: processingTime / batch.length
}
outputStream.write(JSON.stringify(output) + '\n')
this.processed++
}
// Progress update
this.printProgress()
} catch (error) {
console.error(`\nBatch error:`, error)
// Write error records
for (const record of batch) {
const output: OutputRecord = {
id: record.id,
text: record.text,
sentiment: 'neutral',
confidence: 0,
processingTime: 0,
error: error instanceof Error ? error.message : 'Unknown error'
}
outputStream.write(JSON.stringify(output) + '\n')
this.errors++
}
}
}
private async retryOperation<T>(
operation: () => Promise<T>,
attempts = RETRY_ATTEMPTS
): Promise<T> {
for (let i = 0; i < attempts; i++) {
try {
return await operation()
} catch (error) {
if (i === attempts - 1) throw error
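// Exponential backoff: wait 1s, 2s, 4s, ... before the next attempt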
await new Promise(resolve => setTimeout(resolve, Math.pow(2, i) * 1000))
}
}
throw new Error('Max retries exceeded')
}
private parseSentiment(text: string): 'positive' | 'negative' | 'neutral' {
const lower = text.toLowerCase().trim()
if (lower.includes('positive')) return 'positive'
if (lower.includes('negative')) return 'negative'
return 'neutral'
}
private printProgress() {
const duration = (Date.now() - this.startTime) / 1000
const throughput = this.processed / duration
process.stdout.write(
`\r Processed: ${this.processed} | ` +
`Errors: ${this.errors} | ` +
`Throughput: ${throughput.toFixed(2)}/sec`
)
}
}
// Run batch processor
async function main() {
const processor = new BatchProcessor()
await processor.processFile('input.jsonl', 'output.jsonl')
}
main().catch(console.error)
Step 4: Run Batch Processing
# Set environment variables
export SYAALA_API_KEY=sk_live_...
export DEPLOYMENT_ID=dep_...
# Install dependencies
npm install @syaala/sdk
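# tsx runs TypeScript directly; npx fetches it on demand, or pin it with: npm install -D tsx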
# Run processor
npx tsx batch-processor.ts
Output:
Starting batch processing: input.jsonl
Deployment: dep_abc123
Batch size: 256, Concurrency: 5
Processed: 10000 | Errors: 0 | Throughput: 423.18/sec
✅ Batch processing complete!
Total: 10000 records
Errors: 0
Duration: 23.64s
Throughput: 423.18 records/sec
Step 5: Analyze Results
# View sample results
head -n 5 output.jsonl
{"id":"1","text":"This product is amazing!","sentiment":"positive","confidence":0.95,"processingTime":2.3}
{"id":"2","text":"Terrible customer service","sentiment":"negative","confidence":0.95,"processingTime":2.3}
{"id":"3","text":"Average quality, okay price","sentiment":"neutral","confidence":0.95,"processingTime":2.3}
Calculate Statistics
import fs from 'fs'
import readline from 'readline'
async function analyzeResults(filepath: string) {
const rl = readline.createInterface({
input: fs.createReadStream(filepath)
})
let total = 0
let positive = 0
let negative = 0
let neutral = 0
let errors = 0
let totalTime = 0
for await (const line of rl) {
const record = JSON.parse(line)
total++
if (record.error) {
errors++
continue
}
switch (record.sentiment) {
case 'positive': positive++; break
case 'negative': negative++; break
case 'neutral': neutral++; break
}
totalTime += record.processingTime
}
console.log('Results Analysis:')
console.log(` Total: ${total}`)
console.log(` Positive: ${positive} (${(positive/total*100).toFixed(1)}%)`)
console.log(` Negative: ${negative} (${(negative/total*100).toFixed(1)}%)`)
console.log(` Neutral: ${neutral} (${(neutral/total*100).toFixed(1)}%)`)
console.log(` Errors: ${errors}`)
console.log(`  Avg Time: ${(totalTime / (total - errors)).toFixed(2)}ms`) // average over successful records only
}
analyzeResults('output.jsonl')
Using CLI Batch Command
Easier alternative using built-in CLI:
syaala batch create \
--deployment dep_abc123 \
--input input.jsonl \
--output output.jsonl \
--batch-size 256 \
--concurrency 5
Monitor progress:
syaala batch status <job-id>
Advanced: Multi-GPU Scaling
For very large datasets (millions of records):
// Deploy multiple workers
const workers = await Promise.all([
client.deployments.create(orgId, { name: 'worker-1', ... }),
client.deployments.create(orgId, { name: 'worker-2', ... }),
client.deployments.create(orgId, { name: 'worker-3', ... }),
])
// Distribute work across workers
const workerIds = workers.map(w => w.data.id)
let currentWorker = 0
function getNextWorker() {
const id = workerIds[currentWorker]
currentWorker = (currentWorker + 1) % workerIds.length
return id
}
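// Note: this sketch assumes processBatch is extended with a third parameter
// that forwards the chosen deployment id to client.inference.batch in place
// of the fixed DEPLOYMENT_ID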
// Process batches on different workers
await this.processBatch(batch, outputStream, getNextWorker())
Cost Optimization
Strategy 1: Right-size GPU
# Small models (<7B params): Use RTX 4090
syaala deployments update dep_abc123 --gpu-type RTX_4090
# Medium models (7-13B): Use A10G
syaala deployments update dep_abc123 --gpu-type A10G
# Large models (>13B): Use A100
syaala deployments update dep_abc123 --gpu-type A100_40GB
Strategy 2: Optimize Batch Size
// GPU Memory = 24GB (RTX 4090)
// Model Size = 7B params × 2 bytes (FP16) = 14GB
// Available for batching = 10GB
// Batch size ≈ 10GB / per-request KV cache (2 × layers × sequence_length × hidden_size × bytes per element)
const optimalBatchSize = calculateBatchSize({
gpuMemory: 24, // GB
modelSize: 14, // GB
sequenceLength: 512,
hiddenSize: 4096
})
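// Sanity check (assumed 7B geometry: 32 layers, hidden size 4096, FP16):
// KV cache per request = 2 (K+V) × 32 layers × 512 tokens × 4096 × 2 bytes ≈ 0.25GB,
// so 10GB of free memory fits ~40 full-length requests; paged-attention
// runtimes like vLLM reach higher effective batch sizes because most
// sequences stop well short of the 512-token maximum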
// Result: ~256 requests per batch (effective, assuming paged attention and typical short sequences)
Strategy 3: Use Spot Instances
syaala deployments create \
--name batch-spot \
--gpu-type RTX_4090 \
--spot true \
--spot-max-price 0.30  # Cap at $0.30/hr (~40% below the $0.50/hr on-demand rate)
Production Deployment
1. Error Handling & Retries
class RobustBatchProcessor extends BatchProcessor {
private failedRecords: InputRecord[] = []
async processWithRetry(record: InputRecord, retries = 3): Promise<OutputRecord> {
for (let i = 0; i < retries; i++) {
try {
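// processRecord is an assumed helper that sends a single inference request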
return await this.processRecord(record)
} catch (error) {
if (i === retries - 1) {
this.failedRecords.push(record)
throw error
}
await new Promise(r => setTimeout(r, Math.pow(2, i) * 1000))
}
}
throw new Error('Unreachable')
}
async saveFailedRecords() {
fs.writeFileSync(
'failed-records.jsonl',
this.failedRecords.map(r => JSON.stringify(r)).join('\n')
)
}
}
2. Progress Persistence
// Save checkpoint every 1000 records
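// (this block lives inside processFile's read loop, where `record` is in scope)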
if (this.processed % 1000 === 0) {
fs.writeFileSync('checkpoint.json', JSON.stringify({
processed: this.processed,
lastId: record.id,
timestamp: Date.now()
}))
}
// Resume from checkpoint
const checkpoint = JSON.parse(fs.readFileSync('checkpoint.json', 'utf-8'))
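// Hypothetical integration with Step 3's read loop: skip records until the
// checkpointed id is seen again, then resume batching:
//   let skipping = true
//   if (skipping) { skipping = record.id !== checkpoint.lastId; continue }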
const resumeAfter = checkpoint.lastId
3. Cost Tracking
class CostTracker {
private startTime = Date.now()
private gpuCostPerSecond = 0.00014 // RTX 4090: $0.50/hr
getCurrentCost(): number {
const runtime = (Date.now() - this.startTime) / 1000
return runtime * this.gpuCostPerSecond
}
getProjectedCost(totalRecords: number, processedRecords: number): number {
const current = this.getCurrentCost()
const ratio = totalRecords / processedRecords
return current * ratio
}
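// Usage sketch: create one CostTracker next to the BatchProcessor and log
// tracker.getCurrentCost() from printProgress() for a live cost readout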
}
Monitoring & Alerts
# Set up alerts for batch jobs
syaala notifications create \
--type slack \
--config '{"webhookUrl": "https://hooks.slack.com/..."}'
# Create alert for high error rate
syaala alerts create \
--deployment dep_abc123 \
--metric error_rate \
--threshold 0.05 \
--action notify
Troubleshooting
Slow Processing
# Check GPU utilization
syaala deployments metrics dep_abc123
# If GPU util < 70%: Increase batch size
# Edit batch-processor.ts: BATCH_SIZE = 512
# If GPU util > 95%: Reduce batch size or add GPUs
syaala deployments update dep_abc123 --max-replicas 5
Out of Memory Errors
# Reduce batch size
BATCH_SIZE = 128
# Or use larger GPU
syaala deployments update dep_abc123 --gpu-type A100_40GB
Rate Limiting
# Reduce concurrency
MAX_CONCURRENT = 3
# Or upgrade plan
syaala billing upgrade --plan professional
Cost Analysis
Processing 10,000 records:
| GPU Type | Throughput | Duration | Total Cost | Cost per 1K records |
|---|---|---|---|---|
| RTX 4090 | 400/sec | 25s | $0.0035 | $0.0004 |
| A10G | 350/sec | 28.6s | $0.0056 | $0.0006 |
| A100 40GB | 500/sec | 20s | $0.0061 | $0.0006 |
Recommendation: RTX 4090 for best cost/performance
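The cost column is just duration × hourly rate. A quick check of the RTX 4090 row, assuming the $0.50/hr rate used in the Cost Tracking section:
// 10,000 records at 400/sec take 25 seconds
const seconds = 10_000 / 400
// cost = seconds × hourly rate / 3600 ≈ $0.0035, matching the table
const cost = seconds * 0.50 / 3600
console.log(cost.toFixed(4))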
Next Steps
- Chatbot Tutorial - Real-time streaming inference
- API Integration - Integrate with existing systems
- Cost Optimization - Reduce GPU costs
- Monitoring - Set up dashboards and alerts