package service import ( "bufio" "bytes" "context" "encoding/json" "fmt" "io" "math/rand" "net/http" "strings" "ai-platform/internal/model" ) // OllamaClient — HTTP-клиент к реальной Ollama type OllamaClient struct { baseURL string numCtx int client *http.Client } func NewOllamaClient(baseURL string, numCtx int) *OllamaClient { return &OllamaClient{ baseURL: baseURL, numCtx: numCtx, client: &http.Client{}, } } // withOptions добавляет options.num_ctx к ChatRequest, если настроено func (c *OllamaClient) withOptions(req *model.ChatRequest) { if c.numCtx > 0 { req.Options = &model.OllamaOptions{NumCtx: c.numCtx} } } // ProxyChat — стриминг /api/chat: отправляет запрос в Ollama и построчно // пишет NDJSON-ответ в ResponseWriter с Flush после каждой строки. func (c *OllamaClient) ProxyChat(ctx context.Context, w http.ResponseWriter, req model.ChatRequest) error { c.withOptions(&req) return c.proxyStream(ctx, w, "/api/chat", req) } // ProxyGenerate — стриминг /api/generate: аналогично ProxyChat. func (c *OllamaClient) ProxyGenerate(ctx context.Context, w http.ResponseWriter, req model.GenerateRequest) error { return c.proxyStream(ctx, w, "/api/generate", req) } // proxyStream — общая логика стриминга NDJSON от Ollama к клиенту. func (c *OllamaClient) proxyStream(ctx context.Context, w http.ResponseWriter, path string, body any) error { jsonBody, err := json.Marshal(body) if err != nil { return fmt.Errorf("marshal request: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, bytes.NewReader(jsonBody)) if err != nil { return fmt.Errorf("create request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") resp, err := c.client.Do(httpReq) if err != nil { return fmt.Errorf("ollama request: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { respBody, _ := io.ReadAll(resp.Body) return fmt.Errorf("ollama returned %d: %s", resp.StatusCode, string(respBody)) } // Копируем Content-Type из ответа Ollama (NDJSON или SSE для /v1/) if ct := resp.Header.Get("Content-Type"); ct != "" { w.Header().Set("Content-Type", ct) } else { w.Header().Set("Content-Type", "application/x-ndjson") } w.WriteHeader(http.StatusOK) flusher, ok := w.(http.Flusher) if !ok { return fmt.Errorf("response writer does not support flushing") } scanner := bufio.NewScanner(resp.Body) scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // буфер 1MB для длинных строк for scanner.Scan() { w.Write(scanner.Bytes()) w.Write([]byte("\n")) flusher.Flush() } return scanner.Err() } // ProxyChatV1 — стриминг /v1/chat/completions (OpenAI-совместимый формат). func (c *OllamaClient) ProxyChatV1(ctx context.Context, w http.ResponseWriter, req model.ChatRequest) error { return c.proxyStream(ctx, w, "/v1/chat/completions", req) } // ProxyResponsesV1 — обработка /responses и /v1/responses (Codex CLI). // Конвертирует Ollama NDJSON-стрим в OpenAI Responses API SSE-формат. func (c *OllamaClient) ProxyResponsesV1(ctx context.Context, w http.ResponseWriter, messages []model.Message, modelName string) error { streamTrue := true req := model.ChatRequest{ Model: modelName, Messages: messages, Stream: &streamTrue, } c.withOptions(&req) jsonBody, err := json.Marshal(req) if err != nil { return fmt.Errorf("marshal request: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/chat", bytes.NewReader(jsonBody)) if err != nil { return fmt.Errorf("create request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") resp, err := c.client.Do(httpReq) if err != nil { return fmt.Errorf("ollama request: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("ollama returned %d: %s", resp.StatusCode, string(body)) } // Уникальные ID для ответа responseID := fmt.Sprintf("resp_%016x", rand.Int63()) itemID := fmt.Sprintf("msg_%016x", rand.Int63()) // SSE-заголовки w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.WriteHeader(http.StatusOK) flusher, ok := w.(http.Flusher) if !ok { return fmt.Errorf("response writer does not support flushing") } writeSSE := func(eventType string, data map[string]any) { jsonData, _ := json.Marshal(data) fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, jsonData) flusher.Flush() } // Начальные события writeSSE("response.created", map[string]any{ "type": "response.created", "response": map[string]any{ "id": responseID, "object": "response", "status": "in_progress", "model": modelName, "output": []any{}, }, }) writeSSE("response.output_item.added", map[string]any{ "type": "response.output_item.added", "output_index": 0, "item": map[string]any{ "id": itemID, "type": "message", "status": "in_progress", "role": "assistant", "content": []any{}, }, }) writeSSE("response.content_part.added", map[string]any{ "type": "response.content_part.added", "item_id": itemID, "output_index": 0, "content_index": 0, "part": map[string]any{"type": "output_text", "text": "", "annotations": []any{}}, }) // Читаем NDJSON-стрим от Ollama и конвертируем в SSE var fullText strings.Builder scanner := bufio.NewScanner(resp.Body) scanner.Buffer(make([]byte, 1024*1024), 1024*1024) for scanner.Scan() { var chunk model.ChatResponse if err := json.Unmarshal(scanner.Bytes(), &chunk); err != nil { continue } if delta := chunk.Message.Content; delta != "" { fullText.WriteString(delta) writeSSE("response.output_text.delta", map[string]any{ "type": "response.output_text.delta", "item_id": itemID, "output_index": 0, "content_index": 0, "delta": delta, }) } if chunk.Done { text := fullText.String() writeSSE("response.output_text.done", map[string]any{ "type": "response.output_text.done", "item_id": itemID, "output_index": 0, "content_index": 0, "text": text, }) writeSSE("response.output_item.done", map[string]any{ "type": "response.output_item.done", "output_index": 0, "item": map[string]any{ "id": itemID, "type": "message", "status": "completed", "role": "assistant", "content": []any{map[string]any{ "type": "output_text", "text": text, "annotations": []any{}, }}, }, }) writeSSE("response.completed", map[string]any{ "type": "response.completed", "response": map[string]any{ "id": responseID, "object": "response", "status": "completed", "model": modelName, "output": []any{map[string]any{ "id": itemID, "type": "message", "status": "completed", "role": "assistant", "content": []any{map[string]any{ "type": "output_text", "text": text, "annotations": []any{}, }}, }}, }, }) } } fmt.Fprintf(w, "data: [DONE]\n\n") flusher.Flush() return scanner.Err() } // GetModelsV1 — GET /v1/models, возвращает список моделей в OpenAI-формате. func (c *OllamaClient) GetModelsV1(ctx context.Context) ([]byte, error) { httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/v1/models", nil) if err != nil { return nil, fmt.Errorf("create request: %w", err) } resp, err := c.client.Do(httpReq) if err != nil { return nil, fmt.Errorf("ollama request: %w", err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("read response: %w", err) } return body, nil } // GetTags — GET /api/tags, возвращает список моделей из Ollama. func (c *OllamaClient) GetTags(ctx context.Context) (*model.TagsResponse, error) { httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/tags", nil) if err != nil { return nil, fmt.Errorf("create request: %w", err) } resp, err := c.client.Do(httpReq) if err != nil { return nil, fmt.Errorf("ollama request: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { respBody, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("ollama returned %d: %s", resp.StatusCode, string(respBody)) } var tags model.TagsResponse if err := json.NewDecoder(resp.Body).Decode(&tags); err != nil { return nil, fmt.Errorf("decode tags: %w", err) } return &tags, nil } // Complete — синхронный вызов /api/chat (stream=false). // Используется Router LLM для классификации запроса. func (c *OllamaClient) Complete(ctx context.Context, modelName string, prompt string) (string, error) { return c.CompleteWithSystem(ctx, modelName, "", prompt) } // CompleteWithSystem — синхронный вызов с отдельным system-промптом. func (c *OllamaClient) CompleteWithSystem(ctx context.Context, modelName, system, userMsg string) (string, error) { streamFalse := false messages := []model.Message{} if system != "" { messages = append(messages, model.Message{Role: "system", Content: system}) } messages = append(messages, model.Message{Role: "user", Content: userMsg}) req := model.ChatRequest{ Model: modelName, Messages: messages, Stream: &streamFalse, } c.withOptions(&req) jsonBody, err := json.Marshal(req) if err != nil { return "", fmt.Errorf("marshal request: %w", err) } httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/api/chat", bytes.NewReader(jsonBody)) if err != nil { return "", fmt.Errorf("create request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") resp, err := c.client.Do(httpReq) if err != nil { return "", fmt.Errorf("ollama request: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { respBody, _ := io.ReadAll(resp.Body) return "", fmt.Errorf("ollama returned %d: %s", resp.StatusCode, string(respBody)) } var chatResp model.ChatResponse if err := json.NewDecoder(resp.Body).Decode(&chatResp); err != nil { return "", fmt.Errorf("decode response: %w", err) } return chatResp.Message.Content, nil }