From 4ff072e4e012c5b15c7d323949d603e39a00a622 Mon Sep 17 00:00:00 2001 From: Harshil Sharma Date: Mon, 25 May 2026 11:31:44 +0530 Subject: [PATCH] WIP --- server/build/docker-compose.common.yml | 19 ++ .../build/docker/postgres-readtracking.conf | 18 ++ server/channels/app/platform/service.go | 2 +- .../000001_create_user_post_reads.down.sql | 1 + .../000001_create_user_post_reads.up.sql | 5 + .../channels/store/retrylayer/retrylayer.go | 183 +++++++++++++----- server/channels/store/sqlstore/migrate.go | 67 +++++++ .../store/sqlstore/read_tracking_store.go | 117 +++++++++++ server/channels/store/sqlstore/store.go | 46 +++++ server/channels/store/store.go | 11 ++ .../storetest/mocks/ReadTrackingStore.go | 95 +++++++++ .../channels/store/storetest/mocks/Store.go | 80 +++++--- .../storetest/mocks/UserAccessTokenStore.go | 60 +++--- server/channels/store/storetest/store.go | 5 + .../channels/store/timerlayer/timerlayer.go | 111 +++++++++-- server/docker-compose.yaml | 10 +- server/public/model/config.go | 76 ++++++++ server/public/model/user_post_read.go | 10 + server/public/utils/sql/sql_utils.go | 29 ++- 19 files changed, 811 insertions(+), 134 deletions(-) create mode 100644 server/build/docker/postgres-readtracking.conf create mode 100644 server/channels/db/migrations/postgres_readtracking/000001_create_user_post_reads.down.sql create mode 100644 server/channels/db/migrations/postgres_readtracking/000001_create_user_post_reads.up.sql create mode 100644 server/channels/store/sqlstore/read_tracking_store.go create mode 100644 server/channels/store/storetest/mocks/ReadTrackingStore.go create mode 100644 server/public/model/user_post_read.go diff --git a/server/build/docker-compose.common.yml b/server/build/docker-compose.common.yml index b92631bf534..db0e66cc88f 100644 --- a/server/build/docker-compose.common.yml +++ b/server/build/docker-compose.common.yml @@ -24,6 +24,25 @@ services: interval: 5s timeout: 10s retries: 3 + postgres-readtracking: + image: "postgres:14" + logging: *default-logging + restart: always + networks: + - mm-test + environment: + POSTGRES_USER: mmuser + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-mostest} + POSTGRES_DB: mattermost_readtracking + POSTGRES_INITDB_ARGS: "--auth-host=scram-sha-256 --auth-local=scram-sha-256" + command: postgres -c 'config_file=/etc/postgresql/postgresql.conf' + volumes: + - "./docker/postgres-readtracking.conf:/etc/postgresql/postgresql.conf:Z" + healthcheck: + test: [ "CMD", "pg_isready", "-h", "localhost" ] + interval: 5s + timeout: 10s + retries: 3 minio: image: "minio/minio:RELEASE.2024-06-22T05-26-45Z" logging: *default-logging diff --git a/server/build/docker/postgres-readtracking.conf b/server/build/docker/postgres-readtracking.conf new file mode 100644 index 00000000000..f6807d8228a --- /dev/null +++ b/server/build/docker/postgres-readtracking.conf @@ -0,0 +1,18 @@ +# Postgres config for the read-tracking database. +# +# This database stores an append-only event log (user_post_reads) using +# UNLOGGED tables. Durability is intentionally relaxed in favor of write +# throughput; data is acceptable-to-lose on crash. +max_connections = 500 +listen_addresses = '*' +fsync = off +full_page_writes = off +synchronous_commit = off +default_text_search_config = 'pg_catalog.english' +commit_delay = 1000 +commit_siblings = 5 +wal_writer_delay = 1000ms +checkpoint_timeout = 30min +max_wal_size = 4GB +logging_collector = off +password_encryption = 'scram-sha-256' diff --git a/server/channels/app/platform/service.go b/server/channels/app/platform/service.go index 302ae623946..ef0f535deaf 100644 --- a/server/channels/app/platform/service.go +++ b/server/channels/app/platform/service.go @@ -281,7 +281,7 @@ func New(sc ServiceConfig, options ...Option) (*PlatformService, error) { // Cache layer opts := append(ps.storeOptions, sqlstore.WithFeatureFlags(func() *model.FeatureFlags { return ps.Config().FeatureFlags - })) + }), sqlstore.WithReadTrackingSettings(ps.Config().ReadTrackingSettings)) ps.sqlStore, err = sqlstore.New(ps.Config().SqlSettings, ps.Log(), ps.metricsIFace, opts...) if err != nil { return nil, err diff --git a/server/channels/db/migrations/postgres_readtracking/000001_create_user_post_reads.down.sql b/server/channels/db/migrations/postgres_readtracking/000001_create_user_post_reads.down.sql new file mode 100644 index 00000000000..8d59b13cafa --- /dev/null +++ b/server/channels/db/migrations/postgres_readtracking/000001_create_user_post_reads.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS user_post_reads; diff --git a/server/channels/db/migrations/postgres_readtracking/000001_create_user_post_reads.up.sql b/server/channels/db/migrations/postgres_readtracking/000001_create_user_post_reads.up.sql new file mode 100644 index 00000000000..4f1aed34e07 --- /dev/null +++ b/server/channels/db/migrations/postgres_readtracking/000001_create_user_post_reads.up.sql @@ -0,0 +1,5 @@ +CREATE UNLOGGED TABLE IF NOT EXISTS user_post_reads ( + user_id VARCHAR(26) NOT NULL, + post_id VARCHAR(26) NOT NULL, + created_at BIGINT NOT NULL +); diff --git a/server/channels/store/retrylayer/retrylayer.go b/server/channels/store/retrylayer/retrylayer.go index 23c5c02b429..d9ba5e680e6 100644 --- a/server/channels/store/retrylayer/retrylayer.go +++ b/server/channels/store/retrylayer/retrylayer.go @@ -58,6 +58,7 @@ type RetryLayer struct { PropertyValueStore store.PropertyValueStore ReactionStore store.ReactionStore ReadReceiptStore store.ReadReceiptStore + ReadTrackingStore store.ReadTrackingStore RecapStore store.RecapStore RemoteClusterStore store.RemoteClusterStore RetentionPolicyStore store.RetentionPolicyStore @@ -233,6 +234,10 @@ func (s *RetryLayer) ReadReceipt() store.ReadReceiptStore { return s.ReadReceiptStore } +func (s *RetryLayer) ReadTracking() store.ReadTrackingStore { + return s.ReadTrackingStore +} + func (s *RetryLayer) Recap() store.RecapStore { return s.RecapStore } @@ -507,6 +512,11 @@ type RetryLayerReadReceiptStore struct { Root *RetryLayer } +type RetryLayerReadTrackingStore struct { + store.ReadTrackingStore + Root *RetryLayer +} + type RetryLayerRecapStore struct { store.RecapStore Root *RetryLayer @@ -11261,6 +11271,69 @@ func (s *RetryLayerReadReceiptStore) Update(rctx request.CTX, receipt *model.Rea } +func (s *RetryLayerReadTrackingStore) HasRead(ctx context.Context, userID string, postID string) (bool, error) { + + tries := 0 + for { + result, err := s.ReadTrackingStore.HasRead(ctx, userID, postID) + if err == nil { + return result, nil + } + if !isRepeatableError(err) { + return result, err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return result, err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + +func (s *RetryLayerReadTrackingStore) Mark(ctx context.Context, userID string, postID string) error { + + tries := 0 + for { + err := s.ReadTrackingStore.Mark(ctx, userID, postID) + if err == nil { + return nil + } + if !isRepeatableError(err) { + return err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + +func (s *RetryLayerReadTrackingStore) MarkBulk(ctx context.Context, pairs []model.UserPostRead) error { + + tries := 0 + for { + err := s.ReadTrackingStore.MarkBulk(ctx, pairs) + if err == nil { + return nil + } + if !isRepeatableError(err) { + return err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + func (s *RetryLayerRecapStore) DeleteRecap(id string) error { tries := 0 @@ -17843,48 +17916,6 @@ func (s *RetryLayerUserAccessTokenStore) Delete(tokenID string) error { } -func (s *RetryLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, error) { - - tries := 0 - for { - result, err := s.UserAccessTokenStore.DeleteByIds(tokenIDs) - if err == nil { - return result, nil - } - if !isRepeatableError(err) { - return result, err - } - tries++ - if tries >= 3 { - err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") - return result, err - } - timepkg.Sleep(100 * timepkg.Millisecond) - } - -} - -func (s *RetryLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { - - tries := 0 - for { - result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) - if err == nil { - return result, nil - } - if !isRepeatableError(err) { - return result, err - } - tries++ - if tries >= 3 { - err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") - return result, err - } - timepkg.Sleep(100 * timepkg.Millisecond) - } - -} - func (s *RetryLayerUserAccessTokenStore) DeleteAllForUser(userID string) error { tries := 0 @@ -17906,6 +17937,27 @@ func (s *RetryLayerUserAccessTokenStore) DeleteAllForUser(userID string) error { } +func (s *RetryLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, error) { + + tries := 0 + for { + result, err := s.UserAccessTokenStore.DeleteByIds(tokenIDs) + if err == nil { + return result, nil + } + if !isRepeatableError(err) { + return result, err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return result, err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + func (s *RetryLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) { tries := 0 @@ -17990,6 +18042,27 @@ func (s *RetryLayerUserAccessTokenStore) GetByUser(userID string, page int, perP } +func (s *RetryLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { + + tries := 0 + for { + result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) + if err == nil { + return result, nil + } + if !isRepeatableError(err) { + return result, err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return result, err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + func (s *RetryLayerUserAccessTokenStore) Save(token *model.UserAccessToken) (*model.UserAccessToken, error) { tries := 0 @@ -18800,6 +18873,27 @@ func (s *RetryLayerWebhookStore) UpdateIncoming(webhook *model.IncomingWebhook) } +func (s *RetryLayerWebhookStore) UpdateIncomingLastUsed(webhookID string, lastUsed int64) error { + + tries := 0 + for { + err := s.WebhookStore.UpdateIncomingLastUsed(webhookID, lastUsed) + if err == nil { + return nil + } + if !isRepeatableError(err) { + return err + } + tries++ + if tries >= 3 { + err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures") + return err + } + timepkg.Sleep(100 * timepkg.Millisecond) + } + +} + func (s *RetryLayerWebhookStore) UpdateOutgoing(hook *model.OutgoingWebhook) (*model.OutgoingWebhook, error) { tries := 0 @@ -18849,10 +18943,6 @@ func (s *RetryLayer) TotalSearchDbConnections() int { return s.Store.TotalSearchDbConnections() } -func (s *RetryLayer) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) { - return s.Store.GetDiagnostics(ctx) -} - func (s *RetryLayer) UnlockFromMaster() { s.Store.UnlockFromMaster() } @@ -18900,6 +18990,7 @@ func New(childStore store.Store) *RetryLayer { newStore.PropertyValueStore = &RetryLayerPropertyValueStore{PropertyValueStore: childStore.PropertyValue(), Root: &newStore} newStore.ReactionStore = &RetryLayerReactionStore{ReactionStore: childStore.Reaction(), Root: &newStore} newStore.ReadReceiptStore = &RetryLayerReadReceiptStore{ReadReceiptStore: childStore.ReadReceipt(), Root: &newStore} + newStore.ReadTrackingStore = &RetryLayerReadTrackingStore{ReadTrackingStore: childStore.ReadTracking(), Root: &newStore} newStore.RecapStore = &RetryLayerRecapStore{RecapStore: childStore.Recap(), Root: &newStore} newStore.RemoteClusterStore = &RetryLayerRemoteClusterStore{RemoteClusterStore: childStore.RemoteCluster(), Root: &newStore} newStore.RetentionPolicyStore = &RetryLayerRetentionPolicyStore{RetentionPolicyStore: childStore.RetentionPolicy(), Root: &newStore} diff --git a/server/channels/store/sqlstore/migrate.go b/server/channels/store/sqlstore/migrate.go index fac2aab558e..8f4aa3154d5 100644 --- a/server/channels/store/sqlstore/migrate.go +++ b/server/channels/store/sqlstore/migrate.go @@ -153,6 +153,73 @@ func (ss *SqlStore) migrate(direction migrationDirection, dryRun, enableMorphLog } } +// readTrackingMigrationsDir is the embed-relative directory holding the +// migrations applied to the independent read-tracking database. +const readTrackingMigrationsDir = "postgres_readtracking" + +func (ss *SqlStore) initReadTrackingMorph(enableLogging bool) (*morph.Morph, error) { + assets := db.Assets() + + assetsList, err := assets.ReadDir(path.Join("migrations", readTrackingMigrationsDir)) + if err != nil { + return nil, err + } + + assetNamesForDriver := make([]string, len(assetsList)) + for i, entry := range assetsList { + assetNamesForDriver[i] = entry.Name() + } + + src, err := mbindata.WithInstance(&mbindata.AssetSource{ + Names: assetNamesForDriver, + AssetFunc: func(name string) ([]byte, error) { + return assets.ReadFile(path.Join("migrations", readTrackingMigrationsDir, name)) + }, + }) + if err != nil { + return nil, err + } + + driver, err := ps.WithInstance(ss.readTrackingX.DB().DB) + if err != nil { + return nil, err + } + + var logWriter io.Writer + if enableLogging { + logWriter = &morphWriter{} + } else { + logWriter = io.Discard + } + + // The read-tracking schema is independent — use its own advisory lock key + // so it doesn't contend with the main DB's migration lock. + opts := []morph.EngineOption{ + morph.WithLogger(log.New(logWriter, "", log.Lshortfile)), + morph.WithLock("mm-readtracking-lock-key"), + morph.SetStatementTimeoutInSeconds(*ss.settings.MigrationsStatementTimeoutSeconds), + morph.SetDryRun(false), + } + + return morph.New(context.Background(), driver, src, opts...) +} + +func (ss *SqlStore) migrateReadTracking(direction migrationDirection, enableMorphLogging bool) error { + engine, err := ss.initReadTrackingMorph(enableMorphLogging) + if err != nil { + return err + } + defer engine.Close() + + switch direction { + case migrationsDirectionDown: + _, err = engine.ApplyDown(-1) + return err + default: + return engine.ApplyAll() + } +} + func (m *Migrator) GeneratePlan(shouldRecover bool) (*models.Plan, error) { diff, err := m.engine.Diff(models.Up) if err != nil { diff --git a/server/channels/store/sqlstore/read_tracking_store.go b/server/channels/store/sqlstore/read_tracking_store.go new file mode 100644 index 00000000000..26fa7d04291 --- /dev/null +++ b/server/channels/store/sqlstore/read_tracking_store.go @@ -0,0 +1,117 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package sqlstore + +import ( + "context" + + "github.com/lib/pq" + "github.com/pkg/errors" + + "github.com/mattermost/mattermost/server/public/model" + "github.com/mattermost/mattermost/server/v8/channels/store" +) + +const readTrackingTableName = "user_post_reads" + +// SqlReadTrackingStore writes user-post read events to an independent +// Postgres pool. The backing table is UNLOGGED — no WAL, no replication, +// truncated on crash — and has no unique index, so duplicates are allowed. +// Callers dedupe on read. +type SqlReadTrackingStore struct { + *SqlStore +} + +// noopReadTrackingStore is returned when ReadTrackingSettings.Enable=false, +// so callers don't have to nil-check the store before calling Mark/MarkBulk. +type noopReadTrackingStore struct{} + +func (noopReadTrackingStore) Mark(context.Context, string, string) error { + return nil +} +func (noopReadTrackingStore) MarkBulk(context.Context, []model.UserPostRead) error { + return nil +} +func (noopReadTrackingStore) HasRead(context.Context, string, string) (bool, error) { + return false, nil +} + +func newSqlReadTrackingStore(s *SqlStore) store.ReadTrackingStore { + if s.readTrackingX == nil { + return noopReadTrackingStore{} + } + return &SqlReadTrackingStore{SqlStore: s} +} + +// Mark appends a single user-post read event. +func (s *SqlReadTrackingStore) Mark(ctx context.Context, userID, postID string) error { + _, err := s.readTrackingX.ExecContext(ctx, + `INSERT INTO `+readTrackingTableName+` (user_id, post_id, created_at) VALUES ($1, $2, $3)`, + userID, postID, model.GetMillis()) + if err != nil { + return errors.Wrap(err, "failed to mark user-post read") + } + return nil +} + +// MarkBulk appends a batch of read events using Postgres' binary COPY +// protocol (lib/pq's pq.CopyIn). Orders of magnitude faster than batched +// multi-VALUES INSERT for large batches. +func (s *SqlReadTrackingStore) MarkBulk(ctx context.Context, pairs []model.UserPostRead) error { + if len(pairs) == 0 { + return nil + } + + tx, err := s.readTrackingX.DB().BeginTx(ctx, nil) + if err != nil { + return errors.Wrap(err, "failed to begin tx for bulk mark") + } + + stmt, err := tx.PrepareContext(ctx, pq.CopyIn(readTrackingTableName, "user_id", "post_id", "created_at")) + if err != nil { + _ = tx.Rollback() + return errors.Wrap(err, "failed to prepare COPY") + } + + now := model.GetMillis() + for _, p := range pairs { + ts := p.CreatedAt + if ts == 0 { + ts = now + } + if _, err = stmt.ExecContext(ctx, p.UserID, p.PostID, ts); err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return errors.Wrap(err, "failed to append row to COPY") + } + } + + // Flush — required by pq.CopyIn semantics before Close. + if _, err = stmt.ExecContext(ctx); err != nil { + _ = stmt.Close() + _ = tx.Rollback() + return errors.Wrap(err, "failed to flush COPY") + } + if err = stmt.Close(); err != nil { + _ = tx.Rollback() + return errors.Wrap(err, "failed to close COPY statement") + } + if err = tx.Commit(); err != nil { + return errors.Wrap(err, "failed to commit bulk mark") + } + return nil +} + +// HasRead returns true if at least one read event exists for the pair. +// Duplicates are ignored — EXISTS short-circuits on the first match. +func (s *SqlReadTrackingStore) HasRead(ctx context.Context, userID, postID string) (bool, error) { + var exists bool + err := s.readTrackingX.DB().GetContext(ctx, &exists, + `SELECT EXISTS(SELECT 1 FROM `+readTrackingTableName+` WHERE user_id=$1 AND post_id=$2 LIMIT 1)`, + userID, postID) + if err != nil { + return false, errors.Wrap(err, "failed to query read state") + } + return exists, nil +} diff --git a/server/channels/store/sqlstore/store.go b/server/channels/store/sqlstore/store.go index f252c26ff93..6ca70f2308e 100644 --- a/server/channels/store/sqlstore/store.go +++ b/server/channels/store/sqlstore/store.go @@ -119,6 +119,7 @@ type SqlStoreStores struct { readReceipt store.ReadReceiptStore temporaryPost store.TemporaryPostStore channelJoinRequest store.ChannelJoinRequestStore + readTracking store.ReadTrackingStore } type SqlStore struct { @@ -134,6 +135,12 @@ type SqlStore struct { searchReplicaXs []*atomic.Pointer[sqlxDBWrapper] replicaLagHandles []*sql.DB + + // readTrackingX is an independent pool for the read-tracking DB. Nil when + // ReadTrackingSettings.Enable is false. + readTrackingX *sqlxDBWrapper + rtSettings *model.ReadTrackingSettings + stores SqlStoreStores settings *model.SqlSettings lockedToMaster bool @@ -186,6 +193,15 @@ func WithFeatureFlags(fn func() *model.FeatureFlags) Option { } } +// WithReadTrackingSettings configures the independent read-tracking pool. +// If omitted (or Enable=false), Store().ReadTracking() returns a no-op store. +func WithReadTrackingSettings(rt model.ReadTrackingSettings) Option { + return func(s *SqlStore) error { + s.rtSettings = &rt + return nil + } +} + // getFeatureFlags returns the current feature flags, or defaults if no function was configured. func (ss *SqlStore) getFeatureFlags() *model.FeatureFlags { if ss.featureFlagsFn != nil { @@ -236,6 +252,13 @@ func New(settings model.SqlSettings, logger mlog.LoggerIFace, metrics einterface if err != nil { return nil, errors.Wrap(err, "failed to apply database migrations") } + + if store.readTrackingX != nil { + err = store.migrateReadTracking(migrationsDirectionUp, !store.disableMorphLogging) + if err != nil { + return nil, errors.Wrap(err, "failed to apply read-tracking database migrations") + } + } } store.isBinaryParam, err = store.computeBinaryParam() @@ -307,6 +330,7 @@ func New(settings model.SqlSettings, logger mlog.LoggerIFace, metrics einterface store.stores.readReceipt = newSqlReadReceiptStore(store, metrics) store.stores.temporaryPost = newSqlTemporaryPostStore(store, metrics) store.stores.channelJoinRequest = newSqlChannelJoinRequestStore(store) + store.stores.readTracking = newSqlReadTrackingStore(store) store.stores.preference.(*SqlPreferenceStore).deleteUnusedFeatures() @@ -375,6 +399,20 @@ func (ss *SqlStore) initConnection() error { ss.replicaLagHandles = append(ss.replicaLagHandles, replicaLagHandle) } } + + if ss.rtSettings != nil && ss.rtSettings.Enable != nil && *ss.rtSettings.Enable { + rtHandle, err := sqlUtils.SetupReadTrackingConnection(ss.Logger(), "read-tracking", ss.rtSettings, DBPingAttempts) + if err != nil { + return errors.Wrap(err, "failed to setup read-tracking connection") + } + ss.readTrackingX = newSqlxDBWrapper(sqlx.NewDb(rtHandle, model.DatabaseDriverPostgres), + time.Duration(*ss.rtSettings.QueryTimeout)*time.Second, + *ss.settings.Trace) + if ss.metrics != nil { + ss.metrics.RegisterDBCollector(ss.readTrackingX.DB().DB, "read-tracking") + } + } + return nil } @@ -725,6 +763,10 @@ func (ss *SqlStore) Close() { for _, replica := range ss.replicaLagHandles { replica.Close() } + + if ss.readTrackingX != nil { + ss.readTrackingX.Close() + } } func (ss *SqlStore) LockToMaster() { @@ -963,6 +1005,10 @@ func (ss *SqlStore) ChannelJoinRequest() store.ChannelJoinRequestStore { return ss.stores.channelJoinRequest } +func (ss *SqlStore) ReadTracking() store.ReadTrackingStore { + return ss.stores.readTracking +} + func (ss *SqlStore) DropAllTables() { ss.masterX.Exec(`DO $func$ diff --git a/server/channels/store/store.go b/server/channels/store/store.go index 0fda03addb7..4d201b7a891 100644 --- a/server/channels/store/store.go +++ b/server/channels/store/store.go @@ -105,6 +105,7 @@ type Store interface { ReadReceipt() ReadReceiptStore TemporaryPost() TemporaryPostStore ChannelJoinRequest() ChannelJoinRequestStore + ReadTracking() ReadTrackingStore } type RetentionPolicyStore interface { @@ -1274,6 +1275,16 @@ type ReadReceiptStore interface { GetUnreadCountForPost(rctx request.CTX, post *model.Post) (int64, error) } +// ReadTrackingStore appends to an UNLOGGED user_post_reads table on a +// separate Postgres pool. Writes are intentionally fire-then-fail-fast: no +// retry layer, no cache layer. Duplicates are allowed at write time and +// deduped on read. +type ReadTrackingStore interface { + Mark(ctx context.Context, userID, postID string) error + MarkBulk(ctx context.Context, pairs []model.UserPostRead) error + HasRead(ctx context.Context, userID, postID string) (bool, error) +} + type TemporaryPostStore interface { InvalidateTemporaryPost(id string) Save(rctx request.CTX, post *model.TemporaryPost) (*model.TemporaryPost, error) diff --git a/server/channels/store/storetest/mocks/ReadTrackingStore.go b/server/channels/store/storetest/mocks/ReadTrackingStore.go new file mode 100644 index 00000000000..1bbe66da371 --- /dev/null +++ b/server/channels/store/storetest/mocks/ReadTrackingStore.go @@ -0,0 +1,95 @@ +// Code generated by mockery v2.53.4. DO NOT EDIT. + +// Regenerate this file using `make store-mocks`. + +package mocks + +import ( + context "context" + + model "github.com/mattermost/mattermost/server/public/model" + mock "github.com/stretchr/testify/mock" +) + +// ReadTrackingStore is an autogenerated mock type for the ReadTrackingStore type +type ReadTrackingStore struct { + mock.Mock +} + +// HasRead provides a mock function with given fields: ctx, userID, postID +func (_m *ReadTrackingStore) HasRead(ctx context.Context, userID string, postID string) (bool, error) { + ret := _m.Called(ctx, userID, postID) + + if len(ret) == 0 { + panic("no return value specified for HasRead") + } + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (bool, error)); ok { + return rf(ctx, userID, postID) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string) bool); ok { + r0 = rf(ctx, userID, postID) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, userID, postID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Mark provides a mock function with given fields: ctx, userID, postID +func (_m *ReadTrackingStore) Mark(ctx context.Context, userID string, postID string) error { + ret := _m.Called(ctx, userID, postID) + + if len(ret) == 0 { + panic("no return value specified for Mark") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { + r0 = rf(ctx, userID, postID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MarkBulk provides a mock function with given fields: ctx, pairs +func (_m *ReadTrackingStore) MarkBulk(ctx context.Context, pairs []model.UserPostRead) error { + ret := _m.Called(ctx, pairs) + + if len(ret) == 0 { + panic("no return value specified for MarkBulk") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []model.UserPostRead) error); ok { + r0 = rf(ctx, pairs) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewReadTrackingStore creates a new instance of ReadTrackingStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReadTrackingStore(t interface { + mock.TestingT + Cleanup(func()) +}) *ReadTrackingStore { + mock := &ReadTrackingStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/server/channels/store/storetest/mocks/Store.go b/server/channels/store/storetest/mocks/Store.go index 16ed34d912f..99427e891cd 100644 --- a/server/channels/store/storetest/mocks/Store.go +++ b/server/channels/store/storetest/mocks/Store.go @@ -516,6 +516,36 @@ func (_m *Store) GetDbVersion(numerical bool) (string, error) { return r0, r1 } +// GetDiagnostics provides a mock function with given fields: ctx +func (_m *Store) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetDiagnostics") + } + + var r0 *store.DatabaseDiagnostics + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*store.DatabaseDiagnostics, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *store.DatabaseDiagnostics); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*store.DatabaseDiagnostics) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetInternalMasterDB provides a mock function with no fields func (_m *Store) GetInternalMasterDB() *sql.DB { ret := _m.Called() @@ -614,36 +644,6 @@ func (_m *Store) GetSchemaDefinition() (*model.SupportPacketDatabaseSchema, erro return r0, r1 } -// GetDiagnostics provides a mock function with given fields: ctx -func (_m *Store) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for GetDiagnostics") - } - - var r0 *store.DatabaseDiagnostics - var r1 error - if rf, ok := ret.Get(0).(func(context.Context) (*store.DatabaseDiagnostics, error)); ok { - return rf(ctx) - } - if rf, ok := ret.Get(0).(func(context.Context) *store.DatabaseDiagnostics); ok { - r0 = rf(ctx) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*store.DatabaseDiagnostics) - } - } - - if rf, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = rf(ctx) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // Group provides a mock function with no fields func (_m *Store) Group() store.GroupStore { ret := _m.Called() @@ -1054,6 +1054,26 @@ func (_m *Store) ReadReceipt() store.ReadReceiptStore { return r0 } +// ReadTracking provides a mock function with no fields +func (_m *Store) ReadTracking() store.ReadTrackingStore { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ReadTracking") + } + + var r0 store.ReadTrackingStore + if rf, ok := ret.Get(0).(func() store.ReadTrackingStore); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(store.ReadTrackingStore) + } + } + + return r0 +} + // Recap provides a mock function with no fields func (_m *Store) Recap() store.RecapStore { ret := _m.Called() diff --git a/server/channels/store/storetest/mocks/UserAccessTokenStore.go b/server/channels/store/storetest/mocks/UserAccessTokenStore.go index 62fc003a65f..8ac6117e3db 100644 --- a/server/channels/store/storetest/mocks/UserAccessTokenStore.go +++ b/server/channels/store/storetest/mocks/UserAccessTokenStore.go @@ -78,36 +78,6 @@ func (_m *UserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, error) { return r0, r1 } -// GetExpiredBefore provides a mock function with given fields: cutoff, limit -func (_m *UserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { - ret := _m.Called(cutoff, limit) - - if len(ret) == 0 { - panic("no return value specified for GetExpiredBefore") - } - - var r0 []*model.UserAccessToken - var r1 error - if rf, ok := ret.Get(0).(func(int64, int) ([]*model.UserAccessToken, error)); ok { - return rf(cutoff, limit) - } - if rf, ok := ret.Get(0).(func(int64, int) []*model.UserAccessToken); ok { - r0 = rf(cutoff, limit) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]*model.UserAccessToken) - } - } - - if rf, ok := ret.Get(1).(func(int64, int) error); ok { - r1 = rf(cutoff, limit) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // Get provides a mock function with given fields: tokenID func (_m *UserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) { ret := _m.Called(tokenID) @@ -228,6 +198,36 @@ func (_m *UserAccessTokenStore) GetByUser(userID string, page int, perPage int) return r0, r1 } +// GetExpiredBefore provides a mock function with given fields: cutoff, limit +func (_m *UserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { + ret := _m.Called(cutoff, limit) + + if len(ret) == 0 { + panic("no return value specified for GetExpiredBefore") + } + + var r0 []*model.UserAccessToken + var r1 error + if rf, ok := ret.Get(0).(func(int64, int) ([]*model.UserAccessToken, error)); ok { + return rf(cutoff, limit) + } + if rf, ok := ret.Get(0).(func(int64, int) []*model.UserAccessToken); ok { + r0 = rf(cutoff, limit) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*model.UserAccessToken) + } + } + + if rf, ok := ret.Get(1).(func(int64, int) error); ok { + r1 = rf(cutoff, limit) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Save provides a mock function with given fields: token func (_m *UserAccessTokenStore) Save(token *model.UserAccessToken) (*model.UserAccessToken, error) { ret := _m.Called(token) diff --git a/server/channels/store/storetest/store.go b/server/channels/store/storetest/store.go index 999754b46b8..a88c4da0c00 100644 --- a/server/channels/store/storetest/store.go +++ b/server/channels/store/storetest/store.go @@ -78,6 +78,7 @@ type Store struct { TemporaryPostStore mocks.TemporaryPostStore ViewStore mocks.ViewStore ChannelJoinRequestStore mocks.ChannelJoinRequestStore + ReadTrackingStore mocks.ReadTrackingStore } func (s *Store) Logger() mlog.LoggerIFace { return s.logger } @@ -187,6 +188,9 @@ func (s *Store) TemporaryPost() store.TemporaryPostStore { func (s *Store) ChannelJoinRequest() store.ChannelJoinRequestStore { return &s.ChannelJoinRequestStore } +func (s *Store) ReadTracking() store.ReadTrackingStore { + return &s.ReadTrackingStore +} func (s *Store) View() store.ViewStore { return &s.ViewStore } @@ -252,5 +256,6 @@ func (s *Store) AssertExpectations(t mock.TestingT) bool { &s.TemporaryPostStore, &s.ViewStore, &s.ChannelJoinRequestStore, + &s.ReadTrackingStore, ) } diff --git a/server/channels/store/timerlayer/timerlayer.go b/server/channels/store/timerlayer/timerlayer.go index 202bc98a7ce..4c67cd370db 100644 --- a/server/channels/store/timerlayer/timerlayer.go +++ b/server/channels/store/timerlayer/timerlayer.go @@ -57,6 +57,7 @@ type TimerLayer struct { PropertyValueStore store.PropertyValueStore ReactionStore store.ReactionStore ReadReceiptStore store.ReadReceiptStore + ReadTrackingStore store.ReadTrackingStore RecapStore store.RecapStore RemoteClusterStore store.RemoteClusterStore RetentionPolicyStore store.RetentionPolicyStore @@ -232,6 +233,10 @@ func (s *TimerLayer) ReadReceipt() store.ReadReceiptStore { return s.ReadReceiptStore } +func (s *TimerLayer) ReadTracking() store.ReadTrackingStore { + return s.ReadTrackingStore +} + func (s *TimerLayer) Recap() store.RecapStore { return s.RecapStore } @@ -506,6 +511,11 @@ type TimerLayerReadReceiptStore struct { Root *TimerLayer } +type TimerLayerReadTrackingStore struct { + store.ReadTrackingStore + Root *TimerLayer +} + type TimerLayerRecapStore struct { store.RecapStore Root *TimerLayer @@ -8959,6 +8969,54 @@ func (s *TimerLayerReadReceiptStore) Update(rctx request.CTX, receipt *model.Rea return result, err } +func (s *TimerLayerReadTrackingStore) HasRead(ctx context.Context, userID string, postID string) (bool, error) { + start := time.Now() + + result, err := s.ReadTrackingStore.HasRead(ctx, userID, postID) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("ReadTrackingStore.HasRead", success, elapsed) + } + return result, err +} + +func (s *TimerLayerReadTrackingStore) Mark(ctx context.Context, userID string, postID string) error { + start := time.Now() + + err := s.ReadTrackingStore.Mark(ctx, userID, postID) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("ReadTrackingStore.Mark", success, elapsed) + } + return err +} + +func (s *TimerLayerReadTrackingStore) MarkBulk(ctx context.Context, pairs []model.UserPostRead) error { + start := time.Now() + + err := s.ReadTrackingStore.MarkBulk(ctx, pairs) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("ReadTrackingStore.MarkBulk", success, elapsed) + } + return err +} + func (s *TimerLayerRecapStore) DeleteRecap(id string) error { start := time.Now() @@ -14135,22 +14193,6 @@ func (s *TimerLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, return result, err } -func (s *TimerLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { - start := time.Now() - - result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) - - elapsed := float64(time.Since(start)) / float64(time.Second) - if s.Root.Metrics != nil { - success := "false" - if err == nil { - success = "true" - } - s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetExpiredBefore", success, elapsed) - } - return result, err -} - func (s *TimerLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) { start := time.Now() @@ -14215,6 +14257,22 @@ func (s *TimerLayerUserAccessTokenStore) GetByUser(userID string, page int, perP return result, err } +func (s *TimerLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) { + start := time.Now() + + result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetExpiredBefore", success, elapsed) + } + return result, err +} + func (s *TimerLayerUserAccessTokenStore) Save(token *model.UserAccessToken) (*model.UserAccessToken, error) { start := time.Now() @@ -14853,6 +14911,22 @@ func (s *TimerLayerWebhookStore) UpdateIncoming(webhook *model.IncomingWebhook) return result, err } +func (s *TimerLayerWebhookStore) UpdateIncomingLastUsed(webhookID string, lastUsed int64) error { + start := time.Now() + + err := s.WebhookStore.UpdateIncomingLastUsed(webhookID, lastUsed) + + elapsed := float64(time.Since(start)) / float64(time.Second) + if s.Root.Metrics != nil { + success := "false" + if err == nil { + success = "true" + } + s.Root.Metrics.ObserveStoreMethodDuration("WebhookStore.UpdateIncomingLastUsed", success, elapsed) + } + return err +} + func (s *TimerLayerWebhookStore) UpdateOutgoing(hook *model.OutgoingWebhook) (*model.OutgoingWebhook, error) { start := time.Now() @@ -14897,10 +14971,6 @@ func (s *TimerLayer) TotalSearchDbConnections() int { return s.Store.TotalSearchDbConnections() } -func (s *TimerLayer) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) { - return s.Store.GetDiagnostics(ctx) -} - func (s *TimerLayer) UnlockFromMaster() { s.Store.UnlockFromMaster() } @@ -14949,6 +15019,7 @@ func New(childStore store.Store, metrics einterfaces.MetricsInterface) *TimerLay newStore.PropertyValueStore = &TimerLayerPropertyValueStore{PropertyValueStore: childStore.PropertyValue(), Root: &newStore} newStore.ReactionStore = &TimerLayerReactionStore{ReactionStore: childStore.Reaction(), Root: &newStore} newStore.ReadReceiptStore = &TimerLayerReadReceiptStore{ReadReceiptStore: childStore.ReadReceipt(), Root: &newStore} + newStore.ReadTrackingStore = &TimerLayerReadTrackingStore{ReadTrackingStore: childStore.ReadTracking(), Root: &newStore} newStore.RecapStore = &TimerLayerRecapStore{RecapStore: childStore.Recap(), Root: &newStore} newStore.RemoteClusterStore = &TimerLayerRemoteClusterStore{RemoteClusterStore: childStore.RemoteCluster(), Root: &newStore} newStore.RetentionPolicyStore = &TimerLayerRetentionPolicyStore{RetentionPolicyStore: childStore.RetentionPolicy(), Root: &newStore} diff --git a/server/docker-compose.yaml b/server/docker-compose.yaml index d119f106bec..9ea13afca00 100644 --- a/server/docker-compose.yaml +++ b/server/docker-compose.yaml @@ -6,6 +6,13 @@ services: extends: file: build/docker-compose.common.yml service: postgres + postgres-readtracking: + container_name: mattermost-postgres-readtracking + ports: + - "5433:5432" + extends: + file: build/docker-compose.common.yml + service: postgres-readtracking minio: container_name: mattermost-minio ports: @@ -99,6 +106,7 @@ services: - mm-test depends_on: - postgres + - postgres-readtracking - minio - inbucket - openldap @@ -108,7 +116,7 @@ services: - grafana - loki - otel-collector - command: postgres:5432 minio:9000 inbucket:9001 openldap:389 elasticsearch:9200 opensearch:9201 prometheus:9090 grafana:3000 loki:3100 otel-collector:13133 + command: postgres:5432 postgres-readtracking:5432 minio:9000 inbucket:9001 openldap:389 elasticsearch:9200 opensearch:9201 prometheus:9090 grafana:3000 loki:3100 otel-collector:13133 leader: build: diff --git a/server/public/model/config.go b/server/public/model/config.go index d975f59c46b..8fc021af58e 100644 --- a/server/public/model/config.go +++ b/server/public/model/config.go @@ -1577,6 +1577,72 @@ func (s *SqlSettings) SetDefaults(isUpdate bool) { } } +// ReadTrackingSettings configures the independent PostgreSQL pool used by +// the read-tracking sub-store. The pool, credentials, and migration set are +// all separate from SqlSettings; a failure in this DB does not affect the +// main DB. Disabled by default — the sub-store is a no-op when Enable=false. +type ReadTrackingSettings struct { + Enable *bool `access:"environment_database,write_restrictable,cloud_restrictable"` + DataSource *string `access:"environment_database,write_restrictable,cloud_restrictable"` // telemetry: none + MaxIdleConns *int `access:"environment_database,write_restrictable,cloud_restrictable"` + MaxOpenConns *int `access:"environment_database,write_restrictable,cloud_restrictable"` + ConnMaxLifetimeMilliseconds *int `access:"environment_database,write_restrictable,cloud_restrictable"` + ConnMaxIdleTimeMilliseconds *int `access:"environment_database,write_restrictable,cloud_restrictable"` + QueryTimeout *int `access:"environment_database,write_restrictable,cloud_restrictable"` +} + +const ReadTrackingSettingsDefaultDataSource = "postgres://mmuser:mostest@localhost:5433/mattermost_readtracking?sslmode=disable&connect_timeout=10" + +func (s *ReadTrackingSettings) SetDefaults() { + if s.Enable == nil { + s.Enable = new(false) + } + if s.DataSource == nil { + s.DataSource = new(ReadTrackingSettingsDefaultDataSource) + } + if s.MaxIdleConns == nil { + s.MaxIdleConns = new(50) + } + if s.MaxOpenConns == nil { + // Higher than main pool: write-heavy workload, more concurrent inserts. + s.MaxOpenConns = new(200) + } + if s.ConnMaxLifetimeMilliseconds == nil { + s.ConnMaxLifetimeMilliseconds = new(3600000) + } + if s.ConnMaxIdleTimeMilliseconds == nil { + s.ConnMaxIdleTimeMilliseconds = new(300000) + } + if s.QueryTimeout == nil { + s.QueryTimeout = new(30) + } +} + +func (s *ReadTrackingSettings) isValid() *AppError { + if !*s.Enable { + return nil + } + if *s.DataSource == "" { + return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_data_src.app_error", nil, "", http.StatusBadRequest) + } + if *s.MaxIdleConns <= 0 { + return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_idle.app_error", nil, "", http.StatusBadRequest) + } + if *s.MaxOpenConns <= 0 { + return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_max_conn.app_error", nil, "", http.StatusBadRequest) + } + if *s.ConnMaxLifetimeMilliseconds < 0 { + return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_conn_max_lifetime_milliseconds.app_error", nil, "", http.StatusBadRequest) + } + if *s.ConnMaxIdleTimeMilliseconds < 0 { + return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_conn_max_idle_time_milliseconds.app_error", nil, "", http.StatusBadRequest) + } + if *s.QueryTimeout <= 0 { + return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_query_timeout.app_error", nil, "", http.StatusBadRequest) + } + return nil +} + type LogSettings struct { EnableConsole *bool `access:"environment_logging,write_restrictable,cloud_restrictable"` ConsoleLevel *string `access:"environment_logging,write_restrictable,cloud_restrictable"` @@ -4107,6 +4173,7 @@ type Config struct { TeamSettings TeamSettings ClientRequirements ClientRequirements SqlSettings SqlSettings + ReadTrackingSettings ReadTrackingSettings LogSettings LogSettings ExperimentalAuditSettings ExperimentalAuditSettings PasswordSettings PasswordSettings @@ -4223,6 +4290,7 @@ func (o *Config) SetDefaults() { } o.SqlSettings.SetDefaults(isUpdate) + o.ReadTrackingSettings.SetDefaults() o.FileSettings.SetDefaults(isUpdate) o.EmailSettings.SetDefaults(isUpdate) o.PrivacySettings.setDefaults() @@ -4305,6 +4373,10 @@ func (o *Config) IsValid() *AppError { return appErr } + if appErr := o.ReadTrackingSettings.isValid(); appErr != nil { + return appErr + } + if appErr := o.FileSettings.isValid(); appErr != nil { return appErr } @@ -5259,6 +5331,10 @@ func (o *Config) Sanitize(pluginManifests []*Manifest, opts *SanitizeOptions) { *o.SqlSettings.AtRestEncryptKey = FakeSetting } + if o.ReadTrackingSettings.DataSource != nil { + *o.ReadTrackingSettings.DataSource = sanitizeDataSourceField(*o.ReadTrackingSettings.DataSource, "ReadTrackingSettings.DataSource") + } + if o.ElasticsearchSettings.Password != nil { *o.ElasticsearchSettings.Password = FakeSetting } diff --git a/server/public/model/user_post_read.go b/server/public/model/user_post_read.go new file mode 100644 index 00000000000..87e93c46534 --- /dev/null +++ b/server/public/model/user_post_read.go @@ -0,0 +1,10 @@ +// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. +// See LICENSE.txt for license information. + +package model + +type UserPostRead struct { + UserID string `json:"user_id"` + PostID string `json:"post_id"` + CreatedAt int64 `json:"created_at"` +} diff --git a/server/public/utils/sql/sql_utils.go b/server/public/utils/sql/sql_utils.go index bb847517f81..c1aad65a1bd 100644 --- a/server/public/utils/sql/sql_utils.go +++ b/server/public/utils/sql/sql_utils.go @@ -24,13 +24,30 @@ const ( // SetupConnection sets up the connection to the database and pings it to make sure it's alive. // It also applies any database configuration settings that are required. func SetupConnection(logger mlog.LoggerIFace, connType string, dataSource string, settings *model.SqlSettings, attempts int) (*dbsql.DB, error) { - db, err := dbsql.Open(*settings.DriverName, dataSource) + return setupConnection(logger, connType, dataSource, *settings.DriverName, + *settings.MaxIdleConns, *settings.MaxOpenConns, + *settings.ConnMaxLifetimeMilliseconds, *settings.ConnMaxIdleTimeMilliseconds, + attempts) +} + +// SetupReadTrackingConnection opens a connection for the independent +// read-tracking pool. Mirrors SetupConnection but reads pool tuning from +// ReadTrackingSettings (driver is fixed to postgres for this DB). +func SetupReadTrackingConnection(logger mlog.LoggerIFace, connType string, settings *model.ReadTrackingSettings, attempts int) (*dbsql.DB, error) { + return setupConnection(logger, connType, *settings.DataSource, model.DatabaseDriverPostgres, + *settings.MaxIdleConns, *settings.MaxOpenConns, + *settings.ConnMaxLifetimeMilliseconds, *settings.ConnMaxIdleTimeMilliseconds, + attempts) +} + +func setupConnection(logger mlog.LoggerIFace, connType, dataSource, driverName string, maxIdle, maxOpen, lifetimeMs, idleMs, attempts int) (*dbsql.DB, error) { + db, err := dbsql.Open(driverName, dataSource) if err != nil { return nil, errors.Wrap(err, "failed to open SQL connection") } // At this point, we have passed sql.Open, so we deliberately ignore any errors. - sanitized, _ := model.SanitizeDataSource(*settings.DriverName, dataSource) + sanitized, _ := model.SanitizeDataSource(driverName, dataSource) logger = logger.With( mlog.String("database", connType), @@ -66,11 +83,11 @@ func SetupConnection(logger mlog.LoggerIFace, connType string, dataSource string db.SetMaxOpenConns(1) db.SetMaxIdleConns(1) } else { - db.SetMaxIdleConns(*settings.MaxIdleConns) - db.SetMaxOpenConns(*settings.MaxOpenConns) + db.SetMaxIdleConns(maxIdle) + db.SetMaxOpenConns(maxOpen) } - db.SetConnMaxLifetime(time.Duration(*settings.ConnMaxLifetimeMilliseconds) * time.Millisecond) - db.SetConnMaxIdleTime(time.Duration(*settings.ConnMaxIdleTimeMilliseconds) * time.Millisecond) + db.SetConnMaxLifetime(time.Duration(lifetimeMs) * time.Millisecond) + db.SetConnMaxIdleTime(time.Duration(idleMs) * time.Millisecond) return db, nil }