ข้ามไปยังเนื้อหา
vibe-coder/academy
หลักสูตรทั้งหมด
MODULE 03

Scaling OCR Pipelines

สถาปัตยกรรมที่ทนทานด้วย message queue (Redis/RabbitMQ) และ background worker แบบ async (Celery/BullMQ) เพื่อรองรับงาน OCR ที่กินทรัพยากร โดยไม่ทำให้ FastAPI ค้าง

  • FastAPI
  • Celery
  • BullMQ
  • Redis
  • RabbitMQ

การเรียก VLM หรือ OCR engine กินเวลา 2-30 วินาทีต่อภาพ ถ้าทำใน request handler ตรง ๆ event loop ของ FastAPI จะถูกบล็อก จำนวน connection เต็ม แล้วทั้งระบบล่ม ทางออกคือแยกงานหนักออกไปทำเบื้องหลัง ผ่าน message queue แล้วให้ API ตอบกลับทันที

ปัญหา: งานหนักบล็อก event loop

FastAPI รันบน async event loop ตัวเดียวต่อ worker process ถ้าคุณเรียกฟังก์ชัน OCR ที่เป็น CPU-bound หรือรอเครือข่ายนาน ๆ แบบ synchronous มันจะ “ขวาง” coroutine อื่นทั้งหมด แม้แต่ health check ก็ไม่ตอบ การยัดเข้า BackgroundTasks ก็ยังรันในโปรเซสเดียวกัน เครื่องจะแย่งทรัพยากรกับการรับ request

สถาปัตยกรรม: queue + worker

รูปแบบมาตรฐาน: API รับไฟล์ → เก็บลง object storage → push job เข้า queue → คืน job_id ทันที → worker pool ดึงงานไปทำ → อัปเดตสถานะลง DB → ฝั่ง client poll หรือรับ webhook สเกล worker แยกจาก API ได้อิสระตามความยาวของคิว

ของไหลในระบบ
Shell
1Client ──POST /slips──▶ FastAPI ──enqueue──▶ Redis / RabbitMQ
2 │ │
3 คืน job_id worker pool (N ตัว)
4 │ │ VLM / OCR
5 GET /slips/{id} ◀──────┘ ▼
6 (poll สถานะ) อัปเดตผลลง PostgreSQL + แจ้งเตือน
  • Redis + Celery/BullMQ - ตั้งง่าย latency ต่ำ เหมาะกับงานส่วนใหญ่
  • RabbitMQ - เมื่อต้องการ routing ซับซ้อน, ack ที่เชื่อถือได้สูง, และ dead-letter exchange เป็นมาตรฐาน

FastAPI: รับงานแล้วคืนทันที

api.py
Python
1import uuid
2from fastapi import FastAPI, UploadFile, HTTPException
3from .tasks import process_slip
4from .storage import put_object
5from .db import jobs
6
7app = FastAPI()
8
9@app.post("/slips", status_code=202)
10async def upload_slip(file: UploadFile):
11 if file.content_type not in {"image/jpeg", "image/png"}:
12 raise HTTPException(415, "รองรับเฉพาะ JPEG/PNG")
13
14 job_id = str(uuid.uuid4())
15 key = f"slips/{job_id}.bin"
16 await put_object(key, await file.read()) # I/O แบบ async ไม่บล็อก
17 await jobs.create(job_id, status="queued", key=key)
18
19 # ส่งงานเข้า Celery - ไม่รอผล คืน 202 ทันที
20 process_slip.delay(job_id, key)
21 return {"job_id": job_id, "status": "queued"}
22
23@app.get("/slips/{job_id}")
24async def get_status(job_id: str):
25 job = await jobs.get(job_id)
26 if job is None:
27 raise HTTPException(404, "ไม่พบงานนี้")
28 return job # {status, result?, error?}

Celery worker (Python)

tasks.py
Python
1from celery import Celery
2from .ocr import run_vlm
3from .storage import get_object
4from .db import jobs_sync # worker ใช้ client แบบ sync
5
6celery = Celery(
7 "slips",
8 broker="redis://redis:6379/0",
9 backend="redis://redis:6379/1",
10)
11
12celery.conf.update(
13 task_acks_late=True, # ack หลังทำเสร็จ กัน job หายถ้า worker ตาย
14 worker_prefetch_multiplier=1, # งานหนัก: ดึงทีละชิ้น ไม่กักงานไว้
15 task_reject_on_worker_lost=True,
16 task_time_limit=120, # hard limit กัน worker ค้าง
17 task_soft_time_limit=100,
18)
19
20@celery.task(
21 bind=True,
22 autoretry_for=(ConnectionError, TimeoutError),
23 retry_backoff=True, # exponential backoff อัตโนมัติ
24 retry_kwargs={"max_retries": 3},
25)
26def process_slip(self, job_id: str, key: str):
27 jobs_sync.update(job_id, status="processing")
28 try:
29 raw = get_object(key)
30 result = run_vlm(raw) # งานหนักอยู่นี่ ใน worker ไม่ใช่ API
31 jobs_sync.update(job_id, status="done", result=result)
32 except Exception as exc:
33 jobs_sync.update(job_id, status="failed", error=str(exc))
34 raise
run-worker.sh
Shell
1# รัน worker แยกจาก API process scale ตามความยาวคิว
2# --concurrency = จำนวนงานพร้อมกันต่อ 1 worker process
3celery -A app.tasks worker \
4 --concurrency=4 \
5 --max-tasks-per-child=50 \
6 --loglevel=info

