package broker import ( "context" "errors" "fmt" "log/slog" "git-molva.ru/Molva/molva-backend/services/api_gateway/internal/constants" "git-molva.ru/Molva/molva-backend/services/api_gateway/internal/config" requestmodel "git-molva.ru/Molva/molva-backend/services/api_gateway/internal/request_model" "github.com/google/uuid" amqp "github.com/rabbitmq/amqp091-go" ) var conn *amqp.Connection var ( NotificationQueue string ) type Broker interface { Connect() error Disconnect() error } type RabbitMQBroker struct { cfg *config.Broker } func NewRabbitMQBroker(cfg *config.Broker) *RabbitMQBroker { NotificationQueue = cfg.NotificationsQueueName return &RabbitMQBroker{ cfg: cfg, } } func (r *RabbitMQBroker) Connect() error { var err error rabbitConn := fmt.Sprintf( "amqp://%s:%s@%s:%d", r.cfg.UserName, r.cfg.Password, r.cfg.Host, r.cfg.Port, ) conn, err = amqp.Dial(rabbitConn) return err } func (r *RabbitMQBroker) Disconnect() error { return conn.Close() } func produceMessage(qName, replyToQName, corrId string, message []byte, headers amqp.Table) (err error) { ch, err := conn.Channel() if err != nil { return err } defer func(ch *amqp.Channel) { if errClose := ch.Close(); errClose != nil { err = errors.Join(err, errClose) } }(ch) q, err := ch.QueueDeclare( qName, // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err != nil { return err } ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultContextTimeout) defer cancel() err = ch.PublishWithContext(ctx, "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ DeliveryMode: amqp.Persistent, Type: headers["command"].(string), ReplyTo: replyToQName, CorrelationId: corrId, Body: message, Headers: headers, }, ) if err != nil { return err } return nil } func consumeMessage(qName string, corrId string) (resp []byte, err error) { ch, err := conn.Channel() if err != nil { return nil, err } defer func(ch *amqp.Channel) { if errClose := ch.Close(); errClose != nil { err = errors.Join(err, errClose) } }(ch) if _, err = ch.QueueDeclare( qName, // name true, // durable true, // delete when unused false, // exclusive false, // no-wait nil, // arguments ); err != nil { return nil, err } messages, err := ch.Consume( qName, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { return nil, err } response := <-messages //todo: Поправить на проверку correlationId // if d.CorrelationId == corrId { // return nil, fmt.Errorf("invalid correlation ID %s", d.CorrelationId) // } if err = response.Ack(false); err != nil { return nil, err } return response.Body, nil } func ProcessRequest(qName, replyToQName, corrId string, headers amqp.Table, message []byte, log *slog.Logger) ([]byte, error) { log.Info("produce request to: ", slog.String("qName", qName), slog.String("replyToQName", replyToQName), slog.String("command", headers["command"].(string)), slog.String("corrId", corrId), ) if err := produceMessage(qName, replyToQName, corrId, message, headers); err != nil { log.Error("got some error while producing message: ", slog.String("error", err.Error()), slog.String("corrId", corrId)) return nil, err } log.Info("consume response from: ", slog.String("qName", qName), slog.String("replyToQName", replyToQName), slog.String("command", headers["command"].(string)), slog.String("corrId", corrId), ) body, err := consumeMessage(replyToQName, corrId) if err != nil { log.Error("got some error while consuming message: ", slog.String("error", err.Error()), slog.String("corrId", corrId)) return body, err } return body, nil } func BuildAndProcessRequest(qName string, headers requestmodel.Header, message []byte, log *slog.Logger) ([]byte, error) { queueUuid := uuid.New().String() resp, err := ProcessRequest( qName, queueUuid, "0", headers.ToTable(), message, log, ) return resp, err } func SendNotification(qName, messageType string, message []byte, log *slog.Logger) error { ch, err := conn.Channel() if err != nil { return err } defer func(ch *amqp.Channel) { if errClose := ch.Close(); errClose != nil { err = errors.Join(err, errClose) } }(ch) q, err := ch.QueueDeclare( qName, // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err != nil { log.Error("error while sending notification message", slog.String("error", err.Error()), slog.String("qName", qName)) return err } ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultContextTimeout) defer cancel() err = ch.PublishWithContext(ctx, "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ Type: messageType, Body: message, }, ) if err != nil { log.Error("error while sending notification message", slog.String("error", err.Error()), slog.String("qName", qName)) return err } return nil }