Files
Proxy_for_codex/src/internal/queue/middleware.go
fedos 5118914823 Добавить приоритетную очередь, аутентификацию и администрирование
- Приоритетная очередь для контроля параллельных запросов
- Аутентификация по API-ключу из URL (/auth/<key>/v1/...)
- Роли пользователей с белым списком моделей и ограничением контекста (num_ctx)
- Sliding window rate limiting
- Admin API для горячей перезагрузки users.json без перезапуска прокси
- Graceful shutdown с таймаутом завершения активных запросов
- Маскировка API-ключа в логах
- Подробная инструкция по установке для Windows и Linux (SETUP_WIN_SERVER.md)
2026-03-28 15:17:40 +03:00

67 lines
2.3 KiB
Go

package queue
import (
"encoding/json"
"log/slog"
"net/http"
)
// PriorityFunc определяет приоритет запроса для постановки в очередь.
// Вызывается middleware перед Acquire. Возвращает целое число:
// чем больше значение — тем выше приоритет.
type PriorityFunc func(r *http.Request) int
// Middleware ограничивает количество параллельных запросов к бэкенду.
// Запросы, не получившие слот, ожидают в приоритетной очереди.
// priorityFn определяет приоритет каждого запроса; при nil приоритет = 1.
func Middleware(logger *slog.Logger, scheduler *Scheduler, priorityFn PriorityFunc, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
priority := 1
if priorityFn != nil {
priority = priorityFn(r)
}
waited, err := scheduler.Acquire(r.Context(), priority)
if err != nil {
handleAcquireError(logger, w, r, err, scheduler)
return
}
if waited {
inFlight, queued := scheduler.Stats()
logger.Info("запрос вышел из очереди",
"method", r.Method, "path", r.URL.Path,
"priority", priority, "in_flight", inFlight, "queued", queued,
)
}
defer scheduler.Release()
next.ServeHTTP(w, r)
})
}
// handleAcquireError формирует HTTP-ответ при ошибке получения слота.
func handleAcquireError(logger *slog.Logger, w http.ResponseWriter, r *http.Request, err error, scheduler *Scheduler) {
w.Header().Set("Content-Type", "application/json")
switch err {
case ErrQueueFull:
inFlight, queued := scheduler.Stats()
logger.Warn("очередь переполнена",
"method", r.Method, "path", r.URL.Path,
"in_flight", inFlight, "queued", queued,
)
w.WriteHeader(http.StatusServiceUnavailable)
json.NewEncoder(w).Encode(map[string]string{
"error": "server overloaded, queue full",
})
case ErrRequestCancelled:
logger.Debug("клиент отключился в очереди",
"method", r.Method, "path", r.URL.Path,
)
w.WriteHeader(http.StatusGatewayTimeout)
json.NewEncoder(w).Encode(map[string]string{
"error": "request cancelled while queued",
})
}
}