Examples › Batch Inference Processing

Batch Inference Processing

Process thousands of inference requests efficiently using batch processing and parallel execution on GPU infrastructure.

Production Use Case: Content moderation, document classification, bulk translation, sentiment analysis at scale.

What You’ll Build

  • Input: CSV/JSONL file with thousands of requests
  • Processing: Parallel batch inference on multiple GPUs
  • Output: Results written to file with status tracking
  • Cost: ~$0.004 per 10,000 requests on a single RTX 4090 (see Cost Analysis below)

Use Cases

  • Content Moderation: Classify 100K posts/day
  • Document Processing: Extract insights from PDFs
  • Sentiment Analysis: Analyze customer reviews
  • Translation: Bulk translate product descriptions

Architecture

Input File (JSONL)
        ↓
Batch Job Manager
        ↓
Split into chunks (256 requests each)
        ↓
Parallel processing on GPUs (up to 10 workers)
        ↓
Aggregate results
        ↓
Output File (JSONL) + Progress tracking

Step 1: Prepare Input Data

Create input.jsonl:

{"id": "1", "text": "This product is amazing!"}
{"id": "2", "text": "Terrible customer service"}
{"id": "3", "text": "Average quality, okay price"}

Generate Test Data

# Generate 10,000 test records
# Writes input.jsonl in the current directory: one JSON object per line, with
# string ids "1" through "10000". The backticks and ${i} are backslash-escaped
# because the script sits inside a double-quoted shell string; without the
# escapes the shell would expand them before node ever sees the template literal.
node -e "
const fs = require('fs');
const stream = fs.createWriteStream('input.jsonl');
for (let i = 1; i <= 10000; i++) {
  stream.write(JSON.stringify({
    id: String(i),
    text: \`Sample text for classification \${i}\`
  }) + '\n');
}
stream.end();
"

Step 2: Deploy Classification Model

Using a template for instant deployment:

# Use pre-configured template for sentiment analysis
syaala templates deploy \
  --template sentiment-classification \
  --name batch-classifier \
  --min-replicas 2 \
  --max-replicas 10

Or deploy custom model:

syaala deployments create \
  --name batch-classifier \
  --model cardiffnlp/twitter-roberta-base-sentiment-latest \
  --runtime vllm \
  --gpu-type RTX_4090 \
  --gpu-count 1 \
  --min-replicas 2 \
  --max-replicas 10 \
  --env MAX_MODEL_LEN=512 \
  --env MAX_BATCH_SIZE=256

Step 3: Create Batch Processor

Create batch-processor.ts:

import { createClient } from '@syaala/sdk'
import fs from 'fs'
import readline from 'readline'
import { Transform } from 'stream'
 
// Configuration

/**
 * Reads a required environment variable.
 *
 * Fails at startup with a message naming the variable, instead of letting an
 * undefined value flow into the client or the deployment id.
 *
 * @param name - Name of the environment variable to read.
 * @returns The variable's non-empty value.
 * @throws Error when the variable is unset or empty.
 */
function requireEnv(name: string): string {
  const value = process.env[name]
  if (!value) {
    throw new Error(`Missing required environment variable: ${name}`)
  }
  return value
}

const DEPLOYMENT_ID = requireEnv('DEPLOYMENT_ID')
const BATCH_SIZE = 256  // Requests per batch
const MAX_CONCURRENT = 5 // Parallel batches
const RETRY_ATTEMPTS = 3 // Attempts per batch call, including the first

const client = createClient(requireEnv('SYAALA_API_KEY'))
 
/** One parsed line of the input JSONL file. */
interface InputRecord {
  /** Caller-supplied identifier; copied unchanged onto the matching output record. */
  id: string
  /** Text to classify; it is embedded in the sentiment prompt sent to the model. */
  text: string
}
 
/** One line of the output JSONL file; written for every input record, whether its batch succeeded or failed. */
interface OutputRecord {
  /** Copied from the input record. */
  id: string
  /** Copied from the input record. */
  text: string
  /** Label parsed from the model reply; 'neutral' is also the fallback when no label is recognised or the batch failed. */
  sentiment: 'positive' | 'negative' | 'neutral'
  /** Placeholder in this example: a fixed 0.95 on success and 0 on failure. */
  confidence: number
  /** Batch wall-clock time in milliseconds divided by the number of records in the batch; 0 on failure. */
  processingTime: number
  /** Failure message; only present on records that belong to a failed batch. */
  error?: string
}
 
/**
 * Streams a JSONL input file through the inference deployment in batches of
 * BATCH_SIZE records and writes one JSONL result line per input record.
 *
 * At most MAX_CONCURRENT batches are in flight while the file is being read.
 * A batch that fails is not fatal: every record in it is written with an
 * `error` field and counted in the error total.
 */
class BatchProcessor {
  private processed = 0
  private errors = 0
  private startTime = Date.now()

  /**
   * Processes every non-blank line of `inputPath` and writes results to `outputPath`.
   *
   * @param inputPath - JSONL file with one InputRecord per line.
   * @param outputPath - Destination JSONL file (created or overwritten).
   * @throws SyntaxError when an input line is not valid JSON.
   */
  async processFile(inputPath: string, outputPath: string) {
    console.log(`Starting batch processing: ${inputPath}`)
    console.log(`Deployment: ${DEPLOYMENT_ID}`)
    console.log(`Batch size: ${BATCH_SIZE}, Concurrency: ${MAX_CONCURRENT}\n`)

    const inputStream = fs.createReadStream(inputPath)
    const outputStream = fs.createWriteStream(outputPath)
    const rl = readline.createInterface({ input: inputStream })

    let batch: InputRecord[] = []
    // Batches currently running. Each task removes itself when it settles, so
    // the set always holds exactly the unfinished work; dropping "the oldest"
    // entry instead would lose track of batches that are still writing.
    const inFlight = new Set<Promise<void>>()

    for await (const line of rl) {
      if (!line.trim()) continue

      const record = JSON.parse(line) as InputRecord
      batch.push(record)

      if (batch.length >= BATCH_SIZE) {
        const currentBatch = batch
        batch = []

        // Wait until a slot frees up before starting another batch
        while (inFlight.size >= MAX_CONCURRENT) {
          await Promise.race(inFlight)
        }

        const task: Promise<void> = this.processBatch(currentBatch, outputStream)
          .finally(() => {
            inFlight.delete(task)
          })
        inFlight.add(task)
      }
    }

    // Process remaining records
    if (batch.length > 0) {
      await this.processBatch(batch, outputStream)
    }

    // Wait for all batches to complete
    await Promise.all(inFlight)

    // Close the file and wait for buffered lines to be flushed, so the summary
    // below is only printed once the output is complete on disk.
    await new Promise<void>((resolve, reject) => {
      outputStream.once('error', reject)
      outputStream.end(resolve)
    })

    const duration = (Date.now() - this.startTime) / 1000
    console.log(`\n✅ Batch processing complete!`)
    console.log(`   Total: ${this.processed} records`)
    console.log(`   Errors: ${this.errors}`)
    console.log(`   Duration: ${duration.toFixed(2)}s`)
    console.log(`   Throughput: ${(this.processed / duration).toFixed(2)} records/sec`)
  }

  /**
   * Sends one batch to the deployment and writes a result line per record.
   *
   * Never rejects because of an inference failure: on any error the whole
   * batch is written as error records instead.
   *
   * @param batch - Records to classify together.
   * @param outputStream - Stream the JSONL result lines are written to.
   */
  private async processBatch(
    batch: InputRecord[],
    outputStream: fs.WriteStream
  ): Promise<void> {
    const batchStartTime = Date.now()

    try {
      // Create parallel inference requests
      const requests = batch.map(record => ({
        prompt: `Classify sentiment: "${record.text}"\nSentiment:`,
        maxTokens: 10,
        temperature: 0.1,
        stop: ['\n']
      }))

      // Use SDK batch inference with retries (only thrown errors are retried;
      // a returned `success: false` fails the batch immediately below)
      const result = await this.retryOperation(() =>
        client.inference.batch(DEPLOYMENT_ID, requests, {
          parallel: true,
          maxConcurrency: BATCH_SIZE
        })
      )

      if (!result.success) {
        throw new Error(result.error)
      }

      const processingTime = Date.now() - batchStartTime

      // Build every output record before writing any of them. If a response
      // is missing or malformed we fail here, with nothing written yet, so the
      // catch block cannot duplicate records that already reached the file.
      const outputs: OutputRecord[] = batch.map((record, i) => {
        const response = result.data[i]
        if (response == null) {
          throw new Error(`Missing inference result for record ${record.id}`)
        }
        return {
          id: record.id,
          text: record.text,
          sentiment: this.parseSentiment(response.text),
          confidence: 0.95, // Would come from model in production
          processingTime: processingTime / batch.length
        }
      })

      for (const output of outputs) {
        outputStream.write(JSON.stringify(output) + '\n')
        this.processed++
      }

      // Progress update
      this.printProgress()

    } catch (error) {
      console.error(`\nBatch error:`, error)

      // Write error records
      for (const record of batch) {
        const output: OutputRecord = {
          id: record.id,
          text: record.text,
          sentiment: 'neutral',
          confidence: 0,
          processingTime: 0,
          error: error instanceof Error ? error.message : 'Unknown error'
        }
        outputStream.write(JSON.stringify(output) + '\n')
        this.errors++
      }
    }
  }

  /**
   * Runs `operation`, retrying when it throws.
   *
   * Waits 1s, 2s, 4s, ... (2^attempt seconds) between attempts and rethrows
   * the last error once `attempts` tries have failed.
   *
   * @param operation - Async action to run.
   * @param attempts - Maximum number of tries, including the first.
   * @returns The first successful result of `operation`.
   */
  private async retryOperation<T>(
    operation: () => Promise<T>,
    attempts = RETRY_ATTEMPTS
  ): Promise<T> {
    for (let i = 0; i < attempts; i++) {
      try {
        return await operation()
      } catch (error) {
        if (i === attempts - 1) throw error
        await new Promise(resolve => setTimeout(resolve, Math.pow(2, i) * 1000))
      }
    }
    // Only reached when `attempts` is 0 or negative
    throw new Error('Max retries exceeded')
  }

  /**
   * Maps free-form model output to a sentiment label.
   *
   * Matching is a case-insensitive substring test; 'positive' is checked
   * first, and anything matching neither label is treated as 'neutral'.
   */
  private parseSentiment(text: string): 'positive' | 'negative' | 'neutral' {
    const lower = text.toLowerCase().trim()
    if (lower.includes('positive')) return 'positive'
    if (lower.includes('negative')) return 'negative'
    return 'neutral'
  }

  /** Rewrites the current terminal line with running totals and throughput. */
  private printProgress() {
    const duration = (Date.now() - this.startTime) / 1000
    const throughput = this.processed / duration
    process.stdout.write(
      `\r  Processed: ${this.processed} | ` +
      `Errors: ${this.errors} | ` +
      `Throughput: ${throughput.toFixed(2)}/sec`
    )
  }
}
 
// Run batch processor
/** Entry point: classifies input.jsonl into output.jsonl with a fresh BatchProcessor. */
async function main() {
  const processor = new BatchProcessor()
  await processor.processFile('input.jsonl', 'output.jsonl')
}

main().catch((error: unknown) => {
  console.error(error)
  // Report failure to the calling shell / scheduler instead of exiting with 0
  process.exitCode = 1
})

Step 4: Run Batch Processing

# Set environment variables
export SYAALA_API_KEY=sk_live_...
export DEPLOYMENT_ID=dep_...
 
# Install dependencies
npm install @syaala/sdk
 
# Run processor
npx tsx batch-processor.ts

Output:

Starting batch processing: input.jsonl
Deployment: dep_abc123
Batch size: 256, Concurrency: 5

  Processed: 10000 | Errors: 0 | Throughput: 423.18/sec

✅ Batch processing complete!
   Total: 10000 records
   Errors: 0
   Duration: 23.64s
   Throughput: 423.18 records/sec

Step 5: Analyze Results

# View sample results
head -n 5 output.jsonl
{"id":"1","text":"This product is amazing!","sentiment":"positive","confidence":0.95,"processingTime":2.3}
{"id":"2","text":"Terrible customer service","sentiment":"negative","confidence":0.95,"processingTime":2.3}
{"id":"3","text":"Average quality, okay price","sentiment":"neutral","confidence":0.95,"processingTime":2.3}

Calculate Statistics

import fs from 'fs'
import readline from 'readline'
 
/**
 * Reads a results JSONL file and prints a summary to the console.
 *
 * Records carrying an `error` field are counted as errors and excluded from
 * the sentiment tallies and from the timing average. Percentages are relative
 * to all records read; the average time is over successful records only.
 * Blank lines are skipped, and an empty file prints zeros rather than NaN.
 *
 * @param filepath - Path to the JSONL file produced by the batch processor.
 * @throws SyntaxError when a non-blank line is not valid JSON.
 */
async function analyzeResults(filepath: string) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filepath)
  })

  let total = 0
  let positive = 0
  let negative = 0
  let neutral = 0
  let errors = 0
  let totalTime = 0

  for await (const line of rl) {
    if (!line.trim()) continue

    const record = JSON.parse(line)
    total++

    if (record.error) {
      errors++
      continue
    }

    switch (record.sentiment) {
      case 'positive': positive++; break
      case 'negative': negative++; break
      case 'neutral': neutral++; break
    }

    totalTime += record.processingTime
  }

  // totalTime only accumulates successful records, so average over those
  const succeeded = total - errors
  const avgTime = succeeded > 0 ? totalTime / succeeded : 0
  const percent = (count: number) =>
    (total > 0 ? (count / total) * 100 : 0).toFixed(1)

  console.log('Results Analysis:')
  console.log(`  Total: ${total}`)
  console.log(`  Positive: ${positive} (${percent(positive)}%)`)
  console.log(`  Negative: ${negative} (${percent(negative)}%)`)
  console.log(`  Neutral: ${neutral} (${percent(neutral)}%)`)
  console.log(`  Errors: ${errors}`)
  console.log(`  Avg Time: ${avgTime.toFixed(2)}ms`)
}
 
// Report a missing/invalid results file and exit non-zero instead of leaving the promise unhandled
analyzeResults('output.jsonl').catch((error: unknown) => {
  console.error(error)
  process.exitCode = 1
})

Using CLI Batch Command

Easier alternative using built-in CLI:

syaala batch create \
  --deployment dep_abc123 \
  --input input.jsonl \
  --output output.jsonl \
  --batch-size 256 \
  --concurrency 5

Monitor progress:

syaala batch status <job-id>

Advanced: Multi-GPU Scaling

For very large datasets (millions of records):

// Deploy multiple workers
// (`...` stands for the remaining deployment options, elided in this sketch;
// `orgId` must be defined by the surrounding code.)
const workers = await Promise.all([
  client.deployments.create(orgId, { name: 'worker-1', ... }),
  client.deployments.create(orgId, { name: 'worker-2', ... }),
  client.deployments.create(orgId, { name: 'worker-3', ... }),
])
 
// Distribute work across workers
// Collect the id of each created deployment from the `data` field of its create() result.
const workerIds = workers.map(w => w.data.id)
let currentWorker = 0
 
// Hands out worker ids in rotation: returns the id at the cursor, then
// advances the cursor, wrapping back to 0 after the last id.
function getNextWorker() {
  const id = workerIds[currentWorker]
  currentWorker = (currentWorker + 1) % workerIds.length
  return id
}
 
// Process batches on different workers
// NOTE(review): processBatch as written in Step 3 takes only
// (batch, outputStream) and always targets DEPLOYMENT_ID; it needs an extra
// deployment-id parameter before this call can work.
await this.processBatch(batch, outputStream, getNextWorker())

Cost Optimization

Strategy 1: Right-size GPU

# Small models (<7B params): Use RTX 4090
syaala deployments update dep_abc123 --gpu-type RTX_4090
 
# Medium models (7-13B): Use A10G
syaala deployments update dep_abc123 --gpu-type A10G
 
# Large models (>13B): Use A100
syaala deployments update dep_abc123 --gpu-type A100_40GB

Strategy 2: Optimize Batch Size

// GPU Memory = 24GB (RTX 4090)
// Model Size = 7B params × 2 bytes (FP16) = 14GB
// Available for batching = 10GB
// Batch size = 10GB / (sequence_length × hidden_size)
// NOTE(review): taken literally, 10 GB / (512 × 4096) is roughly 5,000, not
// 256 — the formula presumably omits further per-token memory factors, so
// treat it as a rough sketch rather than a sizing rule.
 
// NOTE(review): calculateBatchSize is not defined anywhere in this guide;
// supply your own implementation.
const optimalBatchSize = calculateBatchSize({
  gpuMemory: 24,      // GB
  modelSize: 14,      // GB
  sequenceLength: 512,
  hiddenSize: 4096
})
// Result: ~256 requests per batch

Strategy 3: Use Spot Instances

syaala deployments create \
  --name batch-spot \
  --gpu-type RTX_4090 \
  --spot true \
  --spot-max-price 0.30  # Max $0.30/hr (70% savings)

Production Deployment

1. Error Handling & Retries

/**
 * BatchProcessor variant that retries individual records and remembers the
 * ones that never succeeded so they can be saved and re-run later.
 *
 * NOTE(review): `processRecord` is not defined on the BatchProcessor shown in
 * Step 3; a single-record inference method has to be added there (or here)
 * before this class compiles.
 */
class RobustBatchProcessor extends BatchProcessor {
  private failedRecords: InputRecord[] = []

  /**
   * Processes one record, retrying with exponential backoff (1s, 2s, 4s, ...).
   *
   * @param record - Record to process.
   * @param retries - Maximum number of attempts; values below 1 are treated as 1.
   * @returns The output record from the first successful attempt.
   * @throws The last error once every attempt has failed; the record is added
   *   to the failed list first.
   */
  async processWithRetry(record: InputRecord, retries = 3): Promise<OutputRecord> {
    // Always make at least one attempt, so the final throw really is unreachable
    const attempts = Math.max(1, retries)
    for (let i = 0; i < attempts; i++) {
      try {
        return await this.processRecord(record)
      } catch (error) {
        if (i === attempts - 1) {
          this.failedRecords.push(record)
          throw error
        }
        await new Promise(r => setTimeout(r, Math.pow(2, i) * 1000))
      }
    }
    throw new Error('Unreachable')
  }

  /**
   * Writes the failed records to failed-records.jsonl, one JSON object per line.
   *
   * Every line is newline-terminated, matching the other JSONL files in this
   * guide, so the file can be appended to or fed straight back in as input.
   */
  async saveFailedRecords() {
    fs.writeFileSync(
      'failed-records.jsonl',
      this.failedRecords.map(r => JSON.stringify(r) + '\n').join('')
    )
  }
}

2. Progress Persistence

// Save checkpoint every 1000 records
// NOTE(review): batches run concurrently, so records can complete out of
// input order; confirm that "everything up to lastId is done" actually holds
// before using lastId as the only resume marker.
if (this.processed % 1000 === 0) {
  fs.writeFileSync('checkpoint.json', JSON.stringify({
    processed: this.processed,
    lastId: record.id,
    timestamp: Date.now()
  }))
}
 
// Resume from checkpoint
// On the first run there is no checkpoint file yet; start from the beginning
// (resumeAfter is undefined) instead of crashing on the read.
const checkpoint = fs.existsSync('checkpoint.json')
  ? JSON.parse(fs.readFileSync('checkpoint.json', 'utf-8'))
  : null
const resumeAfter = checkpoint?.lastId

3. Cost Tracking

/**
 * Estimates GPU spend for a running job from wall-clock time.
 *
 * The clock starts when the tracker is constructed.
 */
class CostTracker {
  private startTime = Date.now()

  /**
   * @param gpuCostPerSecond - Price in USD per second of runtime. Defaults to
   *   0.00014, i.e. about $0.50/hr (the RTX 4090 rate used in this guide).
   */
  constructor(private readonly gpuCostPerSecond = 0.00014) {}

  /** @returns Cost in USD accrued since the tracker was created. */
  getCurrentCost(): number {
    const runtime = (Date.now() - this.startTime) / 1000
    return runtime * this.gpuCostPerSecond
  }

  /**
   * Extrapolates the total job cost from the progress made so far.
   *
   * @param totalRecords - Number of records in the whole job.
   * @param processedRecords - Number of records finished so far.
   * @returns Cost so far scaled by totalRecords / processedRecords. Before any
   *   record has finished there is nothing to extrapolate from, so the cost
   *   accrued so far is returned instead of Infinity or NaN.
   */
  getProjectedCost(totalRecords: number, processedRecords: number): number {
    const current = this.getCurrentCost()
    if (processedRecords <= 0) return current
    const ratio = totalRecords / processedRecords
    return current * ratio
  }
}

Monitoring & Alerts

# Set up alerts for batch jobs
syaala notifications create \
  --type slack \
  --config '{"webhookUrl": "https://hooks.slack.com/..."}'
 
# Create alert for high error rate
syaala alerts create \
  --deployment dep_abc123 \
  --metric error_rate \
  --threshold 0.05 \
  --action notify

Troubleshooting

Slow Processing

# Check GPU utilization
syaala deployments metrics dep_abc123
 
# If GPU util < 70%: Increase batch size
# Edit batch-processor.ts: BATCH_SIZE = 512
 
# If GPU util > 95%: Reduce batch size or add GPUs
# NOTE(review): Step 2 creates the deployment with --max-replicas 10, so a
# value of 5 lowers the ceiling rather than adding GPUs; use a value above the
# current maximum when scaling out.
syaala deployments update dep_abc123 --max-replicas 5

Out of Memory Errors

# Reduce batch size
# Edit batch-processor.ts: BATCH_SIZE = 128
 
# Or use larger GPU
syaala deployments update dep_abc123 --gpu-type A100_40GB

Rate Limiting

# Reduce concurrency
# Edit batch-processor.ts: MAX_CONCURRENT = 3
 
# Or upgrade plan
syaala billing upgrade --plan professional

Cost Analysis

Processing 10,000 records:

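| GPU Type  | Throughput | Duration | Cost    | Cost/1K |
|-----------|------------|----------|---------|---------|
| RTX 4090  | 400/sec    | 25s      | $0.0035 | $0.0004 |
| A10G      | 350/sec    | 28.6s    | $0.0056 | $0.0006 |
| A100 40GB | 500/sec    | 20s      | $0.0061 | $0.0006 |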

Recommendation: RTX 4090 for best cost/performance

Next Steps