1
This commit is contained in:
257
internal/broker/rmq.go
Normal file
257
internal/broker/rmq.go
Normal file
@@ -0,0 +1,257 @@
|
||||
package broker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
"git-molva.ru/Molva/molva-backend/services/api_gateway/internal/constants"
|
||||
|
||||
"git-molva.ru/Molva/molva-backend/services/api_gateway/internal/config"
|
||||
requestmodel "git-molva.ru/Molva/molva-backend/services/api_gateway/internal/request_model"
|
||||
"github.com/google/uuid"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
var conn *amqp.Connection
|
||||
|
||||
var (
|
||||
NotificationQueue string
|
||||
)
|
||||
|
||||
type Broker interface {
|
||||
Connect() error
|
||||
Disconnect() error
|
||||
}
|
||||
|
||||
type RabbitMQBroker struct {
|
||||
cfg *config.Broker
|
||||
}
|
||||
|
||||
func NewRabbitMQBroker(cfg *config.Broker) *RabbitMQBroker {
|
||||
NotificationQueue = cfg.NotificationsQueueName
|
||||
|
||||
return &RabbitMQBroker{
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RabbitMQBroker) Connect() error {
|
||||
var err error
|
||||
|
||||
rabbitConn := fmt.Sprintf(
|
||||
"amqp://%s:%s@%s:%d",
|
||||
r.cfg.UserName,
|
||||
r.cfg.Password,
|
||||
r.cfg.Host,
|
||||
r.cfg.Port,
|
||||
)
|
||||
|
||||
conn, err = amqp.Dial(rabbitConn)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *RabbitMQBroker) Disconnect() error {
|
||||
return conn.Close()
|
||||
}
|
||||
|
||||
func produceMessage(qName, replyToQName, corrId string, message []byte, headers amqp.Table) (err error) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func(ch *amqp.Channel) {
|
||||
if errClose := ch.Close(); errClose != nil {
|
||||
err = errors.Join(err, errClose)
|
||||
}
|
||||
}(ch)
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
qName, // name
|
||||
true, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultContextTimeout)
|
||||
defer cancel()
|
||||
|
||||
err = ch.PublishWithContext(ctx,
|
||||
"", // exchange
|
||||
q.Name, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
DeliveryMode: amqp.Persistent,
|
||||
Type: headers["command"].(string),
|
||||
ReplyTo: replyToQName,
|
||||
CorrelationId: corrId,
|
||||
Body: message,
|
||||
Headers: headers,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func consumeMessage(qName string, corrId string) (resp []byte, err error) {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func(ch *amqp.Channel) {
|
||||
if errClose := ch.Close(); errClose != nil {
|
||||
err = errors.Join(err, errClose)
|
||||
}
|
||||
}(ch)
|
||||
|
||||
if _, err = ch.QueueDeclare(
|
||||
qName, // name
|
||||
true, // durable
|
||||
true, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
messages, err := ch.Consume(
|
||||
qName, // queue
|
||||
"", // consumer
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response := <-messages
|
||||
//todo: Поправить на проверку correlationId
|
||||
// if d.CorrelationId == corrId {
|
||||
// return nil, fmt.Errorf("invalid correlation ID %s", d.CorrelationId)
|
||||
// }
|
||||
|
||||
if err = response.Ack(false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return response.Body, nil
|
||||
}
|
||||
|
||||
func ProcessRequest(qName, replyToQName, corrId string, headers amqp.Table, message []byte, log *slog.Logger) ([]byte, error) {
|
||||
log.Info("produce request to: ",
|
||||
slog.String("qName", qName),
|
||||
slog.String("replyToQName", replyToQName),
|
||||
slog.String("command", headers["command"].(string)),
|
||||
slog.String("corrId", corrId),
|
||||
)
|
||||
|
||||
if err := produceMessage(qName, replyToQName, corrId, message, headers); err != nil {
|
||||
log.Error("got some error while producing message: ",
|
||||
slog.String("error", err.Error()),
|
||||
slog.String("corrId", corrId))
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Info("consume response from: ",
|
||||
slog.String("qName", qName),
|
||||
slog.String("replyToQName", replyToQName),
|
||||
slog.String("command", headers["command"].(string)),
|
||||
slog.String("corrId", corrId),
|
||||
)
|
||||
|
||||
body, err := consumeMessage(replyToQName, corrId)
|
||||
if err != nil {
|
||||
log.Error("got some error while consuming message: ",
|
||||
slog.String("error", err.Error()),
|
||||
slog.String("corrId", corrId))
|
||||
|
||||
return body, err
|
||||
}
|
||||
|
||||
return body, nil
|
||||
}
|
||||
|
||||
func BuildAndProcessRequest(qName string, headers requestmodel.Header, message []byte, log *slog.Logger) ([]byte, error) {
|
||||
queueUuid := uuid.New().String()
|
||||
|
||||
resp, err := ProcessRequest(
|
||||
qName,
|
||||
queueUuid,
|
||||
"0",
|
||||
headers.ToTable(),
|
||||
message,
|
||||
log,
|
||||
)
|
||||
|
||||
return resp, err
|
||||
}
|
||||
|
||||
func SendNotification(qName, messageType string, message []byte, log *slog.Logger) error {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func(ch *amqp.Channel) {
|
||||
if errClose := ch.Close(); errClose != nil {
|
||||
err = errors.Join(err, errClose)
|
||||
}
|
||||
}(ch)
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
qName, // name
|
||||
true, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("error while sending notification message",
|
||||
slog.String("error", err.Error()),
|
||||
slog.String("qName", qName))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), constants.DefaultContextTimeout)
|
||||
defer cancel()
|
||||
|
||||
err = ch.PublishWithContext(ctx,
|
||||
"", // exchange
|
||||
q.Name, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
Type: messageType,
|
||||
Body: message,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Error("error while sending notification message",
|
||||
slog.String("error", err.Error()),
|
||||
slog.String("qName", qName))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user