This commit is contained in:
Harshil Sharma 2026-05-25 11:31:44 +05:30
parent cfafefe58c
commit 4ff072e4e0
19 changed files with 811 additions and 134 deletions

View file

@ -24,6 +24,25 @@ services:
interval: 5s
timeout: 10s
retries: 3
postgres-readtracking:
image: "postgres:14"
logging: *default-logging
restart: always
networks:
- mm-test
environment:
POSTGRES_USER: mmuser
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-mostest}
POSTGRES_DB: mattermost_readtracking
POSTGRES_INITDB_ARGS: "--auth-host=scram-sha-256 --auth-local=scram-sha-256"
command: postgres -c 'config_file=/etc/postgresql/postgresql.conf'
volumes:
- "./docker/postgres-readtracking.conf:/etc/postgresql/postgresql.conf:Z"
healthcheck:
test: [ "CMD", "pg_isready", "-h", "localhost" ]
interval: 5s
timeout: 10s
retries: 3
minio:
image: "minio/minio:RELEASE.2024-06-22T05-26-45Z"
logging: *default-logging

View file

@ -0,0 +1,18 @@
# Postgres config for the read-tracking database.
#
# This database stores an append-only event log (user_post_reads) using
# UNLOGGED tables. Durability is intentionally relaxed in favor of write
# throughput; data is acceptable-to-lose on crash.
max_connections = 500
listen_addresses = '*'
fsync = off
full_page_writes = off
synchronous_commit = off
default_text_search_config = 'pg_catalog.english'
commit_delay = 1000
commit_siblings = 5
wal_writer_delay = 1000ms
checkpoint_timeout = 30min
max_wal_size = 4GB
logging_collector = off
password_encryption = 'scram-sha-256'

View file

