new proxy

This commit is contained in:
2026-03-18 21:18:17 +03:00
parent 971332c283
commit b51a8850bc
17 changed files with 126 additions and 516 deletions

View File

@@ -13,48 +13,40 @@ import (
// LoadConfig загружает конфиг из .env (если есть) и окружения.
func LoadConfig(logger *slog.Logger) (*Config, error) {
_ = godotenv.Load() // необязательно фейлиться, если файла нет
_ = godotenv.Load()
cfg := &Config{}
cfg.Timezone = GetEnvAs("TIMEZONE", "UTC", ParseString)
cfg.ServiceURL = GetEnvAs("SERVICE_URL", "http://localhost:8080", ParseString)
cfg.ListenAddr = GetEnvAs("LISTEN_ADDR", ":8080", ParseString)
cfg.BackendURL = GetEnvAs("OLLAMA_BACKEND", "http://localhost:11434", ParseString)
cfg.LoggingConfig.Instance = logger
cfg.LoggingConfig.Level = GetEnvAs("LOG_LEVEL", "info", ParseString)
cfg.LoggingConfig.ShowCfgDump = GetEnvAs("LOG_SHOW_DUMP", false, ParseBool)
cfg.DatabaseConfig = FillDatabaseConfig()
if cfg.LoggingConfig.ShowCfgDump {
cfg.Print()
}
return cfg, nil
}
// PrintConfig выводит конфигурацию (или любой другой struct) в виде таблички "KEY - VALUE".
// Функция использует рефлексию для перебора полей структуры.
// Print выводит конфигурацию в виде таблички "KEY - VALUE".
func (c *Config) Print() {
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
fmt.Fprintln(w, "Loaded configuration:")
fmt.Fprintln(w, "KEY\tVALUE")
fmt.Fprintln(w, "----\t-----")
// Получаем reflect.Value объекта.
v := reflect.ValueOf(c)
// Если передан указатель, получаем значение, на которое он указывает.
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
t := v.Type()
// Перебираем все поля структуры.
for i := 0; i < v.NumField(); i++ {
fieldName := t.Field(i).Name
fieldValue := v.Field(i).Interface()
// Если поле имеет тип time.Duration, выводим его в виде строки.
if d, ok := fieldValue.(time.Duration); ok {
fieldValue = d.String()
}

View File

@@ -1,113 +0,0 @@
package config
import (
"fmt"
"net/url"
"time"
)
type DatabaseConfig struct {
URL string
Kind string
Host string
Port string
User string
Password string
Name string
UseTLS bool
Timeout time.Duration
}
func FillDatabaseConfig() DatabaseConfig {
databaseURL := GetEnvAs("DATABASE_URL", "", ParseString)
if databaseURL == "" {
return DatabaseConfig{}
}
u, err := url.Parse(databaseURL)
if err != nil {
return DatabaseConfig{}
}
kind := "postgres"
if u.Scheme == "postgresql" {
kind = "postgres"
} else if u.Scheme != "" {
kind = u.Scheme
}
dbName := ""
if len(u.Path) > 1 {
dbName = u.Path[1:]
}
return DatabaseConfig{
URL: databaseURL,
Kind: kind,
Host: u.Hostname(),
Port: u.Port(),
User: u.User.Username(),
Password: func() string {
password, _ := u.User.Password()
return password
}(),
Name: dbName,
UseTLS: GetEnvAs("DATABASE_USETLS", false, ParseBool),
Timeout: GetEnvAs("DATABASE_TIMEOUT", 30*time.Second, ParseDuration),
}
}
// GetDatabaseURLForLogging возвращает URL базы данных для логирования, скрывая пароль.
func GetDatabaseURLForLogging(cfg *DatabaseConfig) (string, error) {
if cfg.URL == "" {
return "", fmt.Errorf("parameter DATABASE_URL is empty")
}
u, err := url.Parse(cfg.URL)
if err == nil {
if u.User != nil {
username := u.User.Username()
s := u.Scheme + "://" + username + ":***@" + u.Host
if u.Path != "" {
s += u.Path
}
if u.RawQuery != "" {
s += "?" + u.RawQuery
}
return s, err
}
}
return u.String(), err
}
func GetDatabaseDSN(cfg *DatabaseConfig) (string, error) {
if cfg.URL == "" {
return "", fmt.Errorf("parameter DATABASE_URL is empty")
}
u, err := url.Parse(cfg.URL)
if err == nil {
query := u.Query()
if !cfg.UseTLS {
query.Set("sslmode", "disable")
} else if query.Get("sslmode") == "" {
query.Set("sslmode", "require")
}
u.RawQuery = query.Encode()
return u.String(), err
}
dsn := fmt.Sprintf(
"host=%s port=%s user=%s password=%s dbname=%s",
cfg.Host, cfg.Port, cfg.User, cfg.Password, cfg.Name,
)
if !cfg.UseTLS {
dsn += " sslmode=disable"
} else {
dsn += " sslmode=require"
}
return dsn, nil
}

View File

@@ -9,9 +9,8 @@ type LoggingConfig struct {
}
type Config struct {
Timezone string
ServiceURL string
ListenAddr string // env: LISTEN_ADDR, по умолчанию ":8080"
BackendURL string // env: OLLAMA_BACKEND, по умолчанию "http://localhost:11434"
LoggingConfig
DatabaseConfig
}

View File

@@ -1,134 +0,0 @@
package gateway
import (
"backend/src/internal/config"
"backend/src/logic"
"backend/src/models"
"context"
"encoding/json"
"log"
"net/http"
"sync"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)
// глобальный хаб соединений WebSocket
var hub sync.Map
// Настройки для обновления соединения до WebSocket
var upgrader = websocket.Upgrader{
// разрешаем соединения с любых источников
// (для продакшена стоит ограничить)
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// WebSocketHubResponse отправляет ответ отдельному приложению
func WebSocketHubResponse(packet models.ClientPacket) {
cid := packet.ClientID
msg := packet.Payload
value, ok := hub.Load(cid)
if ok {
conn := value.(*websocket.Conn)
log.Printf("[WS] Sending message: %s", msg)
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
log.Printf("[WS] Send error: %v", err)
}
} else {
log.Printf("[WS] Client <%s> unknown, skipping sending...", cid)
}
}
// Worker обрабатывает канал подписки и отправляет результат в WebSocket
func Worker(ch models.ChannelOut) {
log.Printf("[WS] Response worker is ready")
for packet := range ch {
WebSocketHubResponse(packet)
}
}
// Обработчик для WebSocket соединения
func WebSocketHandler(ctx context.Context, cfg *config.Config,
w http.ResponseWriter, r *http.Request, bl *logic.Business) {
logger := cfg.LoggingConfig.Instance
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Error("[WS] Unable to restart connection", "error", err)
return
}
defer conn.Close()
// рассчитываем, что клиент присылает свой идентификатор, иначе создаём
cid := r.URL.Query().Get("cid")
if cid == "" {
cid = uuid.New().String() // FIXME: или сбрасывать соединение
}
// сохраняем соединение с клиентом в хабе
hub.Store(cid, conn)
logger.Info("[WS] Remote <%s> connected: %s", cid, conn.RemoteAddr())
// создаём канал для ответов в вебсокет
wsResponseChannel := make(chan models.ClientPacket, 10000)
// Запускаем горутину, которая будет получать сообщения из канала и отправлять их клиенту.
go Worker(wsResponseChannel)
// начинаем обработку сообщений от веб приложения
for {
messageType, message, err := conn.ReadMessage()
if err != nil {
logger.Info("[WS] Remote <%s> disconnected: %v:", cid, err)
hub.Delete(cid)
break
}
// получаем общие поля
var commonFields models.BaseFieldT
if err := json.Unmarshal(message, &commonFields); err != nil {
log.Println("[WS] Unable unmarshal common fields:", err)
continue
}
// определяем тип сообщения
switch commonFields.Action {
case "COMMAND1":
logger.Info("[WS] COMMAND1 requested!")
/*
var request models.CoordinatorConfigurationRequestT
if err := json.Unmarshal(message, &request); err != nil {
logger.Error("[WS] Unable unmarshal CoordinatorConfigurationRequestT")
continue
}
payload, err := bl.GetCoordinatorConfiguration(request.OrganizationID)
var packet models.CoordinatorConfigurationResponseT
if err != nil {
packet = models.CoordinatorConfigurationResponseT{
BaseFieldT: commonFields,
Status: 200,
Payload: *payload,
}
} else {
packet = models.CoordinatorConfigurationResponseT{
BaseFieldT: commonFields,
Status: 400,
Payload: models.CoordinatorConfigurationT{},
}
}
data, _ := json.Marshal(packet)
wsResponseChannel <- models.ClientPacket{ClientID: cid, Payload: data}
*/
default:
if messageType == websocket.TextMessage {
log.Printf("[WS] Got text message: %s", string(message))
} else {
log.Printf("[WS] Got unknown message: %d", messageType)
}
}
}
}

View File

@@ -1,65 +0,0 @@
package gateway
import (
"context"
"log"
"github.com/gorilla/websocket"
)
// ClientPacket используется для передачи пакетов через Go каналы
type ClientPacket struct {
ClientID string
Payload []byte
}
type ChannelIn chan<- ClientPacket
type ChannelOut <-chan ClientPacket
type WebSocketQueue struct {
ctx context.Context
ch chan ClientPacket
}
func NewWebSocketQueue(ctx context.Context) *WebSocketQueue {
q := WebSocketQueue{
ctx: ctx,
ch: make(chan ClientPacket, 10000),
}
// Запускаем горутину, которая будет получать сообщения из канала
// и отправлять их клиенту.
go q.runWorker()
return &q
}
// runWorker обрабатывает канал подписк и отправляет результат в WebSocket
func (q *WebSocketQueue) runWorker() {
// создаём канал для ответов в вебсокет
log.Printf("[WS] Response worker is ready")
for packet := range q.ch {
q.realSend(packet)
}
}
// Send на самом деле помещает пакет в очередь отправки, которая затем
// разгребается с помощью WorkerOfQueuedResponsesToWS.
func (q *WebSocketQueue) Send(packet ClientPacket) {
q.ch <- packet
}
// WebSocketHubResponse отправляет ответ отдельному приложению
func (q *WebSocketQueue) realSend(packet ClientPacket) {
cid := packet.ClientID
msg := packet.Payload
value, ok := hub.Load(cid)
if ok {
conn := value.(*websocket.Conn)
log.Printf("[WS] Sending message: %s", msg)
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
log.Printf("[WS] Send error: %v", err)
}
} else {
log.Printf("[WS] Client <%s> unknown, skipping sending...", cid)
}
}

View File

@@ -6,12 +6,11 @@ import (
"strings"
)
// New creates a slog logger with the provided level string (e.g., "debug", "info").
func New(level string) *slog.Logger {
return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: ParseLevel(level)}))
}
// ParseLevel converts a string level to slog.Level, defaults to info on unknown.
func ParseLevel(lvl string) slog.Level {
switch strings.ToLower(lvl) {
case "debug":