forked from templates/template-go-orm
Go-сервис-прокси между Codex CLI и Ollama. Добавляет Bearer-авторизацию, LLM-маршрутизатор (deepseek классифицирует запросы: code/doc/general), поддержку OpenAI Responses API для Codex CLI, стриминг SSE, кеш модели. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
339 lines
10 KiB
Go
339 lines
10 KiB
Go
package service
|
||
|
||
import (
|
||
"bufio"
|
||
"bytes"
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"math/rand"
|
||
"net/http"
|
||
"strings"
|
||
|
||
"ai-platform/internal/model"
|
||
)
|
||
|
||
// OllamaClient is the HTTP client used to talk to the real Ollama server.
type OllamaClient struct {
	baseURL string       // prefix that endpoint paths are appended to
	numCtx  int          // options.num_ctx value applied by withOptions when > 0
	client  *http.Client // client that executes every upstream request
}
|
||
|
||
func NewOllamaClient(baseURL string, numCtx int) *OllamaClient {
|
||
return &OllamaClient{
|
||
baseURL: baseURL,
|
||
numCtx: numCtx,
|
||
client: &http.Client{},
|
||
}
|
||
}
|
||
|
||
// withOptions добавляет options.num_ctx к ChatRequest, если настроено
|
||
func (c *OllamaClient) withOptions(req *model.ChatRequest) {
|
||
if c.numCtx > 0 {
|
||
req.Options = &model.OllamaOptions{NumCtx: c.numCtx}
|
||
}
|
||
}
|
||
|
||
// ProxyChat — стриминг /api/chat: отправляет запрос в Ollama и построчно
|
||
// пишет NDJSON-ответ в ResponseWriter с Flush после каждой строки.
|
||
func (c *OllamaClient) ProxyChat(ctx context.Context, w http.ResponseWriter, req model.ChatRequest) error {
|
||
c.withOptions(&req)
|
||
return c.proxyStream(ctx, w, "/api/chat", req)
|
||
}
|
||
|
||
// ProxyGenerate — стриминг /api/generate: аналогично ProxyChat.
|
||
func (c *OllamaClient) ProxyGenerate(ctx context.Context, w http.ResponseWriter, req model.GenerateRequest) error {
|
||
return c.proxyStream(ctx, w, "/api/generate", req)
|
||
}
|
||
|
||
// proxyStream — общая логика стриминга NDJSON от Ollama к клиенту.
|
||
func (c *OllamaClient) proxyStream(ctx context.Context, w http.ResponseWriter, path string, body any) error {
|
||
jsonBody, err := json.Marshal(body)
|
||
if err != nil {
|
||
return fmt.Errorf("marshal request: %w", err)
|
||
}
|
||
|
||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, bytes.NewReader(jsonBody))
|
||
if err != nil {
|
||
return fmt.Errorf("create request: %w", err)
|
||
}
|
||
httpReq.Header.Set("Content-Type", "application/json")
|
||
|
||
resp, err := c.client.Do(httpReq)
|
||
if err != nil {
|
||
return fmt.Errorf("ollama request: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
respBody, _ := io.ReadAll(resp.Body)
|
||
return fmt.Errorf("ollama returned %d: %s", resp.StatusCode, string(respBody))
|
||
}
|
||
|
||
// Копируем Content-Type из ответа Ollama (NDJSON или SSE для /v1/)
|
||
if ct := resp.Header.Get("Content-Type"); ct != "" {
|
||
w.Header().Set("Content-Type", ct)
|
||
} else {
|
||
w.Header().Set("Content-Type", "application/x-ndjson")
|
||
}
|
||
w.WriteHeader(http.StatusOK)
|
||
|
||
flusher, ok := w.(http.Flusher)
|
||
if !ok {
|
||
return fmt.Errorf("response writer does not support flushing")
|
||
}
|
||
|
||
scanner := bufio.NewScanner(resp.Body)
|
||
scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // буфер 1MB для длинных строк
|
||
|
||
for scanner.Scan() {
|
||
w.Write(scanner.Bytes())
|
||
w.Write([]byte("\n"))
|
||
flusher.Flush()
|
||
}
|
||
|
||
return scanner.Err()
|
||
}
|
||
|
||
// ProxyChatV1 — стриминг /v1/chat/completions (OpenAI-совместимый формат).
|
||
func (c *OllamaClient) ProxyChatV1(ctx context.Context, w http.ResponseWriter, req model.ChatRequest) error {
|
||
return c.proxyStream(ctx, w, "/v1/chat/completions", req)
|
||
}
|
||
|
||
// ProxyResponsesV1 — обработка /responses и /v1/responses (Codex CLI).
|
||
// Конвертирует Ollama NDJSON-стрим в OpenAI Responses API SSE-формат.
|
||
func (c *OllamaClient) ProxyResponsesV1(ctx context.Context, w http.ResponseWriter, messages []model.Message, modelName string) error {
|
||
streamTrue := true
|
||
req := model.ChatRequest{
|
||
Model: modelName,
|
||
Messages: messages,
|
||
Stream: &streamTrue,
|
||
}
|
||
c.withOptions(&req)
|
||
|
||
jsonBody, err := json.Marshal(req)
|
||
if err != nil {
|
||
return fmt.Errorf("marshal request: %w", err)
|
||
}
|
||
|
||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/chat", bytes.NewReader(jsonBody))
|
||
if err != nil {
|
||
return fmt.Errorf("create request: %w", err)
|
||
}
|
||
httpReq.Header.Set("Content-Type", "application/json")
|
||
|
||
resp, err := c.client.Do(httpReq)
|
||
if err != nil {
|
||
return fmt.Errorf("ollama request: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
body, _ := io.ReadAll(resp.Body)
|
||
return fmt.Errorf("ollama returned %d: %s", resp.StatusCode, string(body))
|
||
}
|
||
|
||
// Уникальные ID для ответа
|
||
responseID := fmt.Sprintf("resp_%016x", rand.Int63())
|
||
itemID := fmt.Sprintf("msg_%016x", rand.Int63())
|
||
|
||
// SSE-заголовки
|
||
w.Header().Set("Content-Type", "text/event-stream")
|
||
w.Header().Set("Cache-Control", "no-cache")
|
||
w.WriteHeader(http.StatusOK)
|
||
|
||
flusher, ok := w.(http.Flusher)
|
||
if !ok {
|
||
return fmt.Errorf("response writer does not support flushing")
|
||
}
|
||
|
||
writeSSE := func(eventType string, data map[string]any) {
|
||
jsonData, _ := json.Marshal(data)
|
||
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, jsonData)
|
||
flusher.Flush()
|
||
}
|
||
|
||
// Начальные события
|
||
writeSSE("response.created", map[string]any{
|
||
"type": "response.created",
|
||
"response": map[string]any{
|
||
"id": responseID, "object": "response",
|
||
"status": "in_progress", "model": modelName, "output": []any{},
|
||
},
|
||
})
|
||
writeSSE("response.output_item.added", map[string]any{
|
||
"type": "response.output_item.added", "output_index": 0,
|
||
"item": map[string]any{
|
||
"id": itemID, "type": "message",
|
||
"status": "in_progress", "role": "assistant", "content": []any{},
|
||
},
|
||
})
|
||
writeSSE("response.content_part.added", map[string]any{
|
||
"type": "response.content_part.added",
|
||
"item_id": itemID, "output_index": 0, "content_index": 0,
|
||
"part": map[string]any{"type": "output_text", "text": "", "annotations": []any{}},
|
||
})
|
||
|
||
// Читаем NDJSON-стрим от Ollama и конвертируем в SSE
|
||
var fullText strings.Builder
|
||
scanner := bufio.NewScanner(resp.Body)
|
||
scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
|
||
|
||
for scanner.Scan() {
|
||
var chunk model.ChatResponse
|
||
if err := json.Unmarshal(scanner.Bytes(), &chunk); err != nil {
|
||
continue
|
||
}
|
||
|
||
if delta := chunk.Message.Content; delta != "" {
|
||
fullText.WriteString(delta)
|
||
writeSSE("response.output_text.delta", map[string]any{
|
||
"type": "response.output_text.delta",
|
||
"item_id": itemID,
|
||
"output_index": 0,
|
||
"content_index": 0,
|
||
"delta": delta,
|
||
})
|
||
}
|
||
|
||
if chunk.Done {
|
||
text := fullText.String()
|
||
writeSSE("response.output_text.done", map[string]any{
|
||
"type": "response.output_text.done",
|
||
"item_id": itemID, "output_index": 0, "content_index": 0,
|
||
"text": text,
|
||
})
|
||
writeSSE("response.output_item.done", map[string]any{
|
||
"type": "response.output_item.done", "output_index": 0,
|
||
"item": map[string]any{
|
||
"id": itemID, "type": "message", "status": "completed",
|
||
"role": "assistant",
|
||
"content": []any{map[string]any{
|
||
"type": "output_text", "text": text, "annotations": []any{},
|
||
}},
|
||
},
|
||
})
|
||
writeSSE("response.completed", map[string]any{
|
||
"type": "response.completed",
|
||
"response": map[string]any{
|
||
"id": responseID, "object": "response",
|
||
"status": "completed", "model": modelName,
|
||
"output": []any{map[string]any{
|
||
"id": itemID, "type": "message", "status": "completed",
|
||
"role": "assistant",
|
||
"content": []any{map[string]any{
|
||
"type": "output_text", "text": text, "annotations": []any{},
|
||
}},
|
||
}},
|
||
},
|
||
})
|
||
}
|
||
}
|
||
|
||
fmt.Fprintf(w, "data: [DONE]\n\n")
|
||
flusher.Flush()
|
||
|
||
return scanner.Err()
|
||
}
|
||
|
||
// GetModelsV1 — GET /v1/models, возвращает список моделей в OpenAI-формате.
|
||
func (c *OllamaClient) GetModelsV1(ctx context.Context) ([]byte, error) {
|
||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/v1/models", nil)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("create request: %w", err)
|
||
}
|
||
|
||
resp, err := c.client.Do(httpReq)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("ollama request: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
body, err := io.ReadAll(resp.Body)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("read response: %w", err)
|
||
}
|
||
return body, nil
|
||
}
|
||
|
||
// GetTags — GET /api/tags, возвращает список моделей из Ollama.
|
||
func (c *OllamaClient) GetTags(ctx context.Context) (*model.TagsResponse, error) {
|
||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/tags", nil)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("create request: %w", err)
|
||
}
|
||
|
||
resp, err := c.client.Do(httpReq)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("ollama request: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
respBody, _ := io.ReadAll(resp.Body)
|
||
return nil, fmt.Errorf("ollama returned %d: %s", resp.StatusCode, string(respBody))
|
||
}
|
||
|
||
var tags model.TagsResponse
|
||
if err := json.NewDecoder(resp.Body).Decode(&tags); err != nil {
|
||
return nil, fmt.Errorf("decode tags: %w", err)
|
||
}
|
||
|
||
return &tags, nil
|
||
}
|
||
|
||
// Complete — синхронный вызов /api/chat (stream=false).
|
||
// Используется Router LLM для классификации запроса.
|
||
func (c *OllamaClient) Complete(ctx context.Context, modelName string, prompt string) (string, error) {
|
||
return c.CompleteWithSystem(ctx, modelName, "", prompt)
|
||
}
|
||
|
||
// CompleteWithSystem — синхронный вызов с отдельным system-промптом.
|
||
func (c *OllamaClient) CompleteWithSystem(ctx context.Context, modelName, system, userMsg string) (string, error) {
|
||
streamFalse := false
|
||
messages := []model.Message{}
|
||
if system != "" {
|
||
messages = append(messages, model.Message{Role: "system", Content: system})
|
||
}
|
||
messages = append(messages, model.Message{Role: "user", Content: userMsg})
|
||
|
||
req := model.ChatRequest{
|
||
Model: modelName,
|
||
Messages: messages,
|
||
Stream: &streamFalse,
|
||
}
|
||
c.withOptions(&req)
|
||
|
||
jsonBody, err := json.Marshal(req)
|
||
|
||
if err != nil {
|
||
return "", fmt.Errorf("marshal request: %w", err)
|
||
}
|
||
|
||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/chat", bytes.NewReader(jsonBody))
|
||
if err != nil {
|
||
return "", fmt.Errorf("create request: %w", err)
|
||
}
|
||
httpReq.Header.Set("Content-Type", "application/json")
|
||
|
||
resp, err := c.client.Do(httpReq)
|
||
if err != nil {
|
||
return "", fmt.Errorf("ollama request: %w", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
respBody, _ := io.ReadAll(resp.Body)
|
||
return "", fmt.Errorf("ollama returned %d: %s", resp.StatusCode, string(respBody))
|
||
}
|
||
|
||
var chatResp model.ChatResponse
|
||
if err := json.NewDecoder(resp.Body).Decode(&chatResp); err != nil {
|
||
return "", fmt.Errorf("decode response: %w", err)
|
||
}
|
||
|
||
return chatResp.Message.Content, nil
|
||
}
|