@ -281,7 +281,7 @@ func New(sc ServiceConfig, options ...Option) (*PlatformService, error) {
// Cache layer
opts := append(ps.storeOptions, sqlstore.WithFeatureFlags(func() *model.FeatureFlags {
return ps.Config().FeatureFlags
}))
}), sqlstore.WithReadTrackingSettings(ps.Config().ReadTrackingSettings))
ps.sqlStore, err = sqlstore.New(ps.Config().SqlSettings, ps.Log(), ps.metricsIFace, opts...)
if err != nil {
return nil, err

View file

@ -0,0 +1 @@
DROP TABLE IF EXISTS user_post_reads;

View file

@ -0,0 +1,5 @@
CREATE UNLOGGED TABLE IF NOT EXISTS user_post_reads (
user_id VARCHAR(26) NOT NULL,
post_id VARCHAR(26) NOT NULL,
created_at BIGINT NOT NULL
);

View file

@ -58,6 +58,7 @@ type RetryLayer struct {
PropertyValueStore store.PropertyValueStore
ReactionStore store.ReactionStore
ReadReceiptStore store.ReadReceiptStore
ReadTrackingStore store.ReadTrackingStore
RecapStore store.RecapStore
RemoteClusterStore store.RemoteClusterStore
RetentionPolicyStore store.RetentionPolicyStore
@ -233,6 +234,10 @@ func (s *RetryLayer) ReadReceipt() store.ReadReceiptStore {
return s.ReadReceiptStore
}
func (s *RetryLayer) ReadTracking() store.ReadTrackingStore {
return s.ReadTrackingStore
}
func (s *RetryLayer) Recap() store.RecapStore {
return s.RecapStore
}
@ -507,6 +512,11 @@ type RetryLayerReadReceiptStore struct {
Root *RetryLayer
}
type RetryLayerReadTrackingStore struct {
store.ReadTrackingStore
Root *RetryLayer
}
type RetryLayerRecapStore struct {
store.RecapStore
Root *RetryLayer
@ -11261,6 +11271,69 @@ func (s *RetryLayerReadReceiptStore) Update(rctx request.CTX, receipt *model.Rea
}
func (s *RetryLayerReadTrackingStore) HasRead(ctx context.Context, userID string, postID string) (bool, error) {
tries := 0
for {
result, err := s.ReadTrackingStore.HasRead(ctx, userID, postID)
if err == nil {
return result, nil
}
if !isRepeatableError(err) {
return result, err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return result, err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerReadTrackingStore) Mark(ctx context.Context, userID string, postID string) error {
tries := 0
for {
err := s.ReadTrackingStore.Mark(ctx, userID, postID)
if err == nil {
return nil
}
if !isRepeatableError(err) {
return err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerReadTrackingStore) MarkBulk(ctx context.Context, pairs []model.UserPostRead) error {
tries := 0
for {
err := s.ReadTrackingStore.MarkBulk(ctx, pairs)
if err == nil {
return nil
}
if !isRepeatableError(err) {
return err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerRecapStore) DeleteRecap(id string) error {
tries := 0
@ -17843,48 +17916,6 @@ func (s *RetryLayerUserAccessTokenStore) Delete(tokenID string) error {
}
func (s *RetryLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, error) {
tries := 0
for {
result, err := s.UserAccessTokenStore.DeleteByIds(tokenIDs)
if err == nil {
return result, nil
}
if !isRepeatableError(err) {
return result, err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return result, err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) {
tries := 0
for {
result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit)
if err == nil {
return result, nil
}
if !isRepeatableError(err) {
return result, err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return result, err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerUserAccessTokenStore) DeleteAllForUser(userID string) error {
tries := 0
@ -17906,6 +17937,27 @@ func (s *RetryLayerUserAccessTokenStore) DeleteAllForUser(userID string) error {
}
func (s *RetryLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, error) {
tries := 0
for {
result, err := s.UserAccessTokenStore.DeleteByIds(tokenIDs)
if err == nil {
return result, nil
}
if !isRepeatableError(err) {
return result, err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return result, err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) {
tries := 0
@ -17990,6 +18042,27 @@ func (s *RetryLayerUserAccessTokenStore) GetByUser(userID string, page int, perP
}
func (s *RetryLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) {
tries := 0
for {
result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit)
if err == nil {
return result, nil
}
if !isRepeatableError(err) {
return result, err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return result, err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerUserAccessTokenStore) Save(token *model.UserAccessToken) (*model.UserAccessToken, error) {
tries := 0
@ -18800,6 +18873,27 @@ func (s *RetryLayerWebhookStore) UpdateIncoming(webhook *model.IncomingWebhook)
}
func (s *RetryLayerWebhookStore) UpdateIncomingLastUsed(webhookID string, lastUsed int64) error {
tries := 0
for {
err := s.WebhookStore.UpdateIncomingLastUsed(webhookID, lastUsed)
if err == nil {
return nil
}
if !isRepeatableError(err) {
return err
}
tries++
if tries >= 3 {
err = errors.Wrap(err, "giving up after 3 consecutive repeatable transaction failures")
return err
}
timepkg.Sleep(100 * timepkg.Millisecond)
}
}
func (s *RetryLayerWebhookStore) UpdateOutgoing(hook *model.OutgoingWebhook) (*model.OutgoingWebhook, error) {
tries := 0
@ -18849,10 +18943,6 @@ func (s *RetryLayer) TotalSearchDbConnections() int {
return s.Store.TotalSearchDbConnections()
}
func (s *RetryLayer) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) {
return s.Store.GetDiagnostics(ctx)
}
func (s *RetryLayer) UnlockFromMaster() {
s.Store.UnlockFromMaster()
}
@ -18900,6 +18990,7 @@ func New(childStore store.Store) *RetryLayer {
newStore.PropertyValueStore = &RetryLayerPropertyValueStore{PropertyValueStore: childStore.PropertyValue(), Root: &newStore}
newStore.ReactionStore = &RetryLayerReactionStore{ReactionStore: childStore.Reaction(), Root: &newStore}
newStore.ReadReceiptStore = &RetryLayerReadReceiptStore{ReadReceiptStore: childStore.ReadReceipt(), Root: &newStore}
newStore.ReadTrackingStore = &RetryLayerReadTrackingStore{ReadTrackingStore: childStore.ReadTracking(), Root: &newStore}
newStore.RecapStore = &RetryLayerRecapStore{RecapStore: childStore.Recap(), Root: &newStore}
newStore.RemoteClusterStore = &RetryLayerRemoteClusterStore{RemoteClusterStore: childStore.RemoteCluster(), Root: &newStore}
newStore.RetentionPolicyStore = &RetryLayerRetentionPolicyStore{RetentionPolicyStore: childStore.RetentionPolicy(), Root: &newStore}

View file

@ -153,6 +153,73 @@ func (ss *SqlStore) migrate(direction migrationDirection, dryRun, enableMorphLog
}
}
// readTrackingMigrationsDir is the embed-relative directory holding the
// migrations applied to the independent read-tracking database.
const readTrackingMigrationsDir = "postgres_readtracking"
func (ss *SqlStore) initReadTrackingMorph(enableLogging bool) (*morph.Morph, error) {
assets := db.Assets()
assetsList, err := assets.ReadDir(path.Join("migrations", readTrackingMigrationsDir))
if err != nil {
return nil, err
}
assetNamesForDriver := make([]string, len(assetsList))
for i, entry := range assetsList {
assetNamesForDriver[i] = entry.Name()
}
src, err := mbindata.WithInstance(&mbindata.AssetSource{
Names: assetNamesForDriver,
AssetFunc: func(name string) ([]byte, error) {
return assets.ReadFile(path.Join("migrations", readTrackingMigrationsDir, name))
},
})
if err != nil {
return nil, err
}
driver, err := ps.WithInstance(ss.readTrackingX.DB().DB)
if err != nil {
return nil, err
}
var logWriter io.Writer
if enableLogging {
logWriter = &morphWriter{}
} else {
logWriter = io.Discard
}
// The read-tracking schema is independent — use its own advisory lock key
// so it doesn't contend with the main DB's migration lock.
opts := []morph.EngineOption{
morph.WithLogger(log.New(logWriter, "", log.Lshortfile)),
morph.WithLock("mm-readtracking-lock-key"),
morph.SetStatementTimeoutInSeconds(*ss.settings.MigrationsStatementTimeoutSeconds),
morph.SetDryRun(false),
}
return morph.New(context.Background(), driver, src, opts...)
}
func (ss *SqlStore) migrateReadTracking(direction migrationDirection, enableMorphLogging bool) error {
engine, err := ss.initReadTrackingMorph(enableMorphLogging)
if err != nil {
return err
}
defer engine.Close()
switch direction {
case migrationsDirectionDown:
_, err = engine.ApplyDown(-1)
return err
default:
return engine.ApplyAll()
}
}
func (m *Migrator) GeneratePlan(shouldRecover bool) (*models.Plan, error) {
diff, err := m.engine.Diff(models.Up)
if err != nil {

View file

@ -0,0 +1,117 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package sqlstore
import (
"context"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/v8/channels/store"
)
const readTrackingTableName = "user_post_reads"
// SqlReadTrackingStore writes user-post read events to an independent
// Postgres pool. The backing table is UNLOGGED — no WAL, no replication,
// truncated on crash — and has no unique index, so duplicates are allowed.
// Callers dedupe on read.
type SqlReadTrackingStore struct {
*SqlStore
}
// noopReadTrackingStore is returned when ReadTrackingSettings.Enable=false,
// so callers don't have to nil-check the store before calling Mark/MarkBulk.
type noopReadTrackingStore struct{}
func (noopReadTrackingStore) Mark(context.Context, string, string) error {
return nil
}
func (noopReadTrackingStore) MarkBulk(context.Context, []model.UserPostRead) error {
return nil
}
func (noopReadTrackingStore) HasRead(context.Context, string, string) (bool, error) {
return false, nil
}
func newSqlReadTrackingStore(s *SqlStore) store.ReadTrackingStore {
if s.readTrackingX == nil {
return noopReadTrackingStore{}
}
return &SqlReadTrackingStore{SqlStore: s}
}
// Mark appends a single user-post read event.
func (s *SqlReadTrackingStore) Mark(ctx context.Context, userID, postID string) error {
_, err := s.readTrackingX.ExecContext(ctx,
`INSERT INTO `+readTrackingTableName+` (user_id, post_id, created_at) VALUES ($1, $2, $3)`,
userID, postID, model.GetMillis())
if err != nil {
return errors.Wrap(err, "failed to mark user-post read")
}
return nil
}
// MarkBulk appends a batch of read events using Postgres' binary COPY
// protocol (lib/pq's pq.CopyIn). Orders of magnitude faster than batched
// multi-VALUES INSERT for large batches.
func (s *SqlReadTrackingStore) MarkBulk(ctx context.Context, pairs []model.UserPostRead) error {
if len(pairs) == 0 {
return nil
}
tx, err := s.readTrackingX.DB().BeginTx(ctx, nil)
if err != nil {
return errors.Wrap(err, "failed to begin tx for bulk mark")
}
stmt, err := tx.PrepareContext(ctx, pq.CopyIn(readTrackingTableName, "user_id", "post_id", "created_at"))
if err != nil {
_ = tx.Rollback()
return errors.Wrap(err, "failed to prepare COPY")
}
now := model.GetMillis()
for _, p := range pairs {
ts := p.CreatedAt
if ts == 0 {
ts = now
}
if _, err = stmt.ExecContext(ctx, p.UserID, p.PostID, ts); err != nil {
_ = stmt.Close()
_ = tx.Rollback()
return errors.Wrap(err, "failed to append row to COPY")
}
}
// Flush — required by pq.CopyIn semantics before Close.
if _, err = stmt.ExecContext(ctx); err != nil {
_ = stmt.Close()
_ = tx.Rollback()
return errors.Wrap(err, "failed to flush COPY")
}
if err = stmt.Close(); err != nil {
_ = tx.Rollback()
return errors.Wrap(err, "failed to close COPY statement")
}
if err = tx.Commit(); err != nil {
return errors.Wrap(err, "failed to commit bulk mark")
}
return nil
}
// HasRead returns true if at least one read event exists for the pair.
// Duplicates are ignored — EXISTS short-circuits on the first match.
func (s *SqlReadTrackingStore) HasRead(ctx context.Context, userID, postID string) (bool, error) {
var exists bool
err := s.readTrackingX.DB().GetContext(ctx, &exists,
`SELECT EXISTS(SELECT 1 FROM `+readTrackingTableName+` WHERE user_id=$1 AND post_id=$2 LIMIT 1)`,
userID, postID)
if err != nil {
return false, errors.Wrap(err, "failed to query read state")
}
return exists, nil
}

View file

@ -119,6 +119,7 @@ type SqlStoreStores struct {
readReceipt store.ReadReceiptStore
temporaryPost store.TemporaryPostStore
channelJoinRequest store.ChannelJoinRequestStore
readTracking store.ReadTrackingStore
}
type SqlStore struct {
@ -134,6 +135,12 @@ type SqlStore struct {
searchReplicaXs []*atomic.Pointer[sqlxDBWrapper]
replicaLagHandles []*sql.DB
// readTrackingX is an independent pool for the read-tracking DB. Nil when
// ReadTrackingSettings.Enable is false.
readTrackingX *sqlxDBWrapper
rtSettings *model.ReadTrackingSettings
stores SqlStoreStores
settings *model.SqlSettings
lockedToMaster bool
@ -186,6 +193,15 @@ func WithFeatureFlags(fn func() *model.FeatureFlags) Option {
}
}
// WithReadTrackingSettings configures the independent read-tracking pool.
// If omitted (or Enable=false), Store().ReadTracking() returns a no-op store.
func WithReadTrackingSettings(rt model.ReadTrackingSettings) Option {
return func(s *SqlStore) error {
s.rtSettings = &rt
return nil
}
}
// getFeatureFlags returns the current feature flags, or defaults if no function was configured.
func (ss *SqlStore) getFeatureFlags() *model.FeatureFlags {
if ss.featureFlagsFn != nil {
@ -236,6 +252,13 @@ func New(settings model.SqlSettings, logger mlog.LoggerIFace, metrics einterface
if err != nil {
return nil, errors.Wrap(err, "failed to apply database migrations")
}
if store.readTrackingX != nil {
err = store.migrateReadTracking(migrationsDirectionUp, !store.disableMorphLogging)
if err != nil {
return nil, errors.Wrap(err, "failed to apply read-tracking database migrations")
}
}
}
store.isBinaryParam, err = store.computeBinaryParam()
@ -307,6 +330,7 @@ func New(settings model.SqlSettings, logger mlog.LoggerIFace, metrics einterface
store.stores.readReceipt = newSqlReadReceiptStore(store, metrics)
store.stores.temporaryPost = newSqlTemporaryPostStore(store, metrics)
store.stores.channelJoinRequest = newSqlChannelJoinRequestStore(store)
store.stores.readTracking = newSqlReadTrackingStore(store)
store.stores.preference.(*SqlPreferenceStore).deleteUnusedFeatures()
@ -375,6 +399,20 @@ func (ss *SqlStore) initConnection() error {
ss.replicaLagHandles = append(ss.replicaLagHandles, replicaLagHandle)
}
}
if ss.rtSettings != nil && ss.rtSettings.Enable != nil && *ss.rtSettings.Enable {
rtHandle, err := sqlUtils.SetupReadTrackingConnection(ss.Logger(), "read-tracking", ss.rtSettings, DBPingAttempts)
if err != nil {
return errors.Wrap(err, "failed to setup read-tracking connection")
}
ss.readTrackingX = newSqlxDBWrapper(sqlx.NewDb(rtHandle, model.DatabaseDriverPostgres),
time.Duration(*ss.rtSettings.QueryTimeout)*time.Second,
*ss.settings.Trace)
if ss.metrics != nil {
ss.metrics.RegisterDBCollector(ss.readTrackingX.DB().DB, "read-tracking")
}
}
return nil
}
@ -725,6 +763,10 @@ func (ss *SqlStore) Close() {
for _, replica := range ss.replicaLagHandles {
replica.Close()
}
if ss.readTrackingX != nil {
ss.readTrackingX.Close()
}
}
func (ss *SqlStore) LockToMaster() {
@ -963,6 +1005,10 @@ func (ss *SqlStore) ChannelJoinRequest() store.ChannelJoinRequestStore {
return ss.stores.channelJoinRequest
}
func (ss *SqlStore) ReadTracking() store.ReadTrackingStore {
return ss.stores.readTracking
}
func (ss *SqlStore) DropAllTables() {
ss.masterX.Exec(`DO
$func$

View file

@ -105,6 +105,7 @@ type Store interface {
ReadReceipt() ReadReceiptStore
TemporaryPost() TemporaryPostStore
ChannelJoinRequest() ChannelJoinRequestStore
ReadTracking() ReadTrackingStore
}
type RetentionPolicyStore interface {
@ -1274,6 +1275,16 @@ type ReadReceiptStore interface {
GetUnreadCountForPost(rctx request.CTX, post *model.Post) (int64, error)
}
// ReadTrackingStore appends to an UNLOGGED user_post_reads table on a
// separate Postgres pool. Writes are intentionally fire-then-fail-fast: no
// retry layer, no cache layer. Duplicates are allowed at write time and
// deduped on read.
type ReadTrackingStore interface {
Mark(ctx context.Context, userID, postID string) error
MarkBulk(ctx context.Context, pairs []model.UserPostRead) error
HasRead(ctx context.Context, userID, postID string) (bool, error)
}
type TemporaryPostStore interface {
InvalidateTemporaryPost(id string)
Save(rctx request.CTX, post *model.TemporaryPost) (*model.TemporaryPost, error)

View file

@ -0,0 +1,95 @@
// Code generated by mockery v2.53.4. DO NOT EDIT.
// Regenerate this file using `make store-mocks`.
package mocks
import (
context "context"
model "github.com/mattermost/mattermost/server/public/model"
mock "github.com/stretchr/testify/mock"
)
// ReadTrackingStore is an autogenerated mock type for the ReadTrackingStore type
type ReadTrackingStore struct {
mock.Mock
}
// HasRead provides a mock function with given fields: ctx, userID, postID
func (_m *ReadTrackingStore) HasRead(ctx context.Context, userID string, postID string) (bool, error) {
ret := _m.Called(ctx, userID, postID)
if len(ret) == 0 {
panic("no return value specified for HasRead")
}
var r0 bool
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string, string) (bool, error)); ok {
return rf(ctx, userID, postID)
}
if rf, ok := ret.Get(0).(func(context.Context, string, string) bool); ok {
r0 = rf(ctx, userID, postID)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok {
r1 = rf(ctx, userID, postID)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Mark provides a mock function with given fields: ctx, userID, postID
func (_m *ReadTrackingStore) Mark(ctx context.Context, userID string, postID string) error {
ret := _m.Called(ctx, userID, postID)
if len(ret) == 0 {
panic("no return value specified for Mark")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok {
r0 = rf(ctx, userID, postID)
} else {
r0 = ret.Error(0)
}
return r0
}
// MarkBulk provides a mock function with given fields: ctx, pairs
func (_m *ReadTrackingStore) MarkBulk(ctx context.Context, pairs []model.UserPostRead) error {
ret := _m.Called(ctx, pairs)
if len(ret) == 0 {
panic("no return value specified for MarkBulk")
}
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []model.UserPostRead) error); ok {
r0 = rf(ctx, pairs)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewReadTrackingStore creates a new instance of ReadTrackingStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewReadTrackingStore(t interface {
mock.TestingT
Cleanup(func())
}) *ReadTrackingStore {
mock := &ReadTrackingStore{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View file

@ -516,6 +516,36 @@ func (_m *Store) GetDbVersion(numerical bool) (string, error) {
return r0, r1
}
// GetDiagnostics provides a mock function with given fields: ctx
func (_m *Store) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for GetDiagnostics")
}
var r0 *store.DatabaseDiagnostics
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (*store.DatabaseDiagnostics, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) *store.DatabaseDiagnostics); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*store.DatabaseDiagnostics)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetInternalMasterDB provides a mock function with no fields
func (_m *Store) GetInternalMasterDB() *sql.DB {
ret := _m.Called()
@ -614,36 +644,6 @@ func (_m *Store) GetSchemaDefinition() (*model.SupportPacketDatabaseSchema, erro
return r0, r1
}
// GetDiagnostics provides a mock function with given fields: ctx
func (_m *Store) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) {
ret := _m.Called(ctx)
if len(ret) == 0 {
panic("no return value specified for GetDiagnostics")
}
var r0 *store.DatabaseDiagnostics
var r1 error
if rf, ok := ret.Get(0).(func(context.Context) (*store.DatabaseDiagnostics, error)); ok {
return rf(ctx)
}
if rf, ok := ret.Get(0).(func(context.Context) *store.DatabaseDiagnostics); ok {
r0 = rf(ctx)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*store.DatabaseDiagnostics)
}
}
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(ctx)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Group provides a mock function with no fields
func (_m *Store) Group() store.GroupStore {
ret := _m.Called()
@ -1054,6 +1054,26 @@ func (_m *Store) ReadReceipt() store.ReadReceiptStore {
return r0
}
// ReadTracking provides a mock function with no fields
func (_m *Store) ReadTracking() store.ReadTrackingStore {
ret := _m.Called()
if len(ret) == 0 {
panic("no return value specified for ReadTracking")
}
var r0 store.ReadTrackingStore
if rf, ok := ret.Get(0).(func() store.ReadTrackingStore); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(store.ReadTrackingStore)
}
}
return r0
}
// Recap provides a mock function with no fields
func (_m *Store) Recap() store.RecapStore {
ret := _m.Called()

View file

@ -78,36 +78,6 @@ func (_m *UserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64, error) {
return r0, r1
}
// GetExpiredBefore provides a mock function with given fields: cutoff, limit
func (_m *UserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) {
ret := _m.Called(cutoff, limit)
if len(ret) == 0 {
panic("no return value specified for GetExpiredBefore")
}
var r0 []*model.UserAccessToken
var r1 error
if rf, ok := ret.Get(0).(func(int64, int) ([]*model.UserAccessToken, error)); ok {
return rf(cutoff, limit)
}
if rf, ok := ret.Get(0).(func(int64, int) []*model.UserAccessToken); ok {
r0 = rf(cutoff, limit)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*model.UserAccessToken)
}
}
if rf, ok := ret.Get(1).(func(int64, int) error); ok {
r1 = rf(cutoff, limit)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Get provides a mock function with given fields: tokenID
func (_m *UserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) {
ret := _m.Called(tokenID)
@ -228,6 +198,36 @@ func (_m *UserAccessTokenStore) GetByUser(userID string, page int, perPage int)
return r0, r1
}
// GetExpiredBefore provides a mock function with given fields: cutoff, limit
func (_m *UserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) {
ret := _m.Called(cutoff, limit)
if len(ret) == 0 {
panic("no return value specified for GetExpiredBefore")
}
var r0 []*model.UserAccessToken
var r1 error
if rf, ok := ret.Get(0).(func(int64, int) ([]*model.UserAccessToken, error)); ok {
return rf(cutoff, limit)
}
if rf, ok := ret.Get(0).(func(int64, int) []*model.UserAccessToken); ok {
r0 = rf(cutoff, limit)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*model.UserAccessToken)
}
}
if rf, ok := ret.Get(1).(func(int64, int) error); ok {
r1 = rf(cutoff, limit)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Save provides a mock function with given fields: token
func (_m *UserAccessTokenStore) Save(token *model.UserAccessToken) (*model.UserAccessToken, error) {
ret := _m.Called(token)

View file

@ -78,6 +78,7 @@ type Store struct {
TemporaryPostStore mocks.TemporaryPostStore
ViewStore mocks.ViewStore
ChannelJoinRequestStore mocks.ChannelJoinRequestStore
ReadTrackingStore mocks.ReadTrackingStore
}
func (s *Store) Logger() mlog.LoggerIFace { return s.logger }
@ -187,6 +188,9 @@ func (s *Store) TemporaryPost() store.TemporaryPostStore {
func (s *Store) ChannelJoinRequest() store.ChannelJoinRequestStore {
return &s.ChannelJoinRequestStore
}
func (s *Store) ReadTracking() store.ReadTrackingStore {
return &s.ReadTrackingStore
}
func (s *Store) View() store.ViewStore {
return &s.ViewStore
}
@ -252,5 +256,6 @@ func (s *Store) AssertExpectations(t mock.TestingT) bool {
&s.TemporaryPostStore,
&s.ViewStore,
&s.ChannelJoinRequestStore,
&s.ReadTrackingStore,
)
}

View file

@ -57,6 +57,7 @@ type TimerLayer struct {
PropertyValueStore store.PropertyValueStore
ReactionStore store.ReactionStore
ReadReceiptStore store.ReadReceiptStore
ReadTrackingStore store.ReadTrackingStore
RecapStore store.RecapStore
RemoteClusterStore store.RemoteClusterStore
RetentionPolicyStore store.RetentionPolicyStore
@ -232,6 +233,10 @@ func (s *TimerLayer) ReadReceipt() store.ReadReceiptStore {
return s.ReadReceiptStore
}
func (s *TimerLayer) ReadTracking() store.ReadTrackingStore {
return s.ReadTrackingStore
}
func (s *TimerLayer) Recap() store.RecapStore {
return s.RecapStore
}
@ -506,6 +511,11 @@ type TimerLayerReadReceiptStore struct {
Root *TimerLayer
}
type TimerLayerReadTrackingStore struct {
store.ReadTrackingStore
Root *TimerLayer
}
type TimerLayerRecapStore struct {
store.RecapStore
Root *TimerLayer
@ -8959,6 +8969,54 @@ func (s *TimerLayerReadReceiptStore) Update(rctx request.CTX, receipt *model.Rea
return result, err
}
func (s *TimerLayerReadTrackingStore) HasRead(ctx context.Context, userID string, postID string) (bool, error) {
start := time.Now()
result, err := s.ReadTrackingStore.HasRead(ctx, userID, postID)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("ReadTrackingStore.HasRead", success, elapsed)
}
return result, err
}
func (s *TimerLayerReadTrackingStore) Mark(ctx context.Context, userID string, postID string) error {
start := time.Now()
err := s.ReadTrackingStore.Mark(ctx, userID, postID)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("ReadTrackingStore.Mark", success, elapsed)
}
return err
}
func (s *TimerLayerReadTrackingStore) MarkBulk(ctx context.Context, pairs []model.UserPostRead) error {
start := time.Now()
err := s.ReadTrackingStore.MarkBulk(ctx, pairs)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("ReadTrackingStore.MarkBulk", success, elapsed)
}
return err
}
func (s *TimerLayerRecapStore) DeleteRecap(id string) error {
start := time.Now()
@ -14135,22 +14193,6 @@ func (s *TimerLayerUserAccessTokenStore) DeleteByIds(tokenIDs []string) (int64,
return result, err
}
func (s *TimerLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) {
start := time.Now()
result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetExpiredBefore", success, elapsed)
}
return result, err
}
func (s *TimerLayerUserAccessTokenStore) Get(tokenID string) (*model.UserAccessToken, error) {
start := time.Now()
@ -14215,6 +14257,22 @@ func (s *TimerLayerUserAccessTokenStore) GetByUser(userID string, page int, perP
return result, err
}
func (s *TimerLayerUserAccessTokenStore) GetExpiredBefore(cutoff int64, limit int) ([]*model.UserAccessToken, error) {
start := time.Now()
result, err := s.UserAccessTokenStore.GetExpiredBefore(cutoff, limit)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("UserAccessTokenStore.GetExpiredBefore", success, elapsed)
}
return result, err
}
func (s *TimerLayerUserAccessTokenStore) Save(token *model.UserAccessToken) (*model.UserAccessToken, error) {
start := time.Now()
@ -14853,6 +14911,22 @@ func (s *TimerLayerWebhookStore) UpdateIncoming(webhook *model.IncomingWebhook)
return result, err
}
func (s *TimerLayerWebhookStore) UpdateIncomingLastUsed(webhookID string, lastUsed int64) error {
start := time.Now()
err := s.WebhookStore.UpdateIncomingLastUsed(webhookID, lastUsed)
elapsed := float64(time.Since(start)) / float64(time.Second)
if s.Root.Metrics != nil {
success := "false"
if err == nil {
success = "true"
}
s.Root.Metrics.ObserveStoreMethodDuration("WebhookStore.UpdateIncomingLastUsed", success, elapsed)
}
return err
}
func (s *TimerLayerWebhookStore) UpdateOutgoing(hook *model.OutgoingWebhook) (*model.OutgoingWebhook, error) {
start := time.Now()
@ -14897,10 +14971,6 @@ func (s *TimerLayer) TotalSearchDbConnections() int {
return s.Store.TotalSearchDbConnections()
}
func (s *TimerLayer) GetDiagnostics(ctx context.Context) (*store.DatabaseDiagnostics, error) {
return s.Store.GetDiagnostics(ctx)
}
func (s *TimerLayer) UnlockFromMaster() {
s.Store.UnlockFromMaster()
}
@ -14949,6 +15019,7 @@ func New(childStore store.Store, metrics einterfaces.MetricsInterface) *TimerLay
newStore.PropertyValueStore = &TimerLayerPropertyValueStore{PropertyValueStore: childStore.PropertyValue(), Root: &newStore}
newStore.ReactionStore = &TimerLayerReactionStore{ReactionStore: childStore.Reaction(), Root: &newStore}
newStore.ReadReceiptStore = &TimerLayerReadReceiptStore{ReadReceiptStore: childStore.ReadReceipt(), Root: &newStore}
newStore.ReadTrackingStore = &TimerLayerReadTrackingStore{ReadTrackingStore: childStore.ReadTracking(), Root: &newStore}
newStore.RecapStore = &TimerLayerRecapStore{RecapStore: childStore.Recap(), Root: &newStore}
newStore.RemoteClusterStore = &TimerLayerRemoteClusterStore{RemoteClusterStore: childStore.RemoteCluster(), Root: &newStore}
newStore.RetentionPolicyStore = &TimerLayerRetentionPolicyStore{RetentionPolicyStore: childStore.RetentionPolicy(), Root: &newStore}

View file

@ -6,6 +6,13 @@ services:
extends:
file: build/docker-compose.common.yml
service: postgres
postgres-readtracking:
container_name: mattermost-postgres-readtracking
ports:
- "5433:5432"
extends:
file: build/docker-compose.common.yml
service: postgres-readtracking
minio:
container_name: mattermost-minio
ports:
@ -99,6 +106,7 @@ services:
- mm-test
depends_on:
- postgres
- postgres-readtracking
- minio
- inbucket
- openldap
@ -108,7 +116,7 @@ services:
- grafana
- loki
- otel-collector
command: postgres:5432 minio:9000 inbucket:9001 openldap:389 elasticsearch:9200 opensearch:9201 prometheus:9090 grafana:3000 loki:3100 otel-collector:13133
command: postgres:5432 postgres-readtracking:5432 minio:9000 inbucket:9001 openldap:389 elasticsearch:9200 opensearch:9201 prometheus:9090 grafana:3000 loki:3100 otel-collector:13133
leader:
build:

View file

@ -1577,6 +1577,72 @@ func (s *SqlSettings) SetDefaults(isUpdate bool) {
}
}
// ReadTrackingSettings configures the independent PostgreSQL pool used by
// the read-tracking sub-store. The pool, credentials, and migration set are
// all separate from SqlSettings; a failure in this DB does not affect the
// main DB. Disabled by default — the sub-store is a no-op when Enable=false.
type ReadTrackingSettings struct {
Enable *bool `access:"environment_database,write_restrictable,cloud_restrictable"`
DataSource *string `access:"environment_database,write_restrictable,cloud_restrictable"` // telemetry: none
MaxIdleConns *int `access:"environment_database,write_restrictable,cloud_restrictable"`
MaxOpenConns *int `access:"environment_database,write_restrictable,cloud_restrictable"`
ConnMaxLifetimeMilliseconds *int `access:"environment_database,write_restrictable,cloud_restrictable"`
ConnMaxIdleTimeMilliseconds *int `access:"environment_database,write_restrictable,cloud_restrictable"`
QueryTimeout *int `access:"environment_database,write_restrictable,cloud_restrictable"`
}
const ReadTrackingSettingsDefaultDataSource = "postgres://mmuser:mostest@localhost:5433/mattermost_readtracking?sslmode=disable&connect_timeout=10"
func (s *ReadTrackingSettings) SetDefaults() {
if s.Enable == nil {
s.Enable = new(false)
}
if s.DataSource == nil {
s.DataSource = new(ReadTrackingSettingsDefaultDataSource)
}
if s.MaxIdleConns == nil {
s.MaxIdleConns = new(50)
}
if s.MaxOpenConns == nil {
// Higher than main pool: write-heavy workload, more concurrent inserts.
s.MaxOpenConns = new(200)
}
if s.ConnMaxLifetimeMilliseconds == nil {
s.ConnMaxLifetimeMilliseconds = new(3600000)
}
if s.ConnMaxIdleTimeMilliseconds == nil {
s.ConnMaxIdleTimeMilliseconds = new(300000)
}
if s.QueryTimeout == nil {
s.QueryTimeout = new(30)
}
}
func (s *ReadTrackingSettings) isValid() *AppError {
if !*s.Enable {
return nil
}
if *s.DataSource == "" {
return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_data_src.app_error", nil, "", http.StatusBadRequest)
}
if *s.MaxIdleConns <= 0 {
return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_idle.app_error", nil, "", http.StatusBadRequest)
}
if *s.MaxOpenConns <= 0 {
return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_max_conn.app_error", nil, "", http.StatusBadRequest)
}
if *s.ConnMaxLifetimeMilliseconds < 0 {
return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_conn_max_lifetime_milliseconds.app_error", nil, "", http.StatusBadRequest)
}
if *s.ConnMaxIdleTimeMilliseconds < 0 {
return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_conn_max_idle_time_milliseconds.app_error", nil, "", http.StatusBadRequest)
}
if *s.QueryTimeout <= 0 {
return NewAppError("Config.IsValid", "model.config.is_valid.read_tracking_query_timeout.app_error", nil, "", http.StatusBadRequest)
}
return nil
}
type LogSettings struct {
EnableConsole *bool `access:"environment_logging,write_restrictable,cloud_restrictable"`
ConsoleLevel *string `access:"environment_logging,write_restrictable,cloud_restrictable"`
@ -4107,6 +4173,7 @@ type Config struct {
TeamSettings TeamSettings
ClientRequirements ClientRequirements
SqlSettings SqlSettings
ReadTrackingSettings ReadTrackingSettings
LogSettings LogSettings
ExperimentalAuditSettings ExperimentalAuditSettings
PasswordSettings PasswordSettings
@ -4223,6 +4290,7 @@ func (o *Config) SetDefaults() {
}
o.SqlSettings.SetDefaults(isUpdate)
o.ReadTrackingSettings.SetDefaults()
o.FileSettings.SetDefaults(isUpdate)
o.EmailSettings.SetDefaults(isUpdate)
o.PrivacySettings.setDefaults()
@ -4305,6 +4373,10 @@ func (o *Config) IsValid() *AppError {
return appErr
}
if appErr := o.ReadTrackingSettings.isValid(); appErr != nil {
return appErr
}
if appErr := o.FileSettings.isValid(); appErr != nil {
return appErr
}
@ -5259,6 +5331,10 @@ func (o *Config) Sanitize(pluginManifests []*Manifest, opts *SanitizeOptions) {
*o.SqlSettings.AtRestEncryptKey = FakeSetting
}
if o.ReadTrackingSettings.DataSource != nil {
*o.ReadTrackingSettings.DataSource = sanitizeDataSourceField(*o.ReadTrackingSettings.DataSource, "ReadTrackingSettings.DataSource")
}
if o.ElasticsearchSettings.Password != nil {
*o.ElasticsearchSettings.Password = FakeSetting
}

View file

@ -0,0 +1,10 @@
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package model
type UserPostRead struct {
UserID string `json:"user_id"`
PostID string `json:"post_id"`
CreatedAt int64 `json:"created_at"`
}

View file

@ -24,13 +24,30 @@ const (
// SetupConnection sets up the connection to the database and pings it to make sure it's alive.
// It also applies any database configuration settings that are required.
func SetupConnection(logger mlog.LoggerIFace, connType string, dataSource string, settings *model.SqlSettings, attempts int) (*dbsql.DB, error) {
db, err := dbsql.Open(*settings.DriverName, dataSource)
return setupConnection(logger, connType, dataSource, *settings.DriverName,
*settings.MaxIdleConns, *settings.MaxOpenConns,
*settings.ConnMaxLifetimeMilliseconds, *settings.ConnMaxIdleTimeMilliseconds,
attempts)
}
// SetupReadTrackingConnection opens a connection for the independent
// read-tracking pool. Mirrors SetupConnection but reads pool tuning from
// ReadTrackingSettings (driver is fixed to postgres for this DB).
func SetupReadTrackingConnection(logger mlog.LoggerIFace, connType string, settings *model.ReadTrackingSettings, attempts int) (*dbsql.DB, error) {
return setupConnection(logger, connType, *settings.DataSource, model.DatabaseDriverPostgres,
*settings.MaxIdleConns, *settings.MaxOpenConns,
*settings.ConnMaxLifetimeMilliseconds, *settings.ConnMaxIdleTimeMilliseconds,
attempts)
}
func setupConnection(logger mlog.LoggerIFace, connType, dataSource, driverName string, maxIdle, maxOpen, lifetimeMs, idleMs, attempts int) (*dbsql.DB, error) {
db, err := dbsql.Open(driverName, dataSource)
if err != nil {
return nil, errors.Wrap(err, "failed to open SQL connection")
}
// At this point, we have passed sql.Open, so we deliberately ignore any errors.
sanitized, _ := model.SanitizeDataSource(*settings.DriverName, dataSource)
sanitized, _ := model.SanitizeDataSource(driverName, dataSource)
logger = logger.With(
mlog.String("database", connType),
@ -66,11 +83,11 @@ func SetupConnection(logger mlog.LoggerIFace, connType string, dataSource string
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
} else {
db.SetMaxIdleConns(*settings.MaxIdleConns)
db.SetMaxOpenConns(*settings.MaxOpenConns)
db.SetMaxIdleConns(maxIdle)
db.SetMaxOpenConns(maxOpen)
}
db.SetConnMaxLifetime(time.Duration(*settings.ConnMaxLifetimeMilliseconds) * time.Millisecond)
db.SetConnMaxIdleTime(time.Duration(*settings.ConnMaxIdleTimeMilliseconds) * time.Millisecond)
db.SetConnMaxLifetime(time.Duration(lifetimeMs) * time.Millisecond)
db.SetConnMaxIdleTime(time.Duration(idleMs) * time.Millisecond)
return db, nil
}