mattermost/server/channels/jobs/recap/worker.go
Nick Misasi cbdf5536fd
Add structured JSON output for recap summarization (#35496)
* Add structured outputs, response sanitization, and session context for recaps

- Wrap BridgeClient to strip markdown code fencing from LLM JSON responses,
  using explicit delegation to prevent unsanitized methods from leaking
- Add JSONOutputFormat schema to SummarizePosts for structured LLM output
- Pass user session in recap worker context for session-dependent code paths
- Pre-parse min plugin version semver at package level to avoid repeated parsing
- Hoist static JSON schema to package-level var to avoid per-call allocation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Fix stripMarkdownCodeFencing to handle single-line fenced payloads

Address CodeRabbit feedback: the function previously returned the original
string when fenced JSON had no newline (e.g. ```json {"a":1}```), which
would break downstream JSON parsing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Handle case/spacing variants for single-line fenced language tags

Address CodeRabbit feedback: use case-insensitive comparison for the
"json" language tag and check for whitespace separator, so inputs like
```JSON {"a":1}``` are handled correctly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Revert BridgeClient wrapper and keep only structured output changes

Remove the BridgeClient wrapper, stripMarkdownCodeFencing, and semver
pre-parse from agents.go. The scope of this PR is limited to adding
JSONOutputFormat structured outputs for recaps and the worker session
context fix.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Fix lint: use any instead of interface{} and fix gofmt formatting

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-05 19:06:05 +00:00

131 lines
4.5 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package recap
import (
"fmt"
"strings"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/public/shared/request"
"github.com/mattermost/mattermost/server/v8/channels/jobs"
"github.com/mattermost/mattermost/server/v8/channels/store"
)
type AppIface interface {
ProcessRecapChannel(rctx request.CTX, recapID, channelID, userID, agentID string) (*model.RecapChannelResult, *model.AppError)
Publish(message *model.WebSocketEvent)
}
func MakeWorker(jobServer *jobs.JobServer, storeInstance store.Store, appInstance AppIface) *jobs.SimpleWorker {
isEnabled := func(cfg *model.Config) bool {
return cfg.FeatureFlags.EnableAIRecaps
}
execute := func(logger mlog.LoggerIFace, job *model.Job) error {
defer jobServer.HandleJobPanic(logger, job)
return processRecapJob(logger, job, storeInstance, appInstance, func(progress int64) {
_ = jobServer.SetJobProgress(job, progress)
})
}
return jobs.NewSimpleWorker("Recap", jobServer, execute, isEnabled)
}
func processRecapJob(logger mlog.LoggerIFace, job *model.Job, storeInstance store.Store, appInstance AppIface, setProgress func(int64)) error {
recapID := job.Data["recap_id"]
userID := job.Data["user_id"]
channelIDs := strings.Split(job.Data["channel_ids"], ",")
agentID := job.Data["agent_id"]
logger.Info("Starting recap job",
mlog.String("recap_id", recapID),
mlog.String("agent_id", agentID),
mlog.Int("channel_count", len(channelIDs)))
// Update status to processing
_ = storeInstance.Recap().UpdateRecapStatus(recapID, model.RecapStatusProcessing)
publishRecapUpdate(appInstance, recapID, userID)
totalMessages := 0
successfulChannels := []string{}
failedChannels := []string{}
for i, channelID := range channelIDs {
// Update progress
progress := int64((i * 100) / len(channelIDs))
if setProgress != nil {
setProgress(progress)
}
// Process the channel - use a context with the user's session so that
// session-dependent code (e.g. auto-translation supplements) works correctly.
rctx := request.EmptyContext(logger).WithSession(&model.Session{UserId: userID})
result, err := appInstance.ProcessRecapChannel(rctx, recapID, channelID, userID, agentID)
if err != nil {
logger.Warn("Failed to process channel",
mlog.String("channel_id", channelID),
mlog.Err(err))
failedChannels = append(failedChannels, channelID)
continue
}
if !result.Success {
logger.Warn("Channel processing unsuccessful", mlog.String("channel_id", channelID))
failedChannels = append(failedChannels, channelID)
continue
}
totalMessages += result.MessageCount
successfulChannels = append(successfulChannels, channelID)
}
// Update recap with final data (title is already set by user in CreateRecap)
recap, _ := storeInstance.Recap().GetRecap(recapID)
recap.TotalMessageCount = totalMessages
recap.UpdateAt = model.GetMillis()
if len(failedChannels) > 0 && len(successfulChannels) == 0 {
recap.Status = model.RecapStatusFailed
_, err := storeInstance.Recap().UpdateRecap(recap)
if err != nil {
logger.Error("Failed to update recap", mlog.Err(err))
return fmt.Errorf("failed to update recap: %w", err)
}
publishRecapUpdate(appInstance, recapID, userID)
return fmt.Errorf("all channels failed to process")
} else if len(failedChannels) > 0 {
recap.Status = model.RecapStatusCompleted
_, err := storeInstance.Recap().UpdateRecap(recap)
if err != nil {
logger.Error("Failed to update recap", mlog.Err(err))
return fmt.Errorf("failed to update recap: %w", err)
}
publishRecapUpdate(appInstance, recapID, userID)
logger.Warn("Some channels failed", mlog.Int("failed_count", len(failedChannels)))
// Job succeeds with warning
} else {
recap.Status = model.RecapStatusCompleted
_, err := storeInstance.Recap().UpdateRecap(recap)
if err != nil {
logger.Error("Failed to update recap", mlog.Err(err))
return fmt.Errorf("failed to update recap: %w", err)
}
publishRecapUpdate(appInstance, recapID, userID)
}
logger.Info("Recap job completed",
mlog.String("recap_id", recapID),
mlog.Int("successful_channels", len(successfulChannels)),
mlog.Int("failed_channels", len(failedChannels)))
return nil
}
func publishRecapUpdate(appInstance AppIface, recapID, userID string) {
message := model.NewWebSocketEvent(model.WebsocketEventRecapUpdated, "", "", userID, nil, "")
message.Add("recap_id", recapID)
appInstance.Publish(message)
}