การเรียก VLM หรือ OCR engine กินเวลา 2-30 วินาทีต่อภาพ ถ้าทำใน request handler ตรง ๆ event loop ของ FastAPI จะถูกบล็อก จำนวน connection เต็ม แล้วทั้งระบบล่ม ทางออกคือแยกงานหนักออกไปทำเบื้องหลัง ผ่าน message queue แล้วให้ API ตอบกลับทันที
ปัญหา: งานหนักบล็อก event loop
FastAPI รันบน async event loop ตัวเดียวต่อ worker process ถ้าคุณเรียกฟังก์ชัน OCR ที่เป็น CPU-bound หรือรอเครือข่ายนาน ๆ แบบ synchronous มันจะ “ขวาง” coroutine อื่นทั้งหมด แม้แต่ health check ก็ไม่ตอบ การยัดเข้า BackgroundTasks ก็ยังรันในโปรเซสเดียวกัน เครื่องจะแย่งทรัพยากรกับการรับ request
สถาปัตยกรรม: queue + worker
รูปแบบมาตรฐาน: API รับไฟล์ → เก็บลง object storage → push job เข้า queue → คืน job_id ทันที → worker pool ดึงงานไปทำ → อัปเดตสถานะลง DB → ฝั่ง client poll หรือรับ webhook สเกล worker แยกจาก API ได้อิสระตามความยาวของคิว
1Client ──POST /slips──▶ FastAPI ──enqueue──▶ Redis / RabbitMQ2 │ │3 คืน job_id worker pool (N ตัว)4 │ │ VLM / OCR5 GET /slips/{id} ◀──────┘ ▼6 (poll สถานะ) อัปเดตผลลง PostgreSQL + แจ้งเตือน- Redis + Celery/BullMQ - ตั้งง่าย latency ต่ำ เหมาะกับงานส่วนใหญ่
- RabbitMQ - เมื่อต้องการ routing ซับซ้อน, ack ที่เชื่อถือได้สูง, และ dead-letter exchange เป็นมาตรฐาน
FastAPI: รับงานแล้วคืนทันที
1import uuid2from fastapi import FastAPI, UploadFile, HTTPException3from .tasks import process_slip4from .storage import put_object5from .db import jobs67app = FastAPI()89@app.post("/slips", status_code=202)10async def upload_slip(file: UploadFile):11 if file.content_type not in {"image/jpeg", "image/png"}:12 raise HTTPException(415, "รองรับเฉพาะ JPEG/PNG")1314 job_id = str(uuid.uuid4())15 key = f"slips/{job_id}.bin"16 await put_object(key, await file.read()) # I/O แบบ async ไม่บล็อก17 await jobs.create(job_id, status="queued", key=key)1819 # ส่งงานเข้า Celery - ไม่รอผล คืน 202 ทันที20 process_slip.delay(job_id, key)21 return {"job_id": job_id, "status": "queued"}2223@app.get("/slips/{job_id}")24async def get_status(job_id: str):25 job = await jobs.get(job_id)26 if job is None:27 raise HTTPException(404, "ไม่พบงานนี้")28 return job # {status, result?, error?}Celery worker (Python)
1from celery import Celery2from .ocr import run_vlm3from .storage import get_object4from .db import jobs_sync # worker ใช้ client แบบ sync56celery = Celery(7 "slips",8 broker="redis://redis:6379/0",9 backend="redis://redis:6379/1",10)1112celery.conf.update(13 task_acks_late=True, # ack หลังทำเสร็จ กัน job หายถ้า worker ตาย14 worker_prefetch_multiplier=1, # งานหนัก: ดึงทีละชิ้น ไม่กักงานไว้15 task_reject_on_worker_lost=True,16 task_time_limit=120, # hard limit กัน worker ค้าง17 task_soft_time_limit=100,18)1920@celery.task(21 bind=True,22 autoretry_for=(ConnectionError, TimeoutError),23 retry_backoff=True, # exponential backoff อัตโนมัติ24 retry_kwargs={"max_retries": 3},25)26def process_slip(self, job_id: str, key: str):27 jobs_sync.update(job_id, status="processing")28 try:29 raw = get_object(key)30 result = run_vlm(raw) # งานหนักอยู่นี่ ใน worker ไม่ใช่ API31 jobs_sync.update(job_id, status="done", result=result)32 except Exception as exc:33 jobs_sync.update(job_id, status="failed", error=str(exc))34 raise1# รัน worker แยกจาก API process scale ตามความยาวคิว2# --concurrency = จำนวนงานพร้อมกันต่อ 1 worker process3celery -A app.tasks worker \4 --concurrency=4 \5 --max-tasks-per-child=50 \6 --loglevel=infoBullMQ worker (TypeScript)
ถ้า stack เป็น Node ฝั่งเดียว BullMQ (บน Redis) ให้แพทเทิร์นเดียวกัน: producer ใน API, worker process แยก
1import { Queue, Worker, type Job } from "bullmq";23const connection = { host: "redis", port: 6379 };45export const slipQueue = new Queue("slips", {6 connection,7 defaultJobOptions: {8 attempts: 3,9 backoff: { type: "exponential", delay: 2000 }, // 2s, 4s, 8s10 removeOnComplete: 1000,11 removeOnFail: 5000,12 },13});1415// ── ในไฟล์ worker แยก (รันคนละ process กับ API) ──16type SlipJob = { jobId: string; key: string };1718new Worker<SlipJob>(19 "slips",20 async (job: Job<SlipJob>) => {21 const raw = await getObject(job.data.key);22 const result = await runVlm(raw); // งานหนัก อยู่ใน worker23 await jobs.update(job.data.jobId, { status: "done", result });24 },25 { connection, concurrency: 4, lockDuration: 120_000 },26);Retry, idempotency, backpressure
- Idempotency - ใช้
job_idเป็น key งานที่ ถูกรันซ้ำต้องให้ผลเดิม เช็กสถานะก่อนทำงานจริงเสมอ - Exponential backoff - retry แบบเว้นช่วงเพิ่มขึ้น กัน thundering herd ใส่ jitter เล็กน้อยด้วย
- Dead-letter queue - งานที่ retry ครบแล้วยังพัง ส่งเข้า DLQ ให้คนตรวจ ไม่ใช่ทิ้งเงียบ ๆ
- Backpressure - จำกัดความยาวคิว/อัตรารับงาน ถ้าคิวยาวผิดปกติ ให้ API คืน 503 + Retry-After แทนรับงานจนระบบจม
1from fastapi import HTTPException2from fastapi.responses import Response34MAX_QUEUE_DEPTH = 500056async def guard_queue(redis):7 depth = await redis.llen("celery") # ความยาวคิวปัจจุบัน8 if depth > MAX_QUEUE_DEPTH:9 raise HTTPException(10 503,11 "ระบบกำลังประมวลผลงานจำนวนมาก กรุณาลองใหม่ในอีกสักครู่",12 headers={"Retry-After": "30"},13 )เช็กลิสต์ production
- API ต้องไม่ทำงานหนักเอง คืน 202 + job_id แล้วให้ worker ทำ
acks_late+ time limit + max-tasks-per-child เพื่อกันงานหายและ memory leak- ทุก job ต้อง idempotent และมี dead-letter queue สำหรับงานที่พังถาวร
- มี metric: ความยาวคิว, เวลารอเฉลี่ย, อัตรา fail แล้วตั้ง alert (ดูโมดูล MLOps)
- สเกล worker แนวนอนตามคิว ไม่ใช่เพิ่ม concurrency บน process เดียวจนหน่วยความจำหมด
สรุปสำคัญ
- API ต้องคืน 202 + job_id ทันที แล้วให้ worker ทำงานหนัก ไม่ทำเองใน request
- acks_late + time limit + max-tasks-per-child กันงานหายและ memory leak
- ทุก job ต้อง idempotent มี dead-letter queue และมี backpressure เมื่อคิวยาวผิดปกติ
ควิซท้ายบท
01ทำไมการเรียก OCR/VLM แบบ synchronous ตรง ๆ ใน request handler ของ FastAPI จึงเป็นปัญหา
02หลังรับไฟล์และ enqueue งานสำเร็จ API ควรตอบ HTTP status ใดและเพราะอะไร
03ตั้งค่า Celery ข้อใดช่วยกัน 'งานหาย' หาก worker ตายกลางคัน
04การตั้ง --max-tasks-per-child บน Celery worker มีประโยชน์หลักอย่างไรกับงาน VLM/OCR