package feed import ( "context" "database/sql" "encoding/json" "errors" "fmt" "log/slog" "time" "git-molva.ru/Molva/molva-backend/services/api_gateway/internal/constants" "git-molva.ru/Molva/molva-backend/services/api_gateway/internal/database" sq "github.com/Masterminds/squirrel" "github.com/google/uuid" "github.com/lib/pq" ) type Service struct { db *sql.DB schemaName string logger *slog.Logger dbClient database.Client //nolint:unused // TODO: переписать на этого клиента в рамках https://tracker.yandex.ru/MOLVARAZRABOTKA-363 } func NewService(dbUrl, schemaName string, logger *slog.Logger, dbClient database.Client) (*Service, error) { db, err := sql.Open("postgres", dbUrl) if err != nil { return nil, fmt.Errorf("error opening database connection: %w", err) } if err := db.Ping(); err != nil { return nil, fmt.Errorf("error pinging database: %w", err) } return &Service{ db: db, schemaName: schemaName, logger: logger, }, nil } func (s *Service) GetUserEvents(ctx context.Context, filter Filter, userType string) ([]Event, error) { companyIDs, err := s.GetCompanyIdsByUid(ctx, filter.OwnerID) if err != nil { return nil, err } psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar) baseQuery := psql.Select( "id", "owner_id", "owner_type", "message", "event_type", "visibility", "company_id", "payload", "is_cancelled", "cancellation_reason", "created_at", "updated_at", ).From(fmt.Sprintf("%s.%s", pq.QuoteIdentifier(s.schemaName), constants.FeedEventsTableName)) conditions := []sq.Sqlizer{ sq.Eq{"owner_id": filter.OwnerID}, sq.And{ sq.Eq{"visibility": VisibilityCompanyWide}, sq.Expr("company_id = ANY(?)", pq.Array(companyIDs)), }, sq.And{ sq.Expr("payload->>'additional_receiver' IS NOT NULL"), sq.Or{ sq.Expr("(payload->>'additional_receiver')::uuid = ?::uuid", filter.OwnerID), sq.Expr("(payload->>'additional_receiver')::text = ANY(?)", pq.Array(companyIDs)), }, }, } if userType == RoleAgent.String() { conditions = append(conditions, sq.Eq{"visibility": VisibilityPublic}) } query := baseQuery.Where(sq.Or(conditions)) if len(filter.EventTypes) > 0 { query = query.Where(sq.Expr("event_type = ANY(?)", pq.Array(filter.EventTypes))) } if !filter.ShowCancelled { query = query.Where(sq.Eq{"is_cancelled": false}) } query = query.OrderBy("created_at DESC"). Limit(filter.Limit). Offset(filter.Offset) sqlQuery, args, err := query.ToSql() if err != nil { return nil, fmt.Errorf("failed to build query: %w", err) } s.logger.Debug("Executing GetUserEvents query", "query", sqlQuery, "args", args) rows, err := s.db.QueryContext(ctx, sqlQuery, args...) if err != nil { return nil, fmt.Errorf("failed to query events: %w", err) } defer rows.Close() var events []Event for rows.Next() { event, err := s.scanEventRow(rows) if err != nil { return nil, err } events = append(events, event) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("error iterating event rows: %w", err) } s.logger.Debug("GetUserEvents completed successfully", slog.Int("count", len(events)), slog.String("owner_id", filter.OwnerID), slog.String("user_type", userType), slog.String("company_ids", fmt.Sprintf("%v", companyIDs)), slog.Any("filter", filter), ) return events, nil } func (s *Service) GetCompanyIdsByUid(ctx context.Context, ownerID string) ([]string, error) { psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar) query := psql.Select("company_id"). From(fmt.Sprintf("%s.%s", pq.QuoteIdentifier(s.schemaName), constants.ClientTableName)). Where(sq.Eq{"uid": ownerID}) sqlQuery, args, err := query.ToSql() if err != nil { return nil, fmt.Errorf("failed to build query: %w", err) } var companyID sql.NullString if err = s.db.QueryRowContext(ctx, sqlQuery, args...).Scan(&companyID); err != nil { if errors.Is(err, sql.ErrNoRows) { return []string{}, nil } return nil, fmt.Errorf("failed to get company Id: %w", err) } if companyID.Valid { return []string{companyID.String}, nil } return []string{}, nil } func (s *Service) scanEventRow(rows *sql.Rows) (Event, error) { var ( event Event payload []byte companyIDScan sql.NullString cancellationReasonScan sql.NullString ) if err := rows.Scan( &event.Id, &event.OwnerId, &event.OwnerType, &event.Message, &event.EventType, &event.Visibility, &companyIDScan, &payload, &event.IsCancelled, &cancellationReasonScan, &event.CreatedAt, &event.UpdatedAt, ); err != nil { if errors.Is(err, sql.ErrNoRows) { return Event{}, nil } return Event{}, fmt.Errorf("failed to scan event: %w", err) } event.processNullableFields(companyIDScan, cancellationReasonScan) event.processPayload(payload) return event, nil } func (e *Event) processPayload(payload []byte) { if len(payload) == 0 { e.Payload = EventPayload{} return } if err := json.Unmarshal(payload, &e.Payload); err != nil { e.Payload = EventPayload{} } } func (s *Service) AddUserEvent(ctx context.Context, event *Event) error { if event == nil { return ErrCreationInvalidData } if event.Id == "" { event.Id = "MSG_" + uuid.New().String() } now := time.Now() event.CreatedAt = now event.UpdatedAt = now payloadBytes, err := json.Marshal(event.Payload) if err != nil { return fmt.Errorf("failed to marshal payload: %w", err) } var payloadArg interface{} = payloadBytes if string(payloadBytes) == "null" || string(payloadBytes) == "{}" { payloadArg = nil } psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar) query := psql.Insert(fmt.Sprintf("%s.%s", pq.QuoteIdentifier(s.schemaName), constants.FeedEventsTableName)). Columns( "id", "owner_id", "owner_type", "message", "event_type", "visibility", "company_id", "payload", "is_cancelled", "cancellation_reason", "created_at", "updated_at", ). Values( event.Id, event.OwnerId, event.OwnerType, event.Message, event.EventType, event.Visibility, event.CompanyID, payloadArg, event.IsCancelled, event.CancellationReason, event.CreatedAt, event.UpdatedAt, ) sqlQuery, args, err := query.ToSql() if err != nil { return fmt.Errorf("failed to build query: %w", err) } _, err = s.db.ExecContext(ctx, sqlQuery, args...) if err != nil { return fmt.Errorf("failed to insert event: %w", err) } s.logger.Debug("Event added successfully", "event_id", event.Id) return nil } func (s *Service) CancelEvents(ctx context.Context, attachmentID, cancellationReason string) error { psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar) query := psql.Update(fmt.Sprintf("%s.%s", pq.QuoteIdentifier(s.schemaName), constants.FeedEventsTableName)). Set("is_cancelled", true). Set("cancellation_reason", cancellationReason). Set("updated_at", sq.Expr("NOW()")). Where(sq.Expr("payload ->> 'attachment_id' = ?", attachmentID)). Where(sq.Eq{"is_cancelled": false}) sqlQuery, args, err := query.ToSql() if err != nil { return fmt.Errorf("failed to build query: %w", err) } result, err := s.db.ExecContext(ctx, sqlQuery, args...) if err != nil { return fmt.Errorf("failed to execute cancel events query for attachmentID %s: %w", attachmentID, err) } rowsAffected, err := result.RowsAffected() if err != nil { return fmt.Errorf("failed to get affected rows for attachmentID %s: %w", attachmentID, err) } if rowsAffected == 0 { s.logger.Debug("No active events found or updated for cancellation", "attachment_id", attachmentID) } else { s.logger.Debug("Events cancelled successfully", "attachment_id", attachmentID, "count", rowsAffected) } return nil } func (s *Service) GetAgentIdBySubmissionId(ctx context.Context, submissionId string) (string, error) { psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar) query := psql.Select("uid"). From(fmt.Sprintf("%s.%s", pq.QuoteIdentifier(s.schemaName), constants.SubmissionTableName)). Where(sq.Eq{"id": submissionId}) sqlQuery, args, err := query.ToSql() if err != nil { return "", fmt.Errorf("failed to build query: %w", err) } var agentId string if err = s.db.QueryRowContext(ctx, sqlQuery, args...).Scan(&agentId); err != nil { if errors.Is(err, sql.ErrNoRows) { return "", ErrNotFound } return "", fmt.Errorf("failed to get agent Id: %w", err) } if agentId == "" { s.logger.Debug("Agent Id not found", "submission_id", submissionId) return "", ErrNotFound } return agentId, nil }