326 lines
8.4 KiB
Go
326 lines
8.4 KiB
Go
package feed
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"git-molva.ru/Molva/molva-backend/services/api_gateway/internal/constants"
|
|
"git-molva.ru/Molva/molva-backend/services/api_gateway/internal/database"
|
|
|
|
sq "github.com/Masterminds/squirrel"
|
|
"github.com/google/uuid"
|
|
"github.com/lib/pq"
|
|
)
|
|
|
|
type Service struct {
|
|
db *sql.DB
|
|
schemaName string
|
|
logger *slog.Logger
|
|
dbClient database.Client //nolint:unused // TODO: переписать на этого клиента в рамках https://tracker.yandex.ru/MOLVARAZRABOTKA-363
|
|
}
|
|
|
|
func NewService(dbUrl, schemaName string, logger *slog.Logger, dbClient database.Client) (*Service, error) {
|
|
db, err := sql.Open("postgres", dbUrl)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error opening database connection: %w", err)
|
|
}
|
|
|
|
if err := db.Ping(); err != nil {
|
|
return nil, fmt.Errorf("error pinging database: %w", err)
|
|
}
|
|
|
|
return &Service{
|
|
db: db,
|
|
schemaName: schemaName,
|
|
logger: logger,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) GetUserEvents(ctx context.Context, filter Filter, userType string) ([]Event, error) {
|
|
companyIDs, err := s.GetCompanyIdsByUid(ctx, filter.OwnerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
|
|
|
|
baseQuery := psql.Select(
|
|
"id", "owner_id", "owner_type", "message", "event_type", "visibility", "company_id",
|
|
"payload", "is_cancelled", "cancellation_reason", "created_at", "updated_at",
|
|
).From(fmt.Sprintf("%s.%s", pq.QuoteIdentifier(s.schemaName), constants.FeedEventsTableName))
|
|
|
|
conditions := []sq.Sqlizer{
|
|
sq.Eq{"owner_id": filter.OwnerID},
|
|
sq.And{
|
|
sq.Eq{"visibility": VisibilityCompanyWide},
|
|
sq.Expr("company_id = ANY(?)", pq.Array(companyIDs)),
|
|
},
|
|
sq.And{
|
|
sq.Expr("payload->>'additional_receiver' IS NOT NULL"),
|
|
sq.Or{
|
|
sq.Expr("(payload->>'additional_receiver')::uuid = ?::uuid", filter.OwnerID),
|
|
sq.Expr("(payload->>'additional_receiver')::text = ANY(?)", pq.Array(companyIDs)),
|
|
},
|
|
},
|
|
}
|
|
|
|
if userType == RoleAgent.String() {
|
|
conditions = append(conditions, sq.Eq{"visibility": VisibilityPublic})
|
|
}
|
|
|
|
query := baseQuery.Where(sq.Or(conditions))
|
|
|
|
if len(filter.EventTypes) > 0 {
|
|
query = query.Where(sq.Expr("event_type = ANY(?)", pq.Array(filter.EventTypes)))
|
|
}
|
|
|
|
if !filter.ShowCancelled {
|
|
query = query.Where(sq.Eq{"is_cancelled": false})
|
|
}
|
|
|
|
query = query.OrderBy("created_at DESC").
|
|
Limit(filter.Limit).
|
|
Offset(filter.Offset)
|
|
|
|
sqlQuery, args, err := query.ToSql()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
s.logger.Debug("Executing GetUserEvents query", "query", sqlQuery, "args", args)
|
|
|
|
rows, err := s.db.QueryContext(ctx, sqlQuery, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query events: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var events []Event
|
|
|
|
for rows.Next() {
|
|
event, err := s.scanEventRow(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
events = append(events, event)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("error iterating event rows: %w", err)
|
|
}
|
|
|
|
s.logger.Debug("GetUserEvents completed successfully",
|
|
slog.Int("count", len(events)),
|
|
slog.String("owner_id", filter.OwnerID),
|
|
slog.String("user_type", userType),
|
|
slog.String("company_ids", fmt.Sprintf("%v", companyIDs)),
|
|
slog.Any("filter", filter),
|
|
)
|
|
|
|
return events, nil
|
|
}
|
|
|
|
func (s *Service) GetCompanyIdsByUid(ctx context.Context, ownerID string) ([]string, error) {
|
|
psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
|
|
query := psql.Select("company_id").
|
|
From(fmt.Sprintf("%s.%s", pq.QuoteIdentifier(s.schemaName), constants.ClientTableName)).
|
|
Where(sq.Eq{"uid": ownerID})
|
|
|
|
sqlQuery, args, err := query.ToSql()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
var companyID sql.NullString
|
|
|
|
if err = s.db.QueryRowContext(ctx, sqlQuery, args...).Scan(&companyID); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return []string{}, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("failed to get company Id: %w", err)
|
|
}
|
|
|
|
if companyID.Valid {
|
|
return []string{companyID.String}, nil
|
|
}
|
|
|
|
return []string{}, nil
|
|
}
|
|
|
|
func (s *Service) scanEventRow(rows *sql.Rows) (Event, error) {
|
|
var (
|
|
event Event
|
|
payload []byte
|
|
companyIDScan sql.NullString
|
|
cancellationReasonScan sql.NullString
|
|
)
|
|
|
|
if err := rows.Scan(
|
|
&event.Id,
|
|
&event.OwnerId,
|
|
&event.OwnerType,
|
|
&event.Message,
|
|
&event.EventType,
|
|
&event.Visibility,
|
|
&companyIDScan,
|
|
&payload,
|
|
&event.IsCancelled,
|
|
&cancellationReasonScan,
|
|
&event.CreatedAt,
|
|
&event.UpdatedAt,
|
|
); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return Event{}, nil
|
|
}
|
|
|
|
return Event{}, fmt.Errorf("failed to scan event: %w", err)
|
|
}
|
|
|
|
event.processNullableFields(companyIDScan, cancellationReasonScan)
|
|
event.processPayload(payload)
|
|
|
|
return event, nil
|
|
}
|
|
|
|
func (e *Event) processPayload(payload []byte) {
|
|
if len(payload) == 0 {
|
|
e.Payload = EventPayload{}
|
|
|
|
return
|
|
}
|
|
|
|
if err := json.Unmarshal(payload, &e.Payload); err != nil {
|
|
e.Payload = EventPayload{}
|
|
}
|
|
}
|
|
|
|
func (s *Service) AddUserEvent(ctx context.Context, event *Event) error {
|
|
if event == nil {
|
|
return ErrCreationInvalidData
|
|
}
|
|
|
|
if event.Id == "" {
|
|
event.Id = "MSG_" + uuid.New().String()
|
|
}
|
|
|
|
now := time.Now()
|
|
|
|
event.CreatedAt = now
|
|
event.UpdatedAt = now
|
|
|
|
payloadBytes, err := json.Marshal(event.Payload)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal payload: %w", err)
|
|
}
|
|
|
|
var payloadArg interface{} = payloadBytes
|
|
if string(payloadBytes) == "null" || string(payloadBytes) == "{}" {
|
|
payloadArg = nil
|
|
}
|
|
|
|
psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
|
|
query := psql.Insert(fmt.Sprintf("%s.%s", pq.QuoteIdentifier(s.schemaName), constants.FeedEventsTableName)).
|
|
Columns(
|
|
"id", "owner_id", "owner_type", "message", "event_type", "visibility", "company_id",
|
|
"payload", "is_cancelled", "cancellation_reason", "created_at", "updated_at",
|
|
).
|
|
Values(
|
|
event.Id,
|
|
event.OwnerId,
|
|
event.OwnerType,
|
|
event.Message,
|
|
event.EventType,
|
|
event.Visibility,
|
|
event.CompanyID,
|
|
payloadArg,
|
|
event.IsCancelled,
|
|
event.CancellationReason,
|
|
event.CreatedAt,
|
|
event.UpdatedAt,
|
|
)
|
|
|
|
sqlQuery, args, err := query.ToSql()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
_, err = s.db.ExecContext(ctx, sqlQuery, args...)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to insert event: %w", err)
|
|
}
|
|
|
|
s.logger.Debug("Event added successfully", "event_id", event.Id)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) CancelEvents(ctx context.Context, attachmentID, cancellationReason string) error {
|
|
psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
|
|
query := psql.Update(fmt.Sprintf("%s.%s", pq.QuoteIdentifier(s.schemaName), constants.FeedEventsTableName)).
|
|
Set("is_cancelled", true).
|
|
Set("cancellation_reason", cancellationReason).
|
|
Set("updated_at", sq.Expr("NOW()")).
|
|
Where(sq.Expr("payload ->> 'attachment_id' = ?", attachmentID)).
|
|
Where(sq.Eq{"is_cancelled": false})
|
|
|
|
sqlQuery, args, err := query.ToSql()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
result, err := s.db.ExecContext(ctx, sqlQuery, args...)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to execute cancel events query for attachmentID %s: %w", attachmentID, err)
|
|
}
|
|
|
|
rowsAffected, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get affected rows for attachmentID %s: %w", attachmentID, err)
|
|
}
|
|
|
|
if rowsAffected == 0 {
|
|
s.logger.Debug("No active events found or updated for cancellation", "attachment_id", attachmentID)
|
|
} else {
|
|
s.logger.Debug("Events cancelled successfully", "attachment_id", attachmentID, "count", rowsAffected)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) GetAgentIdBySubmissionId(ctx context.Context, submissionId string) (string, error) {
|
|
psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
|
|
query := psql.Select("uid").
|
|
From(fmt.Sprintf("%s.%s", pq.QuoteIdentifier(s.schemaName), constants.SubmissionTableName)).
|
|
Where(sq.Eq{"id": submissionId})
|
|
|
|
sqlQuery, args, err := query.ToSql()
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
var agentId string
|
|
|
|
if err = s.db.QueryRowContext(ctx, sqlQuery, args...).Scan(&agentId); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return "", ErrNotFound
|
|
}
|
|
|
|
return "", fmt.Errorf("failed to get agent Id: %w", err)
|
|
}
|
|
|
|
if agentId == "" {
|
|
s.logger.Debug("Agent Id not found", "submission_id", submissionId)
|
|
|
|
return "", ErrNotFound
|
|
}
|
|
|
|
return agentId, nil
|
|
}
|