mirror of
https://github.com/mattermost/mattermost.git
synced 2026-05-28 04:35:04 -04:00
* Add structured outputs, response sanitization, and session context for recaps
- Wrap BridgeClient to strip markdown code fencing from LLM JSON responses,
using explicit delegation to prevent unsanitized methods from leaking
- Add JSONOutputFormat schema to SummarizePosts for structured LLM output
- Pass user session in recap worker context for session-dependent code paths
- Pre-parse min plugin version semver at package level to avoid repeated parsing
- Hoist static JSON schema to package-level var to avoid per-call allocation
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* Fix stripMarkdownCodeFencing to handle single-line fenced payloads
Address CodeRabbit feedback: the function previously returned the original
string when fenced JSON had no newline (e.g. ```json {"a":1}```), which
would break downstream JSON parsing.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* Handle case/spacing variants for single-line fenced language tags
Address CodeRabbit feedback: use case-insensitive comparison for the
"json" language tag and check for whitespace separator, so inputs like
```JSON {"a":1}``` are handled correctly.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* Revert BridgeClient wrapper and keep only structured output changes
Remove the BridgeClient wrapper, stripMarkdownCodeFencing, and semver
pre-parse from agents.go. The scope of this PR is limited to adding
JSONOutputFormat structured outputs for recaps and the worker session
context fix.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* Fix lint: use any instead of interface{} and fix gofmt formatting
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
131 lines
4.5 KiB
Go
131 lines
4.5 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
// See LICENSE.txt for license information.
|
|
|
|
package recap
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/mattermost/mattermost/server/public/model"
|
|
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
|
"github.com/mattermost/mattermost/server/public/shared/request"
|
|
"github.com/mattermost/mattermost/server/v8/channels/jobs"
|
|
"github.com/mattermost/mattermost/server/v8/channels/store"
|
|
)
|
|
|
|
type AppIface interface {
|
|
ProcessRecapChannel(rctx request.CTX, recapID, channelID, userID, agentID string) (*model.RecapChannelResult, *model.AppError)
|
|
Publish(message *model.WebSocketEvent)
|
|
}
|
|
|
|
func MakeWorker(jobServer *jobs.JobServer, storeInstance store.Store, appInstance AppIface) *jobs.SimpleWorker {
|
|
isEnabled := func(cfg *model.Config) bool {
|
|
return cfg.FeatureFlags.EnableAIRecaps
|
|
}
|
|
|
|
execute := func(logger mlog.LoggerIFace, job *model.Job) error {
|
|
defer jobServer.HandleJobPanic(logger, job)
|
|
return processRecapJob(logger, job, storeInstance, appInstance, func(progress int64) {
|
|
_ = jobServer.SetJobProgress(job, progress)
|
|
})
|
|
}
|
|
|
|
return jobs.NewSimpleWorker("Recap", jobServer, execute, isEnabled)
|
|
}
|
|
|
|
func processRecapJob(logger mlog.LoggerIFace, job *model.Job, storeInstance store.Store, appInstance AppIface, setProgress func(int64)) error {
|
|
recapID := job.Data["recap_id"]
|
|
userID := job.Data["user_id"]
|
|
channelIDs := strings.Split(job.Data["channel_ids"], ",")
|
|
agentID := job.Data["agent_id"]
|
|
|
|
logger.Info("Starting recap job",
|
|
mlog.String("recap_id", recapID),
|
|
mlog.String("agent_id", agentID),
|
|
mlog.Int("channel_count", len(channelIDs)))
|
|
|
|
// Update status to processing
|
|
_ = storeInstance.Recap().UpdateRecapStatus(recapID, model.RecapStatusProcessing)
|
|
publishRecapUpdate(appInstance, recapID, userID)
|
|
|
|
totalMessages := 0
|
|
successfulChannels := []string{}
|
|
failedChannels := []string{}
|
|
|
|
for i, channelID := range channelIDs {
|
|
// Update progress
|
|
progress := int64((i * 100) / len(channelIDs))
|
|
if setProgress != nil {
|
|
setProgress(progress)
|
|
}
|
|
|
|
// Process the channel - use a context with the user's session so that
|
|
// session-dependent code (e.g. auto-translation supplements) works correctly.
|
|
rctx := request.EmptyContext(logger).WithSession(&model.Session{UserId: userID})
|
|
result, err := appInstance.ProcessRecapChannel(rctx, recapID, channelID, userID, agentID)
|
|
if err != nil {
|
|
logger.Warn("Failed to process channel",
|
|
mlog.String("channel_id", channelID),
|
|
mlog.Err(err))
|
|
failedChannels = append(failedChannels, channelID)
|
|
continue
|
|
}
|
|
|
|
if !result.Success {
|
|
logger.Warn("Channel processing unsuccessful", mlog.String("channel_id", channelID))
|
|
failedChannels = append(failedChannels, channelID)
|
|
continue
|
|
}
|
|
|
|
totalMessages += result.MessageCount
|
|
successfulChannels = append(successfulChannels, channelID)
|
|
}
|
|
|
|
// Update recap with final data (title is already set by user in CreateRecap)
|
|
recap, _ := storeInstance.Recap().GetRecap(recapID)
|
|
recap.TotalMessageCount = totalMessages
|
|
recap.UpdateAt = model.GetMillis()
|
|
|
|
if len(failedChannels) > 0 && len(successfulChannels) == 0 {
|
|
recap.Status = model.RecapStatusFailed
|
|
_, err := storeInstance.Recap().UpdateRecap(recap)
|
|
if err != nil {
|
|
logger.Error("Failed to update recap", mlog.Err(err))
|
|
return fmt.Errorf("failed to update recap: %w", err)
|
|
}
|
|
publishRecapUpdate(appInstance, recapID, userID)
|
|
return fmt.Errorf("all channels failed to process")
|
|
} else if len(failedChannels) > 0 {
|
|
recap.Status = model.RecapStatusCompleted
|
|
_, err := storeInstance.Recap().UpdateRecap(recap)
|
|
if err != nil {
|
|
logger.Error("Failed to update recap", mlog.Err(err))
|
|
return fmt.Errorf("failed to update recap: %w", err)
|
|
}
|
|
publishRecapUpdate(appInstance, recapID, userID)
|
|
logger.Warn("Some channels failed", mlog.Int("failed_count", len(failedChannels)))
|
|
// Job succeeds with warning
|
|
} else {
|
|
recap.Status = model.RecapStatusCompleted
|
|
_, err := storeInstance.Recap().UpdateRecap(recap)
|
|
if err != nil {
|
|
logger.Error("Failed to update recap", mlog.Err(err))
|
|
return fmt.Errorf("failed to update recap: %w", err)
|
|
}
|
|
publishRecapUpdate(appInstance, recapID, userID)
|
|
}
|
|
|
|
logger.Info("Recap job completed",
|
|
mlog.String("recap_id", recapID),
|
|
mlog.Int("successful_channels", len(successfulChannels)),
|
|
mlog.Int("failed_channels", len(failedChannels)))
|
|
|
|
return nil
|
|
}
|
|
|
|
func publishRecapUpdate(appInstance AppIface, recapID, userID string) {
|
|
message := model.NewWebSocketEvent(model.WebsocketEventRecapUpdated, "", "", userID, nil, "")
|
|
message.Add("recap_id", recapID)
|
|
appInstance.Publish(message)
|
|
}
|