BullMQ worker (TypeScript)

ถ้า stack เป็น Node ฝั่งเดียว BullMQ (บน Redis) ให้แพทเทิร์นเดียวกัน: producer ใน API, worker process แยก

queue.ts
TypeScript
1import { Queue, Worker, type Job } from "bullmq";
2
3const connection = { host: "redis", port: 6379 };
4
5export const slipQueue = new Queue("slips", {
6 connection,
7 defaultJobOptions: {
8 attempts: 3,
9 backoff: { type: "exponential", delay: 2000 }, // 2s, 4s, 8s
10 removeOnComplete: 1000,
11 removeOnFail: 5000,
12 },
13});
14
15// ── ในไฟล์ worker แยก (รันคนละ process กับ API) ──
16type SlipJob = { jobId: string; key: string };
17
18new Worker<SlipJob>(
19 "slips",
20 async (job: Job<SlipJob>) => {
21 const raw = await getObject(job.data.key);
22 const result = await runVlm(raw); // งานหนัก อยู่ใน worker
23 await jobs.update(job.data.jobId, { status: "done", result });
24 },
25 { connection, concurrency: 4, lockDuration: 120_000 },
26);

Retry, idempotency, backpressure

  • Idempotency - ใช้ job_id เป็น key งานที่ ถูกรันซ้ำต้องให้ผลเดิม เช็กสถานะก่อนทำงานจริงเสมอ
  • Exponential backoff - retry แบบเว้นช่วงเพิ่มขึ้น กัน thundering herd ใส่ jitter เล็กน้อยด้วย
  • Dead-letter queue - งานที่ retry ครบแล้วยังพัง ส่งเข้า DLQ ให้คนตรวจ ไม่ใช่ทิ้งเงียบ ๆ
  • Backpressure - จำกัดความยาวคิว/อัตรารับงาน ถ้าคิวยาวผิดปกติ ให้ API คืน 503 + Retry-After แทนรับงานจนระบบจม
backpressure.py
Python
1from fastapi import HTTPException
2from fastapi.responses import Response
3
4MAX_QUEUE_DEPTH = 5000
5
6async def guard_queue(redis):
7 depth = await redis.llen("celery") # ความยาวคิวปัจจุบัน
8 if depth > MAX_QUEUE_DEPTH:
9 raise HTTPException(
10 503,
11 "ระบบกำลังประมวลผลงานจำนวนมาก กรุณาลองใหม่ในอีกสักครู่",
12 headers={"Retry-After": "30"},
13 )

เช็กลิสต์ production

  • API ต้องไม่ทำงานหนักเอง คืน 202 + job_id แล้วให้ worker ทำ
  • acks_late + time limit + max-tasks-per-child เพื่อกันงานหายและ memory leak
  • ทุก job ต้อง idempotent และมี dead-letter queue สำหรับงานที่พังถาวร
  • มี metric: ความยาวคิว, เวลารอเฉลี่ย, อัตรา fail แล้วตั้ง alert (ดูโมดูล MLOps)
  • สเกล worker แนวนอนตามคิว ไม่ใช่เพิ่ม concurrency บน process เดียวจนหน่วยความจำหมด

สรุปสำคัญ

  • API ต้องคืน 202 + job_id ทันที แล้วให้ worker ทำงานหนัก ไม่ทำเองใน request
  • acks_late + time limit + max-tasks-per-child กันงานหายและ memory leak
  • ทุก job ต้อง idempotent มี dead-letter queue และมี backpressure เมื่อคิวยาวผิดปกติ
ทดสอบความเข้าใจ

ควิซท้ายบท

0/4 ข้อ
  1. 01ทำไมการเรียก OCR/VLM แบบ synchronous ตรง ๆ ใน request handler ของ FastAPI จึงเป็นปัญหา

  2. 02หลังรับไฟล์และ enqueue งานสำเร็จ API ควรตอบ HTTP status ใดและเพราะอะไร

  3. 03ตั้งค่า Celery ข้อใดช่วยกัน 'งานหาย' หาก worker ตายกลางคัน

  4. 04การตั้ง --max-tasks-per-child บน Celery worker มีประโยชน์หลักอย่างไรกับงาน VLM/OCR

ตอบให้ครบทุกข้อแล้วกดส่งคำตอบเพื่อดูเฉลย