Files
Alex Shevchuk d84487d238 1
2025-08-18 17:12:04 +03:00

258 lines
5.2 KiB
Go

package broker
import (
"context"
"errors"
"fmt"
"log/slog"
"git-molva.ru/Molva/molva-backend/services/api_gateway/internal/constants"
"git-molva.ru/Molva/molva-backend/services/api_gateway/internal/config"
requestmodel "git-molva.ru/Molva/molva-backend/services/api_gateway/internal/request_model"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
)
var conn *amqp.Connection
var (
NotificationQueue string
)
type Broker interface {
Connect() error
Disconnect() error
}
type RabbitMQBroker struct {
cfg *config.Broker
}
func NewRabbitMQBroker(cfg *config.Broker) *RabbitMQBroker {
NotificationQueue = cfg.NotificationsQueueName
return &RabbitMQBroker{
cfg: cfg,
}
}
func (r *RabbitMQBroker) Connect() error {
var err error
rabbitConn := fmt.Sprintf(
"amqp://%s:%s@%s:%d",
r.cfg.UserName,
r.cfg.Password,
r.cfg.Host,
r.cfg.Port,
)
conn, err = amqp.Dial(rabbitConn)
return err
}
func (r *RabbitMQBroker) Disconnect() error {
return conn.Close()
}
func produceMessage(qName, replyToQName, corrId string, message []byte, headers amqp.Table) (err error) {
ch, err := conn.Channel()
if err != nil {
return err
}
defer func(ch *amqp.Channel) {
if errClose := ch.Close(); errClose != nil {
err = errors.Join(err, errClose)
}
}(ch)
q, err := ch.QueueDeclare(
qName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultContextTimeout)
defer cancel()
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
DeliveryMode: amqp.Persistent,
Type: headers["command"].(string),
ReplyTo: replyToQName,
CorrelationId: corrId,
Body: message,
Headers: headers,
},
)
if err != nil {
return err
}
return nil
}
func consumeMessage(qName string, corrId string) (resp []byte, err error) {
ch, err := conn.Channel()
if err != nil {
return nil, err
}
defer func(ch *amqp.Channel) {
if errClose := ch.Close(); errClose != nil {
err = errors.Join(err, errClose)
}
}(ch)
if _, err = ch.QueueDeclare(
qName, // name
true, // durable
true, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
); err != nil {
return nil, err
}
messages, err := ch.Consume(
qName, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, err
}
response := <-messages
//todo: Поправить на проверку correlationId
// if d.CorrelationId == corrId {
// return nil, fmt.Errorf("invalid correlation ID %s", d.CorrelationId)
// }
if err = response.Ack(false); err != nil {
return nil, err
}
return response.Body, nil
}
func ProcessRequest(qName, replyToQName, corrId string, headers amqp.Table, message []byte, log *slog.Logger) ([]byte, error) {
log.Info("produce request to: ",
slog.String("qName", qName),
slog.String("replyToQName", replyToQName),
slog.String("command", headers["command"].(string)),
slog.String("corrId", corrId),
)
if err := produceMessage(qName, replyToQName, corrId, message, headers); err != nil {
log.Error("got some error while producing message: ",
slog.String("error", err.Error()),
slog.String("corrId", corrId))
return nil, err
}
log.Info("consume response from: ",
slog.String("qName", qName),
slog.String("replyToQName", replyToQName),
slog.String("command", headers["command"].(string)),
slog.String("corrId", corrId),
)
body, err := consumeMessage(replyToQName, corrId)
if err != nil {
log.Error("got some error while consuming message: ",
slog.String("error", err.Error()),
slog.String("corrId", corrId))
return body, err
}
return body, nil
}
func BuildAndProcessRequest(qName string, headers requestmodel.Header, message []byte, log *slog.Logger) ([]byte, error) {
queueUuid := uuid.New().String()
resp, err := ProcessRequest(
qName,
queueUuid,
"0",
headers.ToTable(),
message,
log,
)
return resp, err
}
func SendNotification(qName, messageType string, message []byte, log *slog.Logger) error {
ch, err := conn.Channel()
if err != nil {
return err
}
defer func(ch *amqp.Channel) {
if errClose := ch.Close(); errClose != nil {
err = errors.Join(err, errClose)
}
}(ch)
q, err := ch.QueueDeclare(
qName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Error("error while sending notification message",
slog.String("error", err.Error()),
slog.String("qName", qName))
return err
}
ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultContextTimeout)
defer cancel()
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Type: messageType,
Body: message,
},
)
if err != nil {
log.Error("error while sending notification message",
slog.String("error", err.Error()),
slog.String("qName", qName))
return err
}
return nil
}