258 lines
5.2 KiB
Go
258 lines
5.2 KiB
Go
package broker
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
|
|
"git-molva.ru/Molva/molva-backend/services/api_gateway/internal/constants"
|
|
|
|
"git-molva.ru/Molva/molva-backend/services/api_gateway/internal/config"
|
|
requestmodel "git-molva.ru/Molva/molva-backend/services/api_gateway/internal/request_model"
|
|
"github.com/google/uuid"
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
)
|
|
|
|
var conn *amqp.Connection
|
|
|
|
var (
|
|
NotificationQueue string
|
|
)
|
|
|
|
type Broker interface {
|
|
Connect() error
|
|
Disconnect() error
|
|
}
|
|
|
|
type RabbitMQBroker struct {
|
|
cfg *config.Broker
|
|
}
|
|
|
|
func NewRabbitMQBroker(cfg *config.Broker) *RabbitMQBroker {
|
|
NotificationQueue = cfg.NotificationsQueueName
|
|
|
|
return &RabbitMQBroker{
|
|
cfg: cfg,
|
|
}
|
|
}
|
|
|
|
func (r *RabbitMQBroker) Connect() error {
|
|
var err error
|
|
|
|
rabbitConn := fmt.Sprintf(
|
|
"amqp://%s:%s@%s:%d",
|
|
r.cfg.UserName,
|
|
r.cfg.Password,
|
|
r.cfg.Host,
|
|
r.cfg.Port,
|
|
)
|
|
|
|
conn, err = amqp.Dial(rabbitConn)
|
|
|
|
return err
|
|
}
|
|
|
|
func (r *RabbitMQBroker) Disconnect() error {
|
|
return conn.Close()
|
|
}
|
|
|
|
func produceMessage(qName, replyToQName, corrId string, message []byte, headers amqp.Table) (err error) {
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func(ch *amqp.Channel) {
|
|
if errClose := ch.Close(); errClose != nil {
|
|
err = errors.Join(err, errClose)
|
|
}
|
|
}(ch)
|
|
|
|
q, err := ch.QueueDeclare(
|
|
qName, // name
|
|
true, // durable
|
|
false, // delete when unused
|
|
false, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultContextTimeout)
|
|
defer cancel()
|
|
|
|
err = ch.PublishWithContext(ctx,
|
|
"", // exchange
|
|
q.Name, // routing key
|
|
false, // mandatory
|
|
false, // immediate
|
|
amqp.Publishing{
|
|
DeliveryMode: amqp.Persistent,
|
|
Type: headers["command"].(string),
|
|
ReplyTo: replyToQName,
|
|
CorrelationId: corrId,
|
|
Body: message,
|
|
Headers: headers,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func consumeMessage(qName string, corrId string) (resp []byte, err error) {
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defer func(ch *amqp.Channel) {
|
|
if errClose := ch.Close(); errClose != nil {
|
|
err = errors.Join(err, errClose)
|
|
}
|
|
}(ch)
|
|
|
|
if _, err = ch.QueueDeclare(
|
|
qName, // name
|
|
true, // durable
|
|
true, // delete when unused
|
|
false, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
messages, err := ch.Consume(
|
|
qName, // queue
|
|
"", // consumer
|
|
false, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
response := <-messages
|
|
//todo: Поправить на проверку correlationId
|
|
// if d.CorrelationId == corrId {
|
|
// return nil, fmt.Errorf("invalid correlation ID %s", d.CorrelationId)
|
|
// }
|
|
|
|
if err = response.Ack(false); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return response.Body, nil
|
|
}
|
|
|
|
func ProcessRequest(qName, replyToQName, corrId string, headers amqp.Table, message []byte, log *slog.Logger) ([]byte, error) {
|
|
log.Info("produce request to: ",
|
|
slog.String("qName", qName),
|
|
slog.String("replyToQName", replyToQName),
|
|
slog.String("command", headers["command"].(string)),
|
|
slog.String("corrId", corrId),
|
|
)
|
|
|
|
if err := produceMessage(qName, replyToQName, corrId, message, headers); err != nil {
|
|
log.Error("got some error while producing message: ",
|
|
slog.String("error", err.Error()),
|
|
slog.String("corrId", corrId))
|
|
|
|
return nil, err
|
|
}
|
|
|
|
log.Info("consume response from: ",
|
|
slog.String("qName", qName),
|
|
slog.String("replyToQName", replyToQName),
|
|
slog.String("command", headers["command"].(string)),
|
|
slog.String("corrId", corrId),
|
|
)
|
|
|
|
body, err := consumeMessage(replyToQName, corrId)
|
|
if err != nil {
|
|
log.Error("got some error while consuming message: ",
|
|
slog.String("error", err.Error()),
|
|
slog.String("corrId", corrId))
|
|
|
|
return body, err
|
|
}
|
|
|
|
return body, nil
|
|
}
|
|
|
|
func BuildAndProcessRequest(qName string, headers requestmodel.Header, message []byte, log *slog.Logger) ([]byte, error) {
|
|
queueUuid := uuid.New().String()
|
|
|
|
resp, err := ProcessRequest(
|
|
qName,
|
|
queueUuid,
|
|
"0",
|
|
headers.ToTable(),
|
|
message,
|
|
log,
|
|
)
|
|
|
|
return resp, err
|
|
}
|
|
|
|
func SendNotification(qName, messageType string, message []byte, log *slog.Logger) error {
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func(ch *amqp.Channel) {
|
|
if errClose := ch.Close(); errClose != nil {
|
|
err = errors.Join(err, errClose)
|
|
}
|
|
}(ch)
|
|
|
|
q, err := ch.QueueDeclare(
|
|
qName, // name
|
|
true, // durable
|
|
false, // delete when unused
|
|
false, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
log.Error("error while sending notification message",
|
|
slog.String("error", err.Error()),
|
|
slog.String("qName", qName))
|
|
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultContextTimeout)
|
|
defer cancel()
|
|
|
|
err = ch.PublishWithContext(ctx,
|
|
"", // exchange
|
|
q.Name, // routing key
|
|
false, // mandatory
|
|
false, // immediate
|
|
amqp.Publishing{
|
|
Type: messageType,
|
|
Body: message,
|
|
},
|
|
)
|
|
if err != nil {
|
|
log.Error("error while sending notification message",
|
|
slog.String("error", err.Error()),
|
|
slog.String("qName", qName))
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|