Merge pull request #253 from Icinga/feature/migrate-history

cmd/ido2icingadb: migrate history from IDO to Icinga DB
This commit is contained in:
Julian Brost 2022-11-02 12:16:47 +01:00 committed by GitHub
commit 9fb9a10a8b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 2240 additions and 40 deletions

View file

@ -5,7 +5,7 @@ set -eo pipefail
find_license_file() {
MOD_NAME="$1"
LICENSE_DIR="vendor/$MOD_NAME"
LICENSE_FILES=({,../}LICENSE{,.txt,.md})
LICENSE_FILES=({,../}{,UN}LICENSE{,.txt,.md})
for LICENSE_FILE in "${LICENSE_FILES[@]}"; do
LICENSE_FILE="${LICENSE_DIR}/$LICENSE_FILE"
@ -29,6 +29,8 @@ list_all_deps() {
COMPATIBLE_LINE=$(($LINENO + 2))
COMPATIBLE=(
# public domain
3cee2c43614ad4572d9d594c81b9348cf45ed5ac # vendor/github.com/vbauerster/mpb/v6/UNLICENSE
# MIT
66d504eb2f162b9cbf11b07506eeed90c6edabe1 # vendor/github.com/cespare/xxhash/v2/LICENSE.txt
1513ff663e946fdcadb630bed670d253b8b22e1e # vendor/github.com/davecgh/go-spew/spew/../LICENSE

View file

@ -0,0 +1,298 @@
package main
import (
"database/sql"
_ "embed"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"math"
"strings"
"time"
)
//go:embed embed/event_time_cache_schema.sql
var eventTimeCacheSchema string
//go:embed embed/previous_hard_state_cache_schema.sql
var previousHardStateCacheSchema string
// buildEventTimeCache records, for every ack/flapping end event of ht.idoTable, the time of the
// start event it belongs to and stores the result in the end_start_time table of ht.cache.
//
// Rationale: Icinga DB's flapping_history#id always needs start_time. Obtaining it for flapping_end
// rows would need an IDO subquery, which would make reading the IDO even slower than writing to
// Icinga DB. Therefore IDO's icinga_flappinghistory is streamed once, flapping_history#start_time is
// computed on the way and cached in an SQLite database. The actual migration then streams from
// that database and the IDO.
//
// The same applies to acknowledgements. idoColumns is the SELECT list for ht.idoTable; it has to
// provide the columns of the local row type. On non-recoverable errors the whole program exits.
func buildEventTimeCache(ht *historyType, idoColumns []string) {
	type row = struct {
		Id            uint64
		EventTime     int64
		EventTimeUsec uint32
		EventIsStart  uint8
		ObjectId      uint64
	}

	chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
		// What an earlier, interrupted run has already cached.
		var done struct {
			Cnt   int64
			MaxId sql.NullInt64
		}
		cacheGet(*tx, &done, "SELECT COUNT(*) cnt, MAX(history_id) max_id FROM end_start_time")

		ht.bar.SetCurrent(done.Cnt * 2)

		// For the actual migration icinga_objects will be joined anyway,
		// so it makes no sense to take vanished objects into account.
		idColumn := "xh." + ht.idoIdColumn
		query := "SELECT " + strings.Join(idoColumns, ", ") + " FROM " + ht.idoTable +
			" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE " +
			idColumn + " <= :toid AND " + idColumn + " > :checkpoint ORDER BY " + idColumn + " LIMIT :bulk"

		// Stream the source data, continuing after the last cached end event.
		sliceIdoHistory(
			ht, query, nil, done.MaxId.Int64,
			func(idoRows []row) interface{} {
				var lastId interface{}

				for _, event := range idoRows {
					if event.EventIsStart != 0 {
						// A start event directly after another start event (per checkable) replaces the
						// old one: only the new one will get an end event which needs its time.
						cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", event.ObjectId)

						// The following end event (per checkable) will need this start time.
						cacheExec(
							*tx, "INSERT INTO last_start_time(object_id, event_time, event_time_usec) VALUES (?, ?, ?)",
							event.ObjectId, event.EventTime, event.EventTimeUsec,
						)
					} else {
						// An end event: look up the time of the pending start event.
						var starts []struct {
							EventTime     int64
							EventTimeUsec uint32
						}
						cacheSelect(
							*tx, &starts, "SELECT event_time, event_time_usec FROM last_start_time WHERE object_id=?",
							event.ObjectId,
						)

						if len(starts) > 0 {
							// Keep the start time for the actual migration...
							start := starts[0]
							cacheExec(
								*tx,
								"INSERT INTO end_start_time(history_id, event_time, event_time_usec) VALUES (?, ?, ?)",
								event.Id, start.EventTime, start.EventTimeUsec,
							)

							// ... and drop the pending start event, it has been consumed.
							cacheExec(*tx, "DELETE FROM last_start_time WHERE object_id=?", event.ObjectId)
						}
					}

					commitPeriodically()
					lastId = event.Id
				}

				ht.bar.IncrBy(len(idoRows))
				return lastId
			},
		)

		// Start events which never got an end event aren't needed anymore.
		cacheExec(*tx, "DELETE FROM last_start_time")
	})

	ht.bar.SetTotal(ht.bar.Current(), true)
}
// buildPreviousHardStateCache computes the previous hard state of every row of ht.idoTable and
// stores it in the previous_hard_state table of ht.cache.
//
// Rationale: Icinga DB's state_history#previous_hard_state would need a subquery, which would make
// reading the IDO even slower than writing to Icinga DB. Therefore IDO's icinga_statehistory is
// streamed once (newest row first), state_history#previous_hard_state is computed on the way and
// cached in an SQLite database. The actual migration then streams from that database and the IDO.
//
// The same applies to notifications. idoColumns is the SELECT list for ht.idoTable; it has to
// provide the columns of the local row type. On non-recoverable errors the whole program exits.
func buildPreviousHardStateCache(ht *historyType, idoColumns []string) {
	type row = struct {
		Id            uint64
		ObjectId      uint64
		LastHardState uint8
	}

	chunkCacheTx(ht.cache, func(tx **sqlx.Tx, commitPeriodically func()) {
		var nextIds struct {
			Cnt   int64
			MinId sql.NullInt64
		}
		cacheGet(*tx, &nextIds, "SELECT COUNT(*) cnt, MIN(history_id) min_id FROM next_ids")

		var previousHardStateCnt int64
		cacheGet(*tx, &previousHardStateCnt, "SELECT COUNT(*) FROM previous_hard_state")

		// next_ids contains the most recently processed IDs, so it is only empty
		// if we either didn't start yet or have already finished.
		var checkpoint int64
		switch {
		case nextIds.MinId.Valid:
			// There are next_ids. This kind of cache is filled in descending order.
			checkpoint = nextIds.MinId.Int64
		case previousHardStateCnt == 0:
			// Nothing cached at all: start from the largest (possible) ID.
			checkpoint = math.MaxInt64
		default:
			// Already finished: make the following query a no-op.
			checkpoint = 0
		}

		ht.bar.SetCurrent(previousHardStateCnt + nextIds.Cnt)

		// We continue where we finished before. As the cache is built in reverse chronological order:
		//
		// 1. If the history grows between two migration trials, the difference won't be migrated. Workarounds:
		//    a. Start the migration after Icinga DB is up and running.
		//    b. Remove the cache before the next migration trial.
		// 2. If the history gets cleaned up between two migration trials, the difference either
		//    just doesn't appear in the cache or - if already there - will be ignored later.

		// For the actual migration icinga_objects will be joined anyway,
		// so it makes no sense to take vanished objects into account.
		idColumn := "xh." + ht.idoIdColumn
		query := "SELECT " + strings.Join(idoColumns, ", ") + " FROM " + ht.idoTable +
			" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE " +
			idColumn + " <= :toid AND " + idColumn + " < :checkpoint ORDER BY " + idColumn + " DESC LIMIT :bulk"

		// rememberHardState stores r's hard state as the (chronologically) next one of its checkable.
		rememberHardState := func(r row) {
			cacheExec(
				*tx, "INSERT INTO next_hard_state(object_id, next_hard_state) VALUES (?, ?)",
				r.ObjectId, r.LastHardState,
			)
		}

		// Stream the source data, continuing where we were interrupted.
		sliceIdoHistory(
			ht, query, nil, checkpoint,
			func(idoRows []row) interface{} {
				var lastId interface{}

				for _, r := range idoRows {
					var nhs []struct{ NextHardState uint8 }
					cacheSelect(*tx, &nhs, "SELECT next_hard_state FROM next_hard_state WHERE object_id=?", r.ObjectId)

					switch {
					case len(nhs) < 1:
						// We just started (per checkable). As we're "travelling back in time",
						// this is the checkable's hard state at the moment.
						rememberHardState(r)
					case r.LastHardState == nhs[0].NextHardState:
						// The hard state didn't change yet (per checkable), nothing to resolve.
					default:
						// The hard state changed (per checkable). This past hard state is
						// what the already processed future time points were waiting for:
						cacheExec(
							*tx,
							"INSERT INTO previous_hard_state(history_id, previous_hard_state) SELECT history_id, ? FROM next_ids WHERE object_id=?",
							r.LastHardState, r.ObjectId,
						)

						// They have what they wanted now.
						cacheExec(*tx, "DELETE FROM next_hard_state WHERE object_id=?", r.ObjectId)
						cacheExec(*tx, "DELETE FROM next_ids WHERE object_id=?", r.ObjectId)

						// Start over, just like in the "we just started" case above.
						rememberHardState(r)
					}

					// In every case the previous hard state of the current time point isn't known yet.
					cacheExec(
						*tx, "INSERT INTO next_ids(history_id, object_id) VALUES (?, ?)",
						r.Id, r.ObjectId,
					)

					commitPeriodically()
					lastId = r.Id
				}

				ht.bar.IncrBy(len(idoRows))
				return lastId
			},
		)

		// No past hard state is available for the remaining time points, assuming pending:
		cacheExec(
			*tx, "INSERT INTO previous_hard_state(history_id, previous_hard_state) SELECT history_id, 99 FROM next_ids",
		)

		// Now they should have what they wanted:
		cacheExec(*tx, "DELETE FROM next_hard_state")
		cacheExec(*tx, "DELETE FROM next_ids")
	})

	ht.bar.SetTotal(ht.bar.Current(), true)
}
// chunkCacheTx runs do inside a transaction on cache which is committed in chunks.
//
// While do is running, it has to operate on the cache via *tx and to call commitPeriodically()
// after every completed operation. Once at least commitInterval has passed since the current
// transaction was started, commitPeriodically commits *tx and replaces it with a fresh transaction.
// (That's why tx is a **, not just a *: do must re-read *tx after each call.)
// After do returns, the last transaction is committed as well.
//
// On non-recoverable errors the whole program exits.
func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, commitPeriodically func())) {
	logger := log.With("backend", "cache")

	tx, err := cache.Beginx()
	if err != nil {
		logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
	}

	const commitInterval = 5 * time.Minute
	nextCommit := time.Now().Add(commitInterval)

	do(&tx, func() { // commitPeriodically
		if !time.Now().After(nextCommit) {
			return
		}

		if err := tx.Commit(); err != nil {
			logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
		}

		var err error

		tx, err = cache.Beginx()
		if err != nil {
			logger.Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
		}

		// Measure the interval from the start of the new transaction, not from the previous
		// deadline. Otherwise a pause longer than commitInterval between two calls would leave
		// the deadline in the past and make the following calls commit back-to-back.
		nextCommit = time.Now().Add(commitInterval)
	})

	if err := tx.Commit(); err != nil {
		logger.Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
	}
}
// cacheGet runs cache.Get(dest, query, args...) and logs a fatal error (including the query and
// its arguments) if that fails. (On non-recoverable errors the whole program exits.)
func cacheGet(cache interface {
	Get(dest interface{}, query string, args ...interface{}) error
}, dest interface{}, query string, args ...interface{}) {
	err := cache.Get(dest, query, args...)
	if err == nil {
		return
	}

	logger := log.With("backend", "cache", "query", query, "args", args)
	logger.Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
// cacheSelect runs cacheTx.Select(dest, query, args...) and logs a fatal error (including the query
// and its arguments) if that fails. (On non-recoverable errors the whole program exits.)
func cacheSelect(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) {
	err := cacheTx.Select(dest, query, args...)
	if err == nil {
		return
	}

	logger := log.With("backend", "cache", "query", query, "args", args)
	logger.Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
// cacheExec runs cacheTx.Exec(dml, args...), discards the result and logs a fatal error (including
// the statement and its arguments) if that fails. On non-recoverable errors the whole program exits.
func cacheExec(cacheTx *sqlx.Tx, dml string, args ...interface{}) {
	_, err := cacheTx.Exec(dml, args...)
	if err == nil {
		return
	}

	logger := log.With("backend", "cache", "dml", dml, "args", args)
	logger.Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}

View file

@ -0,0 +1,843 @@
package main
import (
"database/sql"
_ "embed"
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingadb/v1/history"
icingadbTypes "github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"strconv"
"strings"
"time"
)
//go:embed embed/comment_query.sql
var commentMigrationQuery string
//go:embed embed/downtime_query.sql
var downtimeMigrationQuery string
//go:embed embed/flapping_query.sql
var flappingMigrationQuery string
//go:embed embed/notification_query.sql
var notificationMigrationQuery string
//go:embed embed/state_query.sql
var stateMigrationQuery string
// commentRow is one result row of the comment migration query (embed/comment_query.sql),
// i.e. an IDO comment history entry together with the object it belongs to.
// Times are Unix timestamps in seconds, the *Usec fields carry the sub-second part.
type commentRow = struct {
	CommenthistoryId uint64 // primary key, also used as migration checkpoint
	EntryTime        int64
	EntryTimeUsec    uint32
	EntryType        uint8 // convertCommentRows handles 1 (user comment) and 4 (acknowledgement)

	AuthorName, CommentData string

	IsPersistent uint8 // treated as boolean: non-zero means true

	ExpirationTime, DeletionTime int64 // 0 if NULL in the IDO

	DeletionTimeUsec uint32
	Name             string // name of the comment itself
	ObjecttypeId     uint8  // index into objectTypes

	Name1, Name2 string // names of the commented object; Name2 is "" if NULL in the IDO
}
// convertCommentRows converts IDO comment history rows into Icinga DB history entities.
//
// Rows with entry type 1 (user comment) become a comment history entity plus a "comment_add" and,
// if the comment has been removed, a "comment_remove" event. Rows with entry type 4
// (acknowledgement) become an acknowledgement history entity plus an "ack_set" and, if the
// acknowledgement has been cleared, an "ack_clear" event. All other entry types are ignored.
//
// The result holds the entities to insert as {comment history, acknowledgement history,
// comment events, ack events}. There are no upserts. checkpoint is the ID of the last row of
// idoRows (nil if idoRows is empty). The cache related parameters are unused.
func convertCommentRows(
	env string, envId, endpointId icingadbTypes.Binary,
	_ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []commentRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
	var comments, acks, commentEvents, ackEvents []contracts.Entity

	for _, row := range idoRows {
		checkpoint = row.CommenthistoryId

		typ := objectTypes[row.ObjecttypeId]
		hostId := calcObjectId(env, row.Name1)
		serviceId := calcServiceId(env, row.Name1, row.Name2)

		// Metadata shared by all history table entities of this row.
		tableMeta := history.HistoryTableMeta{
			EnvironmentId: envId,
			EndpointId:    endpointId,
			ObjectType:    typ,
			HostId:        hostId,
			ServiceId:     serviceId,
		}

		// eventMeta assembles the metadata of one event of this row.
		eventMeta := func(entity history.HistoryEntity, eventType string) history.HistoryMeta {
			return history.HistoryMeta{
				HistoryEntity: entity,
				EnvironmentId: envId,
				EndpointId:    endpointId,
				ObjectType:    typ,
				HostId:        hostId,
				ServiceId:     serviceId,
				EventType:     eventType,
			}
		}

		switch row.EntryType {
		case 1: // user comment
			id := calcObjectId(env, row.Name)
			entryTime := convertTime(row.EntryTime, row.EntryTimeUsec)
			removeTime := convertTime(row.DeletionTime, row.DeletionTimeUsec)
			expireTime := convertTime(row.ExpirationTime, 0)
			removed := !removeTime.Time().IsZero()

			comments = append(comments, &history.CommentHistory{
				CommentHistoryEntity: history.CommentHistoryEntity{CommentId: id},
				HistoryTableMeta:     tableMeta,
				CommentHistoryUpserter: history.CommentHistoryUpserter{
					RemoveTime:     removeTime,
					HasBeenRemoved: icingadbTypes.Bool{Bool: removed, Valid: true},
				},
				EntryTime:    entryTime,
				Author:       row.AuthorName,
				Comment:      row.CommentData,
				EntryType:    icingadbTypes.CommentType(row.EntryType),
				IsPersistent: icingadbTypes.Bool{Bool: row.IsPersistent != 0, Valid: true},
				IsSticky:     icingadbTypes.Bool{Bool: false, Valid: true},
				ExpireTime:   expireTime,
			})

			added := &history.HistoryComment{
				HistoryMeta: eventMeta(
					history.HistoryEntity{Id: hashAny([]string{env, "comment_add", row.Name})}, "comment_add",
				),
				CommentHistoryId: id,
				EntryTime:        entryTime,
			}
			added.EventTime.History = added
			commentEvents = append(commentEvents, added)

			if removed {
				gone := &history.HistoryComment{
					HistoryMeta: eventMeta(
						history.HistoryEntity{Id: hashAny([]string{env, "comment_remove", row.Name})}, "comment_remove",
					),
					CommentHistoryId: id,
					EntryTime:        entryTime,
					RemoveTime:       removeTime,
					ExpireTime:       expireTime,
				}
				gone.EventTime.History = gone
				commentEvents = append(commentEvents, gone)
			}
		case 4: // acknowledgement
			name := row.Name1
			if row.Name2 != "" {
				name += "!" + row.Name2
			}

			setTime := convertTime(row.EntryTime, row.EntryTimeUsec)
			setTs := float64(setTime.Time().UnixMilli())
			clearTime := convertTime(row.DeletionTime, row.DeletionTimeUsec)
			ackId := hashAny([]any{env, name, setTs})

			acks = append(acks, &history.AcknowledgementHistory{
				EntityWithoutChecksum: v1.EntityWithoutChecksum{IdMeta: v1.IdMeta{Id: ackId}},
				HistoryTableMeta:      tableMeta,
				AckHistoryUpserter:    history.AckHistoryUpserter{ClearTime: clearTime},
				SetTime:               setTime,
				Author: icingadbTypes.String{
					NullString: sql.NullString{String: row.AuthorName, Valid: true},
				},
				Comment: icingadbTypes.String{
					NullString: sql.NullString{String: row.CommentData, Valid: true},
				},
				ExpireTime:   convertTime(row.ExpirationTime, 0),
				IsPersistent: icingadbTypes.Bool{Bool: row.IsPersistent != 0, Valid: true},
			})

			// Both ack events carry the same payload and differ only in their type (and thus ID).
			newAckEvent := func(eventType string) *history.HistoryAck {
				event := &history.HistoryAck{
					HistoryMeta: eventMeta(
						history.HistoryEntity{Id: hashAny([]any{env, eventType, name, setTs})}, eventType,
					),
					AcknowledgementHistoryId: ackId,
					SetTime:                  setTime,
					ClearTime:                clearTime,
				}
				event.EventTime.History = event
				return event
			}

			ackEvents = append(ackEvents, newAckEvent("ack_set"))
			if !clearTime.Time().IsZero() {
				ackEvents = append(ackEvents, newAckEvent("ack_clear"))
			}
		}
	}

	return [][]contracts.Entity{comments, acks, commentEvents, ackEvents}, nil, checkpoint
}
// downtimeRow is one result row of the downtime migration query (embed/downtime_query.sql),
// i.e. an IDO downtime history entry together with the object it belongs to.
// Times are Unix timestamps in seconds, the *Usec fields carry the sub-second part.
type downtimeRow = struct {
	DowntimehistoryId uint64 // primary key, also used as migration checkpoint
	EntryTime         int64

	AuthorName, CommentData string

	IsFixed  uint8 // 0 means the downtime is flexible
	Duration int64 // in seconds

	ScheduledStartTime, ScheduledEndTime int64 // the end time is 0 if NULL in the IDO

	ActualStartTime     int64 // 0 if NULL in the IDO
	ActualStartTimeUsec uint32
	ActualEndTime       int64 // 0 if NULL in the IDO
	ActualEndTimeUsec   uint32
	WasCancelled        uint8 // treated as boolean: non-zero means true
	TriggerTime         int64 // 0 if NULL in the IDO
	Name                string // name of the downtime itself
	ObjecttypeId        uint8  // index into objectTypes

	Name1, Name2 string // names of the affected object; Name2 is "" if NULL in the IDO

	TriggeredBy string // name of the triggering downtime, "" if there is none
}
// convertDowntimeRows converts IDO downtime history rows into Icinga DB history entities.
//
// Every row becomes a downtime history entity, a "downtime_start" event, an SLA downtime entity
// and, if the downtime has an actual end time, a "downtime_end" event.
//
// Missing times are substituted: the scheduled end falls back to scheduled start plus duration,
// start/end fall back to their scheduled counterparts and the trigger time falls back to the
// start time. The cancel time is the actual end time, but only for cancelled downtimes.
//
// The result holds the entities to insert as {downtime history, events, SLA history}. There are
// no upserts. checkpoint is the ID of the last row of idoRows (nil if idoRows is empty).
// The cache related parameters are unused.
func convertDowntimeRows(
	env string, envId, endpointId icingadbTypes.Binary,
	_ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []downtimeRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
	var downtimes, events, sla []contracts.Entity

	for _, row := range idoRows {
		checkpoint = row.DowntimehistoryId

		id := calcObjectId(env, row.Name)
		typ := objectTypes[row.ObjecttypeId]
		hostId := calcObjectId(env, row.Name1)
		serviceId := calcServiceId(env, row.Name1, row.Name2)

		scheduledStart := convertTime(row.ScheduledStartTime, 0)
		scheduledEnd := convertTime(row.ScheduledEndTime, 0)
		triggerTime := convertTime(row.TriggerTime, 0)
		actualStart := convertTime(row.ActualStartTime, row.ActualStartTimeUsec)
		actualEnd := convertTime(row.ActualEndTime, row.ActualEndTimeUsec)

		if scheduledEnd.Time().IsZero() {
			scheduledEnd = icingadbTypes.UnixMilli(scheduledStart.Time().Add(time.Duration(row.Duration) * time.Second))
		}

		startTime := actualStart
		if startTime.Time().IsZero() {
			startTime = scheduledStart
		}

		ended := !actualEnd.Time().IsZero()
		endTime := actualEnd
		if !ended {
			endTime = scheduledEnd
		}

		if triggerTime.Time().IsZero() {
			triggerTime = startTime
		}

		cancelled := row.WasCancelled != 0
		hasBeenCancelled := icingadbTypes.Bool{Bool: cancelled, Valid: true}

		var cancelTime icingadbTypes.UnixMilli
		if cancelled {
			cancelTime = actualEnd
		}

		// Metadata shared by all history table entities of this row.
		tableMeta := history.HistoryTableMeta{
			EnvironmentId: envId,
			EndpointId:    endpointId,
			ObjectType:    typ,
			HostId:        hostId,
			ServiceId:     serviceId,
		}

		// eventMeta assembles the metadata of one event of this row.
		eventMeta := func(eventType string) history.HistoryMeta {
			return history.HistoryMeta{
				HistoryEntity: history.HistoryEntity{Id: hashAny([]string{env, eventType, row.Name})},
				EnvironmentId: envId,
				EndpointId:    endpointId,
				ObjectType:    typ,
				HostId:        hostId,
				ServiceId:     serviceId,
				EventType:     eventType,
			}
		}

		downtimes = append(downtimes, &history.DowntimeHistory{
			DowntimeHistoryEntity: history.DowntimeHistoryEntity{DowntimeId: id},
			HistoryTableMeta:      tableMeta,
			DowntimeHistoryUpserter: history.DowntimeHistoryUpserter{
				HasBeenCancelled: hasBeenCancelled,
				CancelTime:       cancelTime,
			},
			TriggeredById:      calcObjectId(env, row.TriggeredBy),
			EntryTime:          convertTime(row.EntryTime, 0),
			Author:             row.AuthorName,
			Comment:            row.CommentData,
			IsFlexible:         icingadbTypes.Bool{Bool: row.IsFixed == 0, Valid: true},
			FlexibleDuration:   uint64(row.Duration) * 1000,
			ScheduledStartTime: scheduledStart,
			ScheduledEndTime:   scheduledEnd,
			StartTime:          startTime,
			EndTime:            endTime,
			TriggerTime:        triggerTime,
		})

		started := &history.HistoryDowntime{
			HistoryMeta:       eventMeta("downtime_start"),
			DowntimeHistoryId: id,
			StartTime:         startTime,
		}
		started.EventTime.History = started
		events = append(events, started)

		if ended {
			finished := &history.HistoryDowntime{
				HistoryMeta:       eventMeta("downtime_end"),
				DowntimeHistoryId: id,
				StartTime:         startTime,
				CancelTime:        cancelTime,
				EndTime:           endTime,
				HasBeenCancelled:  hasBeenCancelled,
			}
			finished.EventTime.History = finished
			events = append(events, finished)
		}

		slaEntry := &history.SlaHistoryDowntime{
			DowntimeHistoryEntity: history.DowntimeHistoryEntity{DowntimeId: id},
			HistoryTableMeta:      tableMeta,
			DowntimeStart:         startTime,
			HasBeenCancelled:      hasBeenCancelled,
			CancelTime:            cancelTime,
			EndTime:               endTime,
		}
		slaEntry.DowntimeEnd.History = slaEntry
		sla = append(sla, slaEntry)
	}

	return [][]contracts.Entity{downtimes, events, sla}, nil, checkpoint
}
// flappingRow is one result row of the flapping migration query, i.e. an IDO flapping history
// entry together with the object it belongs to.
// EventTime is a Unix timestamp in seconds, EventTimeUsec carries the sub-second part.
type flappingRow = struct {
	FlappinghistoryId  uint64 // primary key, also used as migration checkpoint
	EventTime          int64
	EventTimeUsec      uint32
	EventType          uint16 // 1001 is treated as flapping end, everything else as flapping start
	PercentStateChange sql.NullFloat64

	LowThreshold, HighThreshold float64

	ObjecttypeId uint8 // index into objectTypes

	Name1, Name2 string // names of the flapping object; an empty Name2 is left out of the hashed name
}
// convertFlappingRows converts IDO flapping history rows into Icinga DB history entities.
//
// A start row becomes a flapping history entity to insert plus a "flapping_start" event.
// An end row (event type 1001) becomes a flapping history entity to upsert - its start counterpart
// should already have been inserted - plus a "flapping_end" event. As the flapping history ID is
// derived from the start time, end rows need the start time cached in end_start_time
// (read via selectCache); end rows without a cached start time are skipped.
//
// The inserts are {flapping history, events}, the upserts are {flapping history}.
// checkpoint is the ID of the last row of idoRows. For empty idoRows everything is nil.
func convertFlappingRows(
	env string, envId, endpointId icingadbTypes.Binary,
	selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []flappingRow,
) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any) {
	if len(idoRows) == 0 {
		return nil, nil, nil
	}

	firstId, lastId := idoRows[0].FlappinghistoryId, idoRows[len(idoRows)-1].FlappinghistoryId

	var cached []struct {
		HistoryId     uint64
		EventTime     int64
		EventTimeUsec uint32
	}
	selectCache(
		&cached, "SELECT history_id, event_time, event_time_usec FROM end_start_time WHERE history_id BETWEEN ? AND ?",
		firstId, lastId,
	)

	// Start time per end event, needed for the flapping history ID.
	startByEndId := make(map[uint64]icingadbTypes.UnixMilli, len(cached))
	for _, c := range cached {
		startByEndId[c.HistoryId] = convertTime(c.EventTime, c.EventTimeUsec)
	}

	var inserts, upserts, events []contracts.Entity

	for _, row := range idoRows {
		checkpoint = row.FlappinghistoryId

		ts := convertTime(row.EventTime, row.EventTimeUsec)
		isEnd := row.EventType == 1001

		// A start event starts at its own time, an end event at the time of its start event.
		start := ts
		if isEnd {
			cachedStart, ok := startByEndId[row.FlappinghistoryId]
			if !ok {
				continue
			}

			start = cachedStart
		}

		name := row.Name1
		if row.Name2 != "" {
			name += "!" + row.Name2
		}

		typ := objectTypes[row.ObjecttypeId]
		hostId := calcObjectId(env, row.Name1)
		serviceId := calcServiceId(env, row.Name1, row.Name2)
		startTime := float64(start.Time().UnixMilli())
		flappingHistoryId := hashAny([]interface{}{env, name, startTime})
		percent := icingadbTypes.Float{NullFloat64: row.PercentStateChange}

		eventType := "flapping_start"
		if isEnd {
			eventType = "flapping_end"
		}

		entity := &history.FlappingHistory{
			EntityWithoutChecksum: v1.EntityWithoutChecksum{
				IdMeta: v1.IdMeta{Id: flappingHistoryId},
			},
			HistoryTableMeta: history.HistoryTableMeta{
				EnvironmentId: envId,
				EndpointId:    endpointId,
				ObjectType:    typ,
				HostId:        hostId,
				ServiceId:     serviceId,
			},
			FlappingHistoryUpserter: history.FlappingHistoryUpserter{
				FlappingThresholdLow:  float32(row.LowThreshold),
				FlappingThresholdHigh: float32(row.HighThreshold),
			},
			StartTime: start,
		}

		event := &history.HistoryFlapping{
			HistoryMeta: history.HistoryMeta{
				HistoryEntity: history.HistoryEntity{
					Id: hashAny([]interface{}{env, eventType, name, startTime}),
				},
				EnvironmentId: envId,
				EndpointId:    endpointId,
				ObjectType:    typ,
				HostId:        hostId,
				ServiceId:     serviceId,
				EventType:     eventType,
			},
			FlappingHistoryId: flappingHistoryId,
			StartTime:         start,
		}
		event.EventTime.History = event

		if isEnd {
			entity.FlappingHistoryUpserter.EndTime = ts
			entity.FlappingHistoryUpserter.PercentStateChangeEnd = percent
			event.EndTime = ts

			upserts = append(upserts, entity)
		} else {
			entity.PercentStateChangeStart = percent

			inserts = append(inserts, entity)
		}

		events = append(events, event)
	}

	return [][]contracts.Entity{inserts, events}, [][]contracts.Entity{upserts}, checkpoint
}
// notificationRow is one result row of the notification migration query, i.e. a notification
// sent according to the IDO together with the object it was sent for.
// EndTime is a Unix timestamp in seconds, EndTimeUsec carries the sub-second part.
type notificationRow = struct {
	NotificationId     uint64 // primary key, also used as migration checkpoint
	NotificationReason uint8  // IDO reason code, see convertNotificationType
	EndTime            int64  // used as the send time of the notification
	EndTimeUsec        uint32
	State              uint8
	Output             string
	LongOutput         sql.NullString // appended to Output if valid
	ContactsNotified   uint16
	ObjecttypeId       uint8 // index into objectTypes

	Name1, Name2 string // names of the object the notification was sent for
}
// convertNotificationRows converts IDO notification rows into Icinga DB history entities.
//
// Every row becomes a notification history entity, a "notification" event and one user
// notification history entity per notified contact. The contacts are read from the IDO via ido,
// the previous hard states from the cache via selectCache. Rows without a cached previous hard
// state and rows whose notification type has no valid value are skipped.
//
// The result holds the entities to insert as {notification history, user notification history,
// events}. There are no upserts. checkpoint is the ID of the last row of idoRows.
// For empty idoRows everything is nil. (On non-recoverable errors the whole program exits.)
func convertNotificationRows(
	env string, envId, endpointId icingadbTypes.Binary,
	selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []notificationRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
	if len(idoRows) == 0 {
		return nil, nil, nil
	}

	firstId, lastId := idoRows[0].NotificationId, idoRows[len(idoRows)-1].NotificationId

	var cached []struct {
		HistoryId         uint64
		PreviousHardState uint8
	}
	selectCache(
		&cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? AND ?",
		firstId, lastId,
	)

	previousHardStates := make(map[uint64]uint8, len(cached))
	for _, c := range cached {
		previousHardStates[c.HistoryId] = c.PreviousHardState
	}

	var contacts []struct {
		NotificationId uint64
		Name1          string
	}

	query := ido.Rebind(
		"SELECT c.notification_id, o.name1 FROM icinga_contactnotifications c " +
			"INNER JOIN icinga_objects o ON o.object_id=c.contact_object_id WHERE c.notification_id BETWEEN ? AND ?",
	)
	if err := ido.Select(&contacts, query, firstId, lastId); err != nil {
		log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query"))
	}

	// Set of contact names per notification.
	contactsById := map[uint64]map[string]struct{}{}
	for _, contact := range contacts {
		if contactsById[contact.NotificationId] == nil {
			contactsById[contact.NotificationId] = map[string]struct{}{}
		}

		contactsById[contact.NotificationId][contact.Name1] = struct{}{}
	}

	var notifications, userNotifications, events []contracts.Entity

	for _, row := range idoRows {
		checkpoint = row.NotificationId

		previousHardState, cachedOk := previousHardStates[row.NotificationId]
		if !cachedOk {
			continue
		}

		// The IDO tracks only sent notifications, but not notification config objects, nor even their names.
		// We have to improvise. By the way we avoid unwanted collisions between synced and migrated data via "ID"
		// instead of "HOST[!SERVICE]!NOTIFICATION" (ok as this name won't be parsed, but only hashed) and between
		// migrated data itself via the history ID as object name, i.e. one "virtual object" per sent notification.
		name := strconv.FormatUint(row.NotificationId, 10)

		nt := convertNotificationType(row.NotificationReason, row.State)

		ntEnum, err := nt.Value()
		if err != nil {
			continue
		}

		ts := convertTime(row.EndTime, row.EndTimeUsec)
		tsMilli := float64(ts.Time().UnixMilli())
		notificationHistoryId := hashAny([]interface{}{env, name, ntEnum, tsMilli})
		eventId := hashAny([]interface{}{env, "notification", name, ntEnum, tsMilli})
		typ := objectTypes[row.ObjecttypeId]
		hostId := calcObjectId(env, row.Name1)
		serviceId := calcServiceId(env, row.Name1, row.Name2)

		text := row.Output
		if row.LongOutput.Valid {
			text += "\n\n" + row.LongOutput.String
		}

		notifications = append(notifications, &history.NotificationHistory{
			HistoryTableEntity: history.HistoryTableEntity{
				EntityWithoutChecksum: v1.EntityWithoutChecksum{
					IdMeta: v1.IdMeta{Id: notificationHistoryId},
				},
			},
			HistoryTableMeta: history.HistoryTableMeta{
				EnvironmentId: envId,
				EndpointId:    endpointId,
				ObjectType:    typ,
				HostId:        hostId,
				ServiceId:     serviceId,
			},
			NotificationId:    calcObjectId(env, name),
			Type:              nt,
			SendTime:          ts,
			State:             row.State,
			PreviousHardState: previousHardState,
			Text:              icingadbTypes.String{NullString: sql.NullString{String: text, Valid: true}},
			UsersNotified:     row.ContactsNotified,
		})

		events = append(events, &history.HistoryNotification{
			HistoryMeta: history.HistoryMeta{
				HistoryEntity: history.HistoryEntity{Id: eventId},
				EnvironmentId: envId,
				EndpointId:    endpointId,
				ObjectType:    typ,
				HostId:        hostId,
				ServiceId:     serviceId,
				EventType:     "notification",
			},
			NotificationHistoryId: notificationHistoryId,
			EventTime:             ts,
		})

		for contact := range contactsById[row.NotificationId] {
			userId := calcObjectId(env, contact)

			// The ID is the checksum of the notification history ID followed by the user ID.
			userNotificationId := utils.Checksum(append(append([]byte(nil), notificationHistoryId...), userId...))

			userNotifications = append(userNotifications, &history.UserNotificationHistory{
				EntityWithoutChecksum: v1.EntityWithoutChecksum{
					IdMeta: v1.IdMeta{Id: userNotificationId},
				},
				EnvironmentMeta:       v1.EnvironmentMeta{EnvironmentId: envId},
				NotificationHistoryId: notificationHistoryId,
				UserId:                userId,
			})
		}
	}

	return [][]contracts.Entity{notifications, userNotifications, events}, nil, checkpoint
}
// convertNotificationType maps IDO values[1] to Icinga DB ones[2].
//
// The IDO reason "state" (0) covers both problems and recoveries, which are told apart by state:
// 0 yields a recovery, anything else a problem. Unknown reasons yield 0 (bad notification type).
//
// [1]: https://github.com/Icinga/icinga2/blob/32c7f7730db154ba0dff5856a8985d125791c/lib/db_ido/dbevents.cpp#L1507-L1524
// [2]: https://github.com/Icinga/icingadb/blob/8f31ac143875498797725adb9bfacf3d4/pkg/types/notification_type.go#L53-L61
func convertNotificationType(notificationReason, state uint8) icingadbTypes.NotificationType {
	if notificationReason == 0 { // state
		if state == 0 {
			return 64 // recovery
		}

		return 32 // problem
	}

	switch notificationReason {
	case 1:
		return 16 // acknowledgement
	case 2:
		return 128 // flapping start
	case 3:
		return 256 // flapping end
	case 5:
		return 1 // downtime start
	case 6:
		return 2 // downtime end
	case 7:
		return 4 // downtime removed
	case 8:
		return 8 // custom
	}

	return 0 // bad notification type
}
// stateRow is one result row of the state migration query, i.e. an IDO state history entry
// together with the object it belongs to.
// StateTime is a Unix timestamp in seconds, StateTimeUsec carries the sub-second part.
type stateRow = struct {
	StatehistoryId uint64 // primary key, also used as migration checkpoint
	StateTime      int64
	StateTimeUsec  uint32

	State, StateType uint8 // StateType is converted to icingadbTypes.StateType

	CurrentCheckAttempt, MaxCheckAttempts uint16

	LastState, LastHardState uint8

	Output, LongOutput, CheckSource sql.NullString

	ObjecttypeId uint8 // index into objectTypes

	Name1, Name2 string // names of the object the state belongs to
}
// convertStateRows converts IDO state history rows into Icinga DB history entities.
//
// Every row becomes a state history entity and a "state_change" event. Rows with a hard state
// type additionally become an SLA state entity. The previous hard states are read from the cache
// via selectCache; rows without a cached previous hard state are skipped.
//
// The result holds the entities to insert as {state history, events, SLA history}. There are no
// upserts. checkpoint is the ID of the last row of idoRows. For empty idoRows everything is nil.
func convertStateRows(
	env string, envId, endpointId icingadbTypes.Binary,
	selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []stateRow,
) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) {
	if len(idoRows) == 0 {
		return nil, nil, nil
	}

	firstId, lastId := idoRows[0].StatehistoryId, idoRows[len(idoRows)-1].StatehistoryId

	var cached []struct {
		HistoryId         uint64
		PreviousHardState uint8
	}
	selectCache(
		&cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? AND ?",
		firstId, lastId,
	)

	previousHardStates := make(map[uint64]uint8, len(cached))
	for _, c := range cached {
		previousHardStates[c.HistoryId] = c.PreviousHardState
	}

	var states, events, sla []contracts.Entity

	for _, row := range idoRows {
		checkpoint = row.StatehistoryId

		previousHardState, cachedOk := previousHardStates[row.StatehistoryId]
		if !cachedOk {
			continue
		}

		// NOTE(review): unlike the other converters this always appends "!", so a row with an
		// empty Name2 is hashed as "NAME1!" - presumably the case for hosts; confirm that's intended.
		name := strings.Join([]string{row.Name1, row.Name2}, "!")

		ts := convertTime(row.StateTime, row.StateTimeUsec)
		tsMilli := float64(ts.Time().UnixMilli())
		stateHistoryId := hashAny([]interface{}{env, name, tsMilli})
		eventId := hashAny([]interface{}{env, "state_change", name, tsMilli})
		typ := objectTypes[row.ObjecttypeId]
		hostId := calcObjectId(env, row.Name1)
		serviceId := calcServiceId(env, row.Name1, row.Name2)
		stateType := icingadbTypes.StateType(row.StateType)

		// Identity and metadata shared by the state history and the SLA history entity.
		tableEntity := history.HistoryTableEntity{
			EntityWithoutChecksum: v1.EntityWithoutChecksum{
				IdMeta: v1.IdMeta{Id: stateHistoryId},
			},
		}
		tableMeta := history.HistoryTableMeta{
			EnvironmentId: envId,
			EndpointId:    endpointId,
			ObjectType:    typ,
			HostId:        hostId,
			ServiceId:     serviceId,
		}

		states = append(states, &history.StateHistory{
			HistoryTableEntity: tableEntity,
			HistoryTableMeta:   tableMeta,
			EventTime:          ts,
			StateType:          stateType,
			SoftState:          row.State,
			HardState:          row.LastHardState,
			PreviousSoftState:  row.LastState,
			PreviousHardState:  previousHardState,
			// NOTE(review): narrows uint16 to uint8, attempts above 255 wrap around.
			CheckAttempt:     uint8(row.CurrentCheckAttempt),
			Output:           icingadbTypes.String{NullString: row.Output},
			LongOutput:       icingadbTypes.String{NullString: row.LongOutput},
			MaxCheckAttempts: uint32(row.MaxCheckAttempts),
			CheckSource:      icingadbTypes.String{NullString: row.CheckSource},
		})

		events = append(events, &history.HistoryState{
			HistoryMeta: history.HistoryMeta{
				HistoryEntity: history.HistoryEntity{Id: eventId},
				EnvironmentId: envId,
				EndpointId:    endpointId,
				ObjectType:    typ,
				HostId:        hostId,
				ServiceId:     serviceId,
				EventType:     "state_change",
			},
			StateHistoryId: stateHistoryId,
			EventTime:      ts,
		})

		// Only hard state changes are relevant for the SLA history, all others are discarded.
		if stateType != icingadbTypes.StateHard {
			continue
		}

		sla = append(sla, &history.SlaHistoryState{
			HistoryTableEntity: tableEntity,
			HistoryTableMeta:   tableMeta,
			EventTime:          ts,
			StateType:          stateType,
			HardState:          row.LastHardState,
			PreviousHardState:  previousHardState,
		})
	}

	return [][]contracts.Entity{states, events, sla}, nil, checkpoint
}

View file

@ -0,0 +1,11 @@
-- Selects the next bulk of IDO comment history rows, joined with the objects they belong to.
-- Times are returned as Unix timestamps, missing expiration and deletion times as 0.
SELECT ch.commenthistory_id, UNIX_TIMESTAMP(ch.entry_time) entry_time,
ch.entry_time_usec, ch.entry_type, ch.author_name, ch.comment_data, ch.is_persistent,
COALESCE(UNIX_TIMESTAMP(ch.expiration_time), 0) expiration_time,
COALESCE(UNIX_TIMESTAMP(ch.deletion_time), 0) deletion_time,
ch.deletion_time_usec, ch.name, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2
FROM icinga_commenthistory ch USE INDEX (PRIMARY)
INNER JOIN icinga_objects o ON o.object_id=ch.object_id
WHERE ch.commenthistory_id BETWEEN :fromid AND :toid
AND ch.commenthistory_id > :checkpoint -- where we were interrupted
ORDER BY ch.commenthistory_id -- this way we know what has already been migrated from just the last row's ID
LIMIT :bulk

View file

@ -0,0 +1,14 @@
-- Selects the next bulk of IDO downtime history rows, joined with the objects they belong to.
-- Times are returned as Unix timestamps, missing end, actual start and trigger times as 0.
-- triggered_by is the name of the triggering scheduled downtime or an empty string if there is none.
SELECT dh.downtimehistory_id, UNIX_TIMESTAMP(dh.entry_time) entry_time, dh.author_name, dh.comment_data,
dh.is_fixed, dh.duration, UNIX_TIMESTAMP(dh.scheduled_start_time) scheduled_start_time,
COALESCE(UNIX_TIMESTAMP(dh.scheduled_end_time), 0) scheduled_end_time,
COALESCE(UNIX_TIMESTAMP(dh.actual_start_time), 0) actual_start_time, dh.actual_start_time_usec,
COALESCE(UNIX_TIMESTAMP(dh.actual_end_time), 0) actual_end_time, dh.actual_end_time_usec, dh.was_cancelled,
COALESCE(UNIX_TIMESTAMP(dh.trigger_time), 0) trigger_time, dh.name, o.objecttype_id,
o.name1, COALESCE(o.name2, '') name2, COALESCE(sd.name, '') triggered_by
FROM icinga_downtimehistory dh USE INDEX (PRIMARY)
INNER JOIN icinga_objects o ON o.object_id=dh.object_id
LEFT JOIN icinga_scheduleddowntime sd ON sd.scheduleddowntime_id=dh.triggered_by_id
WHERE dh.downtimehistory_id BETWEEN :fromid AND :toid
AND dh.downtimehistory_id > :checkpoint -- where we were interrupted
ORDER BY dh.downtimehistory_id -- this way we know what has already been migrated from just the last row's ID
LIMIT :bulk

View file

@ -0,0 +1,15 @@
-- Schema of the SQLite cache for event start times.
-- auto_vacuum mode 1 is SQLite's FULL mode, i.e. freed pages are given back to the file system.
PRAGMA main.auto_vacuum = 1;
-- Icinga DB's flapping_history#start_time per flapping_end row (IDO's icinga_flappinghistory#flappinghistory_id).
CREATE TABLE IF NOT EXISTS end_start_time (
history_id INT PRIMARY KEY,
event_time INT NOT NULL,
event_time_usec INT NOT NULL
);
-- Helper table, the last start_time per object_id of the cached IDO history table
-- (e.g. icinga_flappinghistory#object_id).
CREATE TABLE IF NOT EXISTS last_start_time (
object_id INT PRIMARY KEY,
event_time INT NOT NULL,
event_time_usec INT NOT NULL
);

View file

@ -0,0 +1,9 @@
-- Selects the next bulk of IDO flapping history rows, joined with the objects they belong to.
-- The event time is returned as Unix timestamp.
SELECT fh.flappinghistory_id, UNIX_TIMESTAMP(fh.event_time) event_time,
fh.event_time_usec, fh.event_type, fh.percent_state_change, fh.low_threshold,
fh.high_threshold, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2
FROM icinga_flappinghistory fh USE INDEX (PRIMARY)
INNER JOIN icinga_objects o ON o.object_id=fh.object_id
WHERE fh.flappinghistory_id BETWEEN :fromid AND :toid
AND fh.flappinghistory_id > :checkpoint -- where we were interrupted
ORDER BY fh.flappinghistory_id -- this way we know what has already been migrated from just the last row's ID
LIMIT :bulk

View file

@ -0,0 +1,9 @@
-- Bookkeeping of the IDO history migration, created in the Icinga DB database.
-- There is one row per environment, history type and migrated time range (from_ts, to_ts).
-- last_ido_id is the ID of the last IDO row which has already been migrated.
CREATE TABLE IF NOT EXISTS ido_migration_progress (
environment_id CHAR(40) NOT NULL, -- Hex SHA1. Rationale: CHAR(40) is not RDBMS-specific
history_type VARCHAR(63) NOT NULL,
from_ts BIGINT NOT NULL,
to_ts BIGINT NOT NULL,
last_ido_id BIGINT NOT NULL,
CONSTRAINT pk_ido_migration_progress PRIMARY KEY (environment_id, history_type, from_ts, to_ts)
);

View file

@ -0,0 +1,9 @@
-- Selects the next bulk of IDO notification rows, joined with the objects they belong to.
-- The end time is returned as Unix timestamp, a missing output as an empty string.
-- Rows with an ID above the cache_limit placeholder are left out.
SELECT n.notification_id, n.notification_reason, UNIX_TIMESTAMP(n.end_time) end_time,
n.end_time_usec, n.state, COALESCE(n.output, '') output, n.long_output,
n.contacts_notified, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2
FROM icinga_notifications n USE INDEX (PRIMARY)
INNER JOIN icinga_objects o ON o.object_id=n.object_id
WHERE n.notification_id BETWEEN :fromid AND :toid
AND n.notification_id <= :cache_limit AND n.notification_id > :checkpoint -- where we were interrupted
ORDER BY n.notification_id -- this way we know what has already been migrated from just the last row's ID
LIMIT :bulk

View file

@ -0,0 +1,22 @@
-- Schema of the SQLite cache for previous hard states.
-- auto_vacuum mode 1 is SQLite's FULL mode, i.e. freed pages are given back to the file system.
PRAGMA main.auto_vacuum = 1;
-- Icinga DB's state_history#previous_hard_state per IDO's icinga_statehistory#statehistory_id.
CREATE TABLE IF NOT EXISTS previous_hard_state (
history_id INT PRIMARY KEY,
previous_hard_state INT NOT NULL
);
-- Helper table, the current last_hard_state per icinga_statehistory#object_id.
CREATE TABLE IF NOT EXISTS next_hard_state (
object_id INT PRIMARY KEY,
next_hard_state INT NOT NULL
);
-- Helper table for stashing icinga_statehistory#statehistory_id until last_hard_state changes.
CREATE TABLE IF NOT EXISTS next_ids (
object_id INT NOT NULL,
history_id INT NOT NULL
);
-- next_ids has no primary key, both of its columns are indexed separately for lookups.
CREATE INDEX IF NOT EXISTS next_ids_object_id ON next_ids (object_id);
CREATE INDEX IF NOT EXISTS next_ids_history_id ON next_ids (history_id);

View file

@ -0,0 +1,9 @@
-- Selects the next bulk of IDO state history rows, joined with the objects they belong to.
-- The state time is returned as Unix timestamp.
-- Rows with an ID above the cache_limit placeholder are left out.
SELECT sh.statehistory_id, UNIX_TIMESTAMP(sh.state_time) state_time, sh.state_time_usec, sh.state,
sh.state_type, sh.current_check_attempt, sh.max_check_attempts, sh.last_state, sh.last_hard_state,
sh.output, sh.long_output, sh.check_source, o.objecttype_id, o.name1, COALESCE(o.name2, '') name2
FROM icinga_statehistory sh USE INDEX (PRIMARY)
INNER JOIN icinga_objects o ON o.object_id=sh.object_id
WHERE sh.statehistory_id BETWEEN :fromid AND :toid
AND sh.statehistory_id <= :cache_limit AND sh.statehistory_id > :checkpoint -- where we were interrupted
ORDER BY sh.statehistory_id -- this way we know what has already been migrated from just the last row's ID
LIMIT :bulk

View file

@ -0,0 +1,500 @@
package main
import (
"context"
"crypto/sha1"
"database/sql"
_ "embed"
"encoding/hex"
"fmt"
"github.com/creasty/defaults"
"github.com/goccy/go-yaml"
"github.com/icinga/icingadb/pkg/config"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
icingadbTypes "github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"github.com/jessevdk/go-flags"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
_ "github.com/mattn/go-sqlite3"
"github.com/pkg/errors"
"github.com/vbauerster/mpb/v6"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"math"
"os"
"path"
"path/filepath"
"regexp"
"strings"
"time"
)
// Flags defines the CLI flags. Both of them are mandatory (required:"true").
type Flags struct {
// Config is the path to the config file (-c / --config). See Config for the expected YAML structure.
Config string `short:"c" long:"config" description:"path to config file" required:"true"`
// Cache is a (not necessarily yet existing) directory for caching (-t / --cache).
// mkCache creates it if missing and places the SQLite cache files there.
Cache string `short:"t" long:"cache" description:"path for caching" required:"true"`
}
// Config defines the YAML config structure.
type Config struct {
// IDO specifies the source database and the time range to migrate.
IDO struct {
config.Database `yaml:"-,inline"`
// From is the start of the time range to migrate (Unix timestamp, seconds). Defaults to 0.
From int32 `yaml:"from"`
// To is the end of the time range to migrate (Unix timestamp, seconds). Defaults to the int32 maximum.
To int32 `yaml:"to" default:"2147483647"`
} `yaml:"ido"`
// IcingaDB specifies the target database.
IcingaDB config.Database `yaml:"icingadb"`
// Icinga2 specifies information the IDO doesn't provide.
Icinga2 struct {
// Env specifies the environment ID, hex.
Env string `yaml:"env"`
// Endpoint specifies the name of the main endpoint writing to IDO.
Endpoint string `yaml:"endpoint"`
} `yaml:"icinga2"`
}
// main is the entry point: it validates the CLI, loads the config and then migrates
// the history from the IDO to Icinga DB step by step (see the comments below).
// Most of the helpers called here terminate the whole program by themselves on non-recoverable errors.
func main() {
	cli := &Flags{}
	if _, err := flags.NewParser(cli, flags.Default).Parse(); err != nil {
		os.Exit(2)
	}

	cfg, exitCode := parseConfig(cli)
	if cfg == nil {
		os.Exit(exitCode)
	}

	envId, err := hex.DecodeString(cfg.Icinga2.Env)
	if err != nil {
		_, _ = fmt.Fprintf(os.Stderr, "bad env ID: %s\n", err.Error())
		os.Exit(2)
	}

	defer func() { _ = log.Sync() }()

	log.Info("Starting IDO to Icinga DB history migration")

	ido, idb := connectAll(cfg)

	if schemaErr := idb.CheckSchema(context.Background()); schemaErr != nil {
		log.Fatalf("%+v", schemaErr)
	}

	// Repeatable-read-isolated transactions give consistent SELECTs,
	// so IDO data changes during the migration don't have to be cared for.
	startIdoTx(ido)

	// Set up the cache directory and databases which fillCache() needs later on.
	mkCache(cli, cfg, idb.Mapper)

	log.Info("Computing progress")

	// Translate Config#IDO.From and .To into IDs, so that data can be restricted by PK.
	computeIdRange(cfg)

	// Find out what has been migrated already, so that migrate() below
	// doesn't have to start from the beginning every time.
	computeProgress(cfg, idb, envId)

	// For the rationale see the buildEventTimeCache() and buildPreviousHardStateCache() docs.
	log.Info("Filling cache")
	fillCache()

	log.Info("Actually migrating")
	migrate(cfg, idb, envId)

	log.Info("Cleaning up cache")
	cleanupCache(cli)
}
// parseConfig reads and validates the YAML file f.Config points to.
// On success it returns the config (pre-filled with defaults) and -1.
// On failure it prints the reason to stderr and returns nil and the exit code to terminate with.
func parseConfig(f *Flags) (_ *Config, exit int) {
	// fail reports what went wrong and yields the failure result.
	fail := func(what string, err error) (*Config, int) {
		_, _ = fmt.Fprintf(os.Stderr, "%s: %s\n", what, err.Error())
		return nil, 2
	}

	file, err := os.Open(f.Config)
	if err != nil {
		return fail("can't open config file", err)
	}
	defer func() { _ = file.Close() }()

	cfg := new(Config)

	// Defaults first, so that the file content overrides them.
	if err = defaults.Set(cfg); err != nil {
		return fail("can't set config defaults", err)
	}

	if err = yaml.NewDecoder(file).Decode(cfg); err != nil {
		return fail("can't parse config file", err)
	}

	return cfg, -1
}
// nonWords matches runs of characters other than ASCII letters, digits and underscores.
var nonWords = regexp.MustCompile(`\W+`)

// mkCache creates the directory f.Cache if needed and, for every history type with a cache schema,
// opens <f.Cache>/<history type>_<from>-<to>.sqlite3 and imports the schema into it.
// It initializes types[*].cache and types[*].cacheFile. (On non-recoverable errors the whole program exits.)
func mkCache(f *Flags, c *Config, mapper *reflectx.Mapper) {
	log.Info("Preparing cache")

	if err := os.MkdirAll(f.Cache, 0700); err != nil {
		log.With("dir", f.Cache).Fatalf("%+v", errors.Wrap(err, "can't create directory"))
	}

	types.forEach(func(ht *historyType) {
		// History types without a schema don't need a cache at all.
		if ht.cacheSchema == "" {
			return
		}

		// The time range is part of the file name, non-word characters of the type name become "_".
		baseName := fmt.Sprintf(
			"%s_%d-%d.sqlite3", nonWords.ReplaceAllLiteralString(ht.name, "_"), c.IDO.From, c.IDO.To,
		)
		dbFile := path.Join(f.Cache, baseName)

		db, err := sqlx.Open("sqlite3", "file:"+dbFile)
		if err != nil {
			log.With("file", dbFile).Fatalf("%+v", errors.Wrap(err, "can't open SQLite database"))
		}

		db.Mapper = mapper
		ht.cache, ht.cacheFile = db, dbFile

		if _, err := db.Exec(ht.cacheSchema); err != nil {
			log.With("file", dbFile, "ddl", ht.cacheSchema).
				Fatalf("%+v", errors.Wrap(err, "can't import schema into SQLite database"))
		}
	})
}
// connectAll opens the connections to both the IDO and the Icinga DB database as c specifies
// and returns them. Both connections are established concurrently.
// (On non-recoverable errors the whole program exits.)
func connectAll(c *Config) (ido, idb *icingadb.DB) {
	log.Info("Connecting to databases")

	// connect() never returns an error (it exits instead), the group is used only for waiting.
	group := new(errgroup.Group)

	group.Go(func() error {
		idb = connect("Icinga DB", &c.IcingaDB)
		return nil
	})

	group.Go(func() error {
		ido = connect("IDO", &c.IDO.Database)
		return nil
	})

	_ = group.Wait()

	return ido, idb
}
// connect opens the database cfg specifies and verifies the connection with a ping.
// which names the backend in log messages. (On non-recoverable errors the whole program exits.)
func connect(which string, cfg *config.Database) *icingadb.DB {
	// The database handle gets a no-op logger, i.e. it doesn't log anything by itself.
	db, err := cfg.Open(logging.NewLogger(zap.NewNop().Sugar(), 20*time.Second))
	if err == nil {
		err = db.Ping()
	}

	if err != nil {
		log.With("backend", which).Fatalf("%+v", errors.Wrap(err, "can't connect to database"))
	}

	return db
}
// startIdoTx begins one repeatable-read-isolated ido transaction per history type
// and stores it in types[*].snapshot. (On non-recoverable errors the whole program exits.)
func startIdoTx(ido *icingadb.DB) {
	// Only read by the transactions started below, hence safe to share.
	txOpts := &sql.TxOptions{Isolation: sql.LevelRepeatableRead}

	types.forEach(func(ht *historyType) {
		snapshot, err := ido.BeginTxx(context.Background(), txOpts)
		if err != nil {
			log.Fatalf("%+v", errors.Wrap(err, "can't begin snapshot transaction"))
		}

		ht.snapshot = snapshot
	})
}
// computeIdRange initializes types[*].fromId and types[*].toId.
// It translates the configured time range (Config#IDO.From and .To)
// into a range of IDO row IDs by querying every history type's snapshot.
// (On non-recoverable errors the whole program exits.)
func computeIdRange(c *Config) {
types.forEach(func(ht *historyType) {
// getBorderId stores in *id the ID of the first row (ordered by ID in sortOrder direction)
// whose time compares to borderTime as compOperator requires. The row's time is the first
// usable one of timeColumns. If no row matches, *id is left as is.
getBorderId := func(id *uint64, timeColumns []string, compOperator string, borderTime int32, sortOrder string) {
// Times before 1970-01-03 are turned into NULL,
// presumably to ignore zero-like placeholder timestamps - TODO confirm.
deZeroFied := make([]string, 0, len(timeColumns))
for _, column := range timeColumns {
deZeroFied = append(deZeroFied, fmt.Sprintf(
"CASE WHEN %[1]s < '1970-01-03 00:00:00' THEN NULL ELSE %[1]s END", column,
))
}
// Several columns: the first non-NULL one wins.
var timeExpr string
if len(deZeroFied) > 1 {
timeExpr = "COALESCE(" + strings.Join(deZeroFied, ",") + ")"
} else {
timeExpr = deZeroFied[0]
}
query := ht.snapshot.Rebind(
"SELECT " + ht.idoIdColumn + " FROM " + ht.idoTable + " WHERE " + timeExpr + " " + compOperator +
" FROM_UNIXTIME(?) ORDER BY " + ht.idoIdColumn + " " + sortOrder + " LIMIT 1",
)
// No matching row is not an error, the caller's default stays in place then.
switch err := ht.snapshot.Get(id, query, borderTime); err {
case nil, sql.ErrNoRows:
default:
log.With("backend", "IDO", "query", query, "args", []any{borderTime}).
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
}
// Default in case no row ends at or after From.
ht.fromId = math.MaxInt64
// Lowest ID of the events which end at or after From ...
getBorderId(&ht.fromId, ht.idoEndColumns, ">=", c.IDO.From, "ASC")
// ... and highest ID of the events which start at or before To.
getBorderId(&ht.toId, ht.idoStartColumns, "<=", c.IDO.To, "DESC")
})
}
// idoMigrationProgressSchema is the DDL of the table ido_migration_progress
// where the migration keeps track of what has already been migrated.
//
//go:embed embed/ido_migration_progress_schema.sql
var idoMigrationProgressSchema string
// computeProgress initializes types[*].lastId, types[*].cacheTotal, types[*].total and types[*].done.
// It also creates the table ido_migration_progress in Icinga DB if missing.
// (On non-recoverable errors the whole program exits.)
func computeProgress(c *Config, idb *icingadb.DB, envId []byte) {
if _, err := idb.Exec(idoMigrationProgressSchema); err != nil {
log.Fatalf("%+v", errors.Wrap(err, "can't create table ido_migration_progress"))
}
envIdHex := hex.EncodeToString(envId)
// 1. lastId: the last IDO row ID migrated by a previous run with the same environment and time range.
types.forEach(func(ht *historyType) {
var query = idb.Rebind(
"SELECT last_ido_id FROM ido_migration_progress" +
" WHERE environment_id=? AND history_type=? AND from_ts=? AND to_ts=?",
)
args := []any{envIdHex, ht.name, c.IDO.From, c.IDO.To}
// No row means nothing has been migrated yet, lastId keeps its zero value.
if err := idb.Get(&ht.lastId, query, args...); err != nil && err != sql.ErrNoRows {
log.With("backend", "Icinga DB", "query", query, "args", args).
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
})
// 2. cacheTotal: the amount of IDO rows up to toId, only for history types with a cache filler.
types.forEach(func(ht *historyType) {
if ht.cacheFiller != nil {
err := ht.snapshot.Get(
&ht.cacheTotal,
ht.snapshot.Rebind(
// For actual migration icinga_objects will be joined anyway,
// so it makes no sense to take vanished objects into account.
"SELECT COUNT(*) FROM "+ht.idoTable+
" xh INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ht.idoIdColumn+" <= ?",
),
ht.toId,
)
if err != nil {
log.Fatalf("%+v", errors.Wrap(err, "can't count query"))
}
}
})
// 3. total and done: the amount of IDO rows in [fromId, toId], grouped by whether they're
// already migrated (ID <= lastId) or not.
types.forEach(func(ht *historyType) {
var rows []struct {
Migrated uint8
Cnt int64
}
err := ht.snapshot.Select(
&rows,
ht.snapshot.Rebind(
// For actual migration icinga_objects will be joined anyway,
// so it makes no sense to take vanished objects into account.
"SELECT CASE WHEN xh."+ht.idoIdColumn+"<=? THEN 1 ELSE 0 END migrated, COUNT(*) cnt FROM "+
ht.idoTable+" xh INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
ht.idoIdColumn+" BETWEEN ? AND ? GROUP BY migrated",
),
ht.lastId, ht.fromId, ht.toId,
)
if err != nil {
log.Fatalf("%+v", errors.Wrap(err, "can't count query"))
}
// At most two groups: total sums up both, done is the migrated one.
for _, row := range rows {
ht.total += row.Cnt
if row.Migrated == 1 {
ht.done = row.Cnt
}
}
log.Infow("Counted migrated IDO events", "type", ht.name, "migrated", ht.done, "total", ht.total)
})
}
// fillCache populates the SQLite caches of all history types which have a cache filler
// (the actual work is done by types[*].cacheFiller) and shows one progress bar per cache.
func fillCache() {
	bars := mpb.New()

	// Set up the bars sequentially, before the fillers start running in parallel.
	for _, ht := range types {
		if ht.cacheFiller == nil {
			continue
		}

		ht.setupBar(bars, ht.cacheTotal)
	}

	types.forEach(func(ht *historyType) {
		if filler := ht.cacheFiller; filler != nil {
			filler(ht)
		}
	})

	bars.Wait()
}
// migrate performs the actual migration of all history types in parallel
// (by calling types[*].migrate) and shows one progress bar per history type.
func migrate(c *Config, idb *icingadb.DB, envId []byte) {
	bars := mpb.New()
	for _, ht := range types {
		ht.setupBar(bars, ht.total)
	}

	// The endpoint ID is the SHA1 of the configured endpoint name.
	endpointId := sha1.Sum([]byte(c.Icinga2.Endpoint))

	types.forEach(func(ht *historyType) {
		ht.migrate(c, idb, envId, endpointId, ht)
	})

	bars.Wait()
}
// migrateOneType does the actual migration for one history type.
// It streams ht's IDO rows in bulks (starting after ht.lastId), lets convertRows turn every bulk
// into Icinga DB entities, writes those to idb and finally stores the checkpoint
// convertRows returned in the table ido_migration_progress.
// convertRows gets, among others, selectCache for querying ht.cache and ht.snapshot for querying the IDO.
// (On non-recoverable errors the whole program exits.)
func migrateOneType[IdoRow any](
c *Config, idb *icingadb.DB, envId []byte, endpointId [sha1.Size]byte, ht *historyType,
convertRows func(env string, envId, endpointId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx,
idoRows []IdoRow) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any),
) {
// The most recently prepared cache statement is kept for re-use as long as the query stays the same.
var lastQuery string
var lastStmt *sqlx.Stmt
defer func() {
if lastStmt != nil {
_ = lastStmt.Close()
}
}()
// selectCache performs query with args on ht.cache and stores the result in dest.
selectCache := func(dest interface{}, query string, args ...interface{}) {
// Prepare new one, if old one doesn't fit anymore.
if query != lastQuery {
if lastStmt != nil {
_ = lastStmt.Close()
}
var err error
lastStmt, err = ht.cache.Preparex(query)
if err != nil {
log.With("backend", "cache", "query", query).
Fatalf("%+v", errors.Wrap(err, "can't prepare query"))
}
lastQuery = query
}
if err := lastStmt.Select(dest, args...); err != nil {
log.With("backend", "cache", "query", query, "args", args).
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
}
var args map[string]interface{}
// For the case that the cache was older than the IDO,
// but ht.cacheFiller couldn't update it, limit (WHERE) our source data set.
if ht.cacheLimitQuery != "" {
var limit sql.NullInt64
cacheGet(ht.cache, &limit, ht.cacheLimitQuery)
// A NULL limit yields 0 here.
args = map[string]interface{}{"cache_limit": limit.Int64}
}
// Only the statement itself is needed, the second return value is ignored.
upsertProgress, _ := idb.BuildUpsertStmt(&IdoMigrationProgress{})
envIdHex := hex.EncodeToString(envId)
// Let the bar start at what previous runs have already migrated.
ht.bar.SetCurrent(ht.done)
// Stream IDO rows, ...
sliceIdoHistory(
ht, ht.migrationQuery, args, ht.lastId,
func(idoRows []IdoRow) (checkpoint interface{}) {
// ... convert them, ...
inserts, upserts, lastIdoId := convertRows(
c.Icinga2.Env, envId, endpointId[:], selectCache, ht.snapshot, idoRows,
)
// ... and insert them:
for _, op := range []struct {
kind string
data [][]contracts.Entity
streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error
}{{"INSERT IGNORE", inserts, idb.CreateIgnoreStreamed}, {"UPSERT", upserts, idb.UpsertStreamed}} {
for _, table := range op.data {
if len(table) < 1 {
continue
}
// The channel is filled completely and closed up front,
// so that the streamer gets exactly this bulk and then terminates.
ch := make(chan contracts.Entity, len(table))
for _, row := range table {
ch <- row
}
close(ch)
if err := op.streamer(context.Background(), ch); err != nil {
log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}
}
// Persist the checkpoint only after the bulk has been written,
// so that an interrupted bulk is migrated again on the next run.
if lastIdoId != nil {
// args is used only for logging in case of failure.
args := map[string]interface{}{"history_type": ht.name, "last_ido_id": lastIdoId}
_, err := idb.NamedExec(upsertProgress, &IdoMigrationProgress{
IdoMigrationProgressUpserter{lastIdoId}, envIdHex, ht.name, c.IDO.From, c.IDO.To,
})
if err != nil {
log.With("backend", "Icinga DB", "dml", upsertProgress, "args", args).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}
ht.bar.IncrBy(len(idoRows))
// A nil checkpoint makes sliceIdoHistory stop.
return lastIdoId
},
)
// Nothing left to migrate: make the current value the total and flag the bar as complete.
ht.bar.SetTotal(ht.bar.Current(), true)
}
// cleanupCache closes all opened cache databases and removes the cache files.
// Note that every *.sqlite3 file directly inside f.Cache is removed, not only the ones of this run.
// Failures are only logged as warnings.
func cleanupCache(f *Flags) {
	types.forEach(func(ht *historyType) {
		// No cache file, no database to close.
		if ht.cacheFile == "" {
			return
		}

		if err := ht.cache.Close(); err != nil {
			log.With("file", ht.cacheFile).Warnf("%+v", errors.Wrap(err, "can't close SQLite database"))
		}
	})

	cacheFiles, err := filepath.Glob(path.Join(f.Cache, "*.sqlite3"))
	if err != nil {
		log.With("dir", f.Cache).Warnf("%+v", errors.Wrap(err, "can't list SQLite databases"))
		return
	}

	for _, cacheFile := range cacheFiles {
		if err := os.Remove(cacheFile); err != nil {
			log.With("file", cacheFile).Warnf("%+v", errors.Wrap(err, "can't remove SQLite database"))
		}
	}
}

View file

@ -0,0 +1,317 @@
package main
import (
"context"
"crypto/sha1"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/icinga/icingadb/pkg/driver"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/icingadb/objectpacker"
icingadbTypes "github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/vbauerster/mpb/v6"
"github.com/vbauerster/mpb/v6/decor"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"strings"
"time"
)
// IdoMigrationProgressUpserter holds the part of an ido_migration_progress row
// which is to be updated if the row already exists.
type IdoMigrationProgressUpserter struct {
	// LastIdoId is the ID of the last migrated IDO row.
	LastIdoId any `json:"last_ido_id"`
}

// Upsert implements the contracts.Upserter interface by returning the receiver itself.
func (u *IdoMigrationProgressUpserter) Upsert() interface{} {
	return u
}
// IdoMigrationProgress represents a row of the table ido_migration_progress,
// i.e. how far one history type of one environment and time range has been migrated.
type IdoMigrationProgress struct {
// IdoMigrationProgressUpserter provides the last migrated IDO row ID and the Upsert() method.
IdoMigrationProgressUpserter `json:",inline"`
// EnvironmentId is the hex-encoded environment ID.
EnvironmentId string `json:"environment_id"`
// HistoryType is the name of the history type (historyType#name).
HistoryType string `json:"history_type"`
// FromTs is the start of the migrated time range (Config#IDO.From).
FromTs int32 `json:"from_ts"`
// ToTs is the end of the migrated time range (Config#IDO.To).
ToTs int32 `json:"to_ts"`
}
// Assert interface compliance.
var (
_ contracts.Upserter = (*IdoMigrationProgressUpserter)(nil)
_ contracts.Upserter = (*IdoMigrationProgress)(nil)
)
// log is the root logger, built from zap's development config.
// Failing to build it is a programming error and hence panics at startup.
var log = func() *zap.SugaredLogger {
	base, buildErr := zap.NewDevelopmentConfig().Build()
	if buildErr == nil {
		return base.Sugar()
	}

	panic(buildErr)
}()
// objectTypes maps IDO values to Icinga DB ones.
// The keys are looked up by the objecttype_id the migration queries select from icinga_objects.
var objectTypes = map[uint8]string{1: "host", 2: "service"}
// hashAny serializes in via objectpacker.PackAny and returns the SHA1 of the result.
// It panics if in can't be packed.
func hashAny(in interface{}) []byte {
	digest := sha1.New()

	// The packed bytes are streamed directly into the hash.
	err := objectpacker.PackAny(in, digest)
	if err != nil {
		panic(err)
	}

	return digest.Sum(nil)
}
// convertTime converts an IDO *nix timestamp, given as seconds (ts) plus microseconds (tsUs),
// for Icinga DB. If both parts are zero, the zero value is returned.
func convertTime(ts int64, tsUs uint32) icingadbTypes.UnixMilli {
	if ts != 0 || tsUs != 0 {
		// time.Unix() expects nanoseconds as its second argument.
		nsec := int64(tsUs) * int64(time.Microsecond/time.Nanosecond)

		return icingadbTypes.UnixMilli(time.Unix(ts, nsec))
	}

	return icingadbTypes.UnixMilli{}
}
// calcObjectId calculates the Icinga DB ID of the config object named name1
// in the environment env. An empty name1 yields nil.
func calcObjectId(env, name1 string) []byte {
	if len(name1) == 0 {
		return nil
	}

	key := [2]string{env, name1}

	return hashAny(key)
}
// calcServiceId calculates the Icinga DB ID of the service name2 of the host name1
// in the environment env. An empty name2 (i.e. no service) yields nil.
func calcServiceId(env, name1, name2 string) []byte {
	if len(name2) == 0 {
		return nil
	}

	// Host and service name are joined with "!" to form the full service name.
	fullName := strings.Join([]string{name1, name2}, "!")

	return hashAny([2]string{env, fullName})
}
// sliceIdoHistory performs query with args+fromid,toid,checkpoint,bulk on ht.snapshot
// and passes the results to onRows until either an empty result set or onRows() returns nil.
// Rationale: split the likely large result set of a query by adding a WHERE condition and a LIMIT,
// both with :named placeholders (:checkpoint, :bulk).
// checkpoint is the initial value for the WHERE condition, onRows() returns follow-up ones.
// Note that args, if not nil, is modified: the placeholders named above are (over)written.
// (On non-recoverable errors the whole program exits.)
func sliceIdoHistory[Row any](
	ht *historyType, query string, args map[string]any,
	checkpoint interface{}, onRows func([]Row) (checkpoint interface{}),
) {
	if args == nil {
		args = map[string]interface{}{}
	}

	args["fromid"] = ht.fromId
	args["toid"] = ht.toId
	args["checkpoint"] = checkpoint
	args["bulk"] = 20000

	// The index hint is MySQL-specific syntax, so strip it for every other driver.
	if ht.snapshot.DriverName() != driver.MySQL {
		query = strings.ReplaceAll(query, " USE INDEX (PRIMARY)", "")
	}

	// The query text is the same for every bulk (only the bound args change),
	// so prepare the statement once up front instead of once per bulk.
	// TODO: use Tx#SelectNamed() one nice day (https://github.com/jmoiron/sqlx/issues/779)
	stmt, err := ht.snapshot.PrepareNamed(query)
	if err != nil {
		log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't prepare query"))
	}
	defer func() { _ = stmt.Close() }()

	for {
		var rows []Row
		if err := stmt.Select(&rows, args); err != nil {
			log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query"))
		}

		// Nothing left to process.
		if len(rows) < 1 {
			break
		}

		// A nil checkpoint is onRows' way to stop the iteration.
		if checkpoint = onRows(rows); checkpoint == nil {
			break
		}

		args["checkpoint"] = checkpoint
	}
}
// progressBar wraps a *mpb.Bar and remembers when it has been incremented last,
// so that the EWMA-based decorators can be fed automatically.
type progressBar struct {
	*mpb.Bar

	// lastUpdate is the time of the latest IncrBy call, zero before the first one.
	lastUpdate time.Time
}

// IncrBy does pb.Bar.DecoratorEwmaUpdate() automatically.
// The duration passed is the time elapsed since the previous call, so the first call doesn't update.
func (pb *progressBar) IncrBy(n int) {
	pb.Bar.IncrBy(n)

	previous := pb.lastUpdate
	pb.lastUpdate = time.Now()

	if previous.IsZero() {
		return
	}

	pb.Bar.DecoratorEwmaUpdate(pb.lastUpdate.Sub(previous))
}
// historyType specifies a history data type.
// The fields up to and including migrate are static configuration (see types),
// the remaining ones are runtime state initialized step by step during the migration.
type historyType struct {
// name is a human-readable common name.
name string
// idoTable specifies the source table.
idoTable string
// idoIdColumn specifies idoTable's primary key.
idoIdColumn string
// idoStartColumns specifies idoTable's event start time locations. (First non-NULL is used.)
idoStartColumns []string
// idoEndColumns specifies idoTable's event end time locations. (First non-NULL is used.)
idoEndColumns []string
// cacheSchema specifies <name>.sqlite3's structure. Empty if no cache is needed.
cacheSchema string
// cacheFiller fills cache from snapshot. Nil if no cache is needed.
cacheFiller func(*historyType)
// cacheLimitQuery rationale: see migrateOneType().
cacheLimitQuery string
// migrationQuery SELECTs source data for actual migration.
migrationQuery string
// migrate does the actual migration.
migrate func(c *Config, idb *icingadb.DB, envId []byte, endpointId [sha1.Size]byte, ht *historyType)
// cacheFile locates <name>.sqlite3.
cacheFile string
// cache represents <cacheFile>.
cache *sqlx.DB
// snapshot represents the data source.
snapshot *sqlx.Tx
// fromId is the first IDO row ID to migrate.
fromId uint64
// toId is the last IDO row ID to migrate.
toId uint64
// total summarizes the source data.
total int64
// cacheTotal summarizes the cache source data.
cacheTotal int64
// done summarizes the migrated data.
done int64
// bar represents the current progress bar.
bar *progressBar
// lastId is the last already migrated ID.
lastId uint64
}
// setupBar (re-)initializes ht.bar with a new bar of progress counting up to total.
// The bar shows ht's name and the percentage on the left, ETA and speed on the right.
func (ht *historyType) setupBar(progress *mpb.Progress, total int64) {
	// One extra column separates the name from the percentage.
	nameWidth := len(ht.name) + 1

	left := mpb.PrependDecorators(
		decor.Name(ht.name, decor.WC{W: nameWidth, C: decor.DidentRight}),
		decor.Percentage(decor.WC{W: 5}),
	)

	right := mpb.AppendDecorators(
		decor.EwmaETA(decor.ET_STYLE_GO, 0, decor.WC{W: 4}),
		decor.Name(" "),
		decor.EwmaSpeed(0, "%.0f/s", 0, decor.WC{W: 4}),
	)

	ht.bar = &progressBar{Bar: progress.AddBar(total, mpb.BarFillerClearOnComplete(), left, right)}
}
// historyTypes is a collection of history types which can be processed in parallel.
type historyTypes []*historyType

// forEach calls f once per element of hts, all calls running in parallel,
// and returns after all of them have finished.
func (hts historyTypes) forEach(f func(*historyType)) {
	group, _ := errgroup.WithContext(context.Background())

	for i := range hts {
		// Every goroutine needs its own variable to capture.
		current := hts[i]

		group.Go(func() error {
			f(current)
			return nil
		})
	}

	_ = group.Wait()
}
// types lists all history types to migrate along with their static configuration.
// Every migrate func just instantiates migrateOneType with the row converter of its type.
var types = historyTypes{
{
name: "ack & comment",
idoTable: "icinga_commenthistory",
idoIdColumn: "commenthistory_id",
idoStartColumns: []string{"entry_time"},
// Manual deletion time wins vs. time of expiration which never happens due to manual deletion.
idoEndColumns: []string{"deletion_time", "expiration_time"},
migrationQuery: commentMigrationQuery,
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) {
migrateOneType(c, idb, envId, endpId, ht, convertCommentRows)
},
},
{
name: "downtime",
idoTable: "icinga_downtimehistory",
idoIdColumn: "downtimehistory_id",
// Fall back to scheduled time if actual time is missing.
idoStartColumns: []string{"actual_start_time", "scheduled_start_time"},
idoEndColumns: []string{"actual_end_time", "scheduled_end_time"},
migrationQuery: downtimeMigrationQuery,
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) {
migrateOneType(c, idb, envId, endpId, ht, convertDowntimeRows)
},
},
{
name: "flapping",
idoTable: "icinga_flappinghistory",
idoIdColumn: "flappinghistory_id",
// A flapping event is a single point in time, so start and end are the same column.
idoStartColumns: []string{"event_time"},
idoEndColumns: []string{"event_time"},
cacheSchema: eventTimeCacheSchema,
cacheFiller: func(ht *historyType) {
// event_is_start: presumably the IDO uses event_type 1000 for flapping start
// and 1001 for flapping end, so that this yields 1 or 0 - TODO confirm.
buildEventTimeCache(ht, []string{
"xh.flappinghistory_id id", "UNIX_TIMESTAMP(xh.event_time) event_time",
"xh.event_time_usec", "1001-xh.event_type event_is_start", "xh.object_id",
})
},
migrationQuery: flappingMigrationQuery,
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) {
migrateOneType(c, idb, envId, endpId, ht, convertFlappingRows)
},
},
{
name: "notification",
idoTable: "icinga_notifications",
idoIdColumn: "notification_id",
idoStartColumns: []string{"start_time"},
idoEndColumns: []string{"end_time"},
cacheSchema: previousHardStateCacheSchema,
cacheFiller: func(ht *historyType) {
// The notification's state is passed under the column name last_hard_state.
buildPreviousHardStateCache(ht, []string{
"xh.notification_id id", "xh.object_id", "xh.state last_hard_state",
})
},
cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state",
migrationQuery: notificationMigrationQuery,
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) {
migrateOneType(c, idb, envId, endpId, ht, convertNotificationRows)
},
},
{
name: "state",
idoTable: "icinga_statehistory",
idoIdColumn: "statehistory_id",
// A state change is a single point in time, so start and end are the same column.
idoStartColumns: []string{"state_time"},
idoEndColumns: []string{"state_time"},
cacheSchema: previousHardStateCacheSchema,
cacheFiller: func(ht *historyType) {
buildPreviousHardStateCache(ht, []string{"xh.statehistory_id id", "xh.object_id", "xh.last_hard_state"})
},
cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state",
migrationQuery: stateMigrationQuery,
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) {
migrateOneType(c, idb, envId, endpId, ht, convertStateRows)
},
},
}

View file

@ -6,7 +6,6 @@ import (
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/internal/command"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/driver"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/icingadb/history"
"github.com/icinga/icingadb/pkg/icingadb/overdue"
@ -29,11 +28,9 @@ import (
)
const (
ExitSuccess = 0
ExitFailure = 1
expectedRedisSchemaVersion = "5"
expectedMysqlSchemaVersion = 3
expectedPostgresSchemaVersion = 1
ExitSuccess = 0
ExitFailure = 1
expectedRedisSchemaVersion = "5"
)
func main() {
@ -74,7 +71,7 @@ func run() int {
}
}
if err := checkDbSchema(context.Background(), db); err != nil {
if err := db.CheckSchema(context.Background()); err != nil {
logger.Fatalf("%+v", err)
}
@ -358,36 +355,6 @@ func run() int {
}
}
// checkDbSchema asserts the database schema of the expected version being present.
func checkDbSchema(ctx context.Context, db *icingadb.DB) error {
var expectedDbSchemaVersion uint16
switch db.DriverName() {
case driver.MySQL:
expectedDbSchemaVersion = expectedMysqlSchemaVersion
case driver.PostgreSQL:
expectedDbSchemaVersion = expectedPostgresSchemaVersion
}
var version uint16
err := db.QueryRowxContext(ctx, "SELECT version FROM icingadb_schema ORDER BY id DESC LIMIT 1").Scan(&version)
if err != nil {
return errors.Wrap(err, "can't check database schema version")
}
if version != expectedDbSchemaVersion {
// Since these error messages are trivial and mostly caused by users, we don't need
// to print a stack trace here. However, since errors.Errorf() does this automatically,
// we need to use fmt instead.
return fmt.Errorf(
"unexpected database schema version: v%d (expected v%d), please make sure you have applied all database"+
" migrations after upgrading Icinga DB", version, expectedDbSchemaVersion,
)
}
return nil
}
// monitorRedisSchema monitors rc's icinga:schema version validity.
func monitorRedisSchema(logger *logging.Logger, rc *icingaredis.Client, pos string) {
for {

92
doc/06-Migration.md Normal file
View file

@ -0,0 +1,92 @@
# Migration from IDO
The Icinga DB Migration commandline tool migrates history data from [IDO] to
Icinga DB. Or, more precisely: from the IDO SQL database to the Icinga DB one.
!!! info
Everything else is already populated by Icinga DB itself.
Only the past history data of existing IDO setups
isn't known to Icinga DB without migration from IDO.
## Icinga DB
1. Make sure Icinga DB is up, running and writing to its database.
2. Optionally disable Icinga 2's IDO feature.
!!! warning
Migration will cause duplicate Icinga DB events
for the period both IDO and Icinga DB are active.
Read on, there is a way to avoid that.
## Configuration file
Create a YAML file like this somewhere:
```yaml
icinga2:
  # Content of /var/lib/icinga2/icingadb.env
  env: "da39a3ee5e6b4b0d3255bfef95601890afBADHEX"
  # Name of the main Icinga 2 endpoint writing to IDO
  endpoint: master-1
# IDO database
ido:
  type: pgsql
  host: 192.0.2.1
  port: 5432
  database: icinga
  user: icinga
  password: CHANGEME
  # Input time range
  #from: 0
  #to: 2147483647
# Icinga DB database
icingadb:
  type: mysql
  host: 2001:db8::1
  port: 3306
  database: icingadb
  user: icingadb
  password: CHANGEME
```
### Input time range
By default, everything is migrated. If you wish, you can restrict the input
data's start and/or end by giving `from` and/or `to` under `ido:` as Unix
timestamps (in seconds).
Examples:
* Now: Run in a shell: `date +%s`
* One year ago: Run in a shell: `date -d -1year +%s`
* Icinga DB usage start time: Query the Icinga DB database:
`SELECT MIN(event_time)/1000 FROM history;`
The last one is useful as the range end (`to`) to avoid duplicate events.
## Cache directory
Choose a directory (it does not have to exist yet) for Icinga DB Migration's
internal cache. If there isn't much to migrate, or if the migration process
won't be interrupted by a reboot of the machine the migration tool or the
database runs on, a directory created with `mktemp -d` is enough.
## Actual migration
Run:
```shell
icingadb-migrate -c icingadb-migration.yml -t ~/icingadb-migration.cache
```
If the migration gets interrupted, re-run the command.
!!! tip
If there is much to migrate, use e.g. tmux to
protect yourself against SSH connection losses.
[IDO]: https://icinga.com/docs/icinga-2/latest/doc/14-features/#ido-database-db-ido

6
go.mod
View file

@ -11,16 +11,20 @@ require (
github.com/jessevdk/go-flags v1.5.0
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.10.6
github.com/mattn/go-sqlite3 v1.14.6
github.com/okzk/sdnotify v0.0.0-20180710141335-d9becc38acbd
github.com/pkg/errors v0.9.1
github.com/ssgreg/journald v1.0.0
github.com/stretchr/testify v1.8.0
github.com/vbauerster/mpb/v6 v6.0.4
go.uber.org/zap v1.21.0
golang.org/x/exp v0.0.0-20220613132600-b0d781184e0d
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
require (
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
@ -28,7 +32,9 @@ require (
github.com/fatih/color v1.10.0 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/mattn/go-runewidth v0.0.12 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect

12
go.sum
View file

@ -1,3 +1,7 @@
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
@ -45,6 +49,8 @@ github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.12 h1:Y41i/hVW3Pgwr8gV+J23B9YEY0zxjptBuCWEaxmAOow=
github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
@ -57,6 +63,9 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/ssgreg/journald v1.0.0 h1:0YmTDPJXxcWDPba12qNMdO6TxvfkFSYpFIJ31CwmLcU=
github.com/ssgreg/journald v1.0.0/go.mod h1:RUckwmTM8ghGWPslq2+ZBZzbb9/2KgjzYZ4JEP+oRt0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -67,6 +76,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/vbauerster/mpb/v6 v6.0.4 h1:h6J5zM/2wimP5Hj00unQuV8qbo5EPcj6wbkCqgj7KcY=
github.com/vbauerster/mpb/v6 v6.0.4/go.mod h1:a/+JT57gqh6Du0Ay5jSR+uBMfXGdlR7VQlGP52fJxLM=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
@ -100,6 +111,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

View file

@ -87,6 +87,41 @@ func NewDb(db *sqlx.DB, logger *logging.Logger, options *Options) *DB {
}
}
// Schema versions that CheckSchema compares against the most recent
// icingadb_schema row, one constant per supported database driver.
const (
// expectedMysqlSchemaVersion is the version expected when the driver is MySQL.
expectedMysqlSchemaVersion = 3
// expectedPostgresSchemaVersion is the version expected when the driver is PostgreSQL.
expectedPostgresSchemaVersion = 1
)
// CheckSchema verifies that the database carries the schema version this build expects.
//
// The expected version is picked by driver name (MySQL or PostgreSQL); for any other
// driver it stays at the zero value. The actual version is read from the newest row
// (highest id) of the icingadb_schema table.
//
// It returns nil when both versions match, a wrapped error when the version cannot be
// queried, and a plain (stack-trace-free) error describing the mismatch otherwise.
func (db *DB) CheckSchema(ctx context.Context) error {
	var want uint16
	switch db.DriverName() {
	case driver.MySQL:
		want = expectedMysqlSchemaVersion
	case driver.PostgreSQL:
		want = expectedPostgresSchemaVersion
	}

	var got uint16
	if err := db.QueryRowxContext(
		ctx, "SELECT version FROM icingadb_schema ORDER BY id DESC LIMIT 1",
	).Scan(&got); err != nil {
		return errors.Wrap(err, "can't check database schema version")
	}

	if got == want {
		return nil
	}

	// A version mismatch is a trivial, usually user-caused condition, so no stack trace
	// is wanted. errors.Errorf() would attach one automatically, hence fmt.Errorf().
	return fmt.Errorf(
		"unexpected database schema version: v%d (expected v%d), please make sure you have applied all database"+
			" migrations after upgrading Icinga DB", got, want,
	)
}
// BuildColumns returns all columns of the given struct.
func (db *DB) BuildColumns(subject interface{}) []string {
fields := db.Mapper.TypeMap(reflect.TypeOf(subject)).Names
@ -131,7 +166,7 @@ func (db *DB) BuildInsertIgnoreStmt(into interface{}) (string, int) {
switch db.DriverName() {
case driver.MySQL:
// MySQL treats UPDATE id = id as a no-op.
clause = "ON DUPLICATE KEY UPDATE id = id"
clause = fmt.Sprintf(`ON DUPLICATE KEY UPDATE "%s" = "%s"`, columns[0], columns[0])
case driver.PostgreSQL:
clause = fmt.Sprintf("ON CONFLICT ON CONSTRAINT pk_%s DO NOTHING", table)
}
@ -529,6 +564,28 @@ func (db *DB) CreateStreamed(
)
}
// CreateIgnoreStreamed bulk inserts the streamed entities via NamedBulkExec,
// using an insert statement built by BuildInsertIgnoreStmt.
//
// The first entity of the stream determines both the statement and the per-table semaphore.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
// Entities for which the query ran successfully are passed to every onSuccess callback.
//
// If no first entity could be obtained, the error reported by com.CopyFirst is returned
// wrapped with context and nothing is executed.
func (db *DB) CreateIgnoreStreamed(
	ctx context.Context, entities <-chan contracts.Entity, onSuccess ...OnSuccess[contracts.Entity],
) error {
	head, stream, copyErr := com.CopyFirst(ctx, entities)
	if head == nil {
		return errors.Wrap(copyErr, "can't copy first entity")
	}

	tableSem := db.GetSemaphoreForTable(utils.TableName(head))
	query, numPlaceholders := db.BuildInsertIgnoreStmt(head)
	batchSize := db.BatchSizeByPlaceholders(numPlaceholders)

	return db.NamedBulkExec(
		ctx, query, batchSize, tableSem, stream, com.SplitOnDupId[contracts.Entity], onSuccess...,
	)
}
// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and

View file

@ -15,7 +15,7 @@ type NotificationHistory struct {
State uint8 `json:"state"`
PreviousHardState uint8 `json:"previous_hard_state"`
Author string `json:"author"`
Text string `json:"text"`
Text types.String `json:"text"`
UsersNotified uint16 `json:"users_notified"`
}

View file

@ -66,6 +66,8 @@ github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBK
github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d/go.mod h1:HI8ITrYtUY+O+ZhtlqUnD8+KwNPOyugEhfP9fdUIaEQ=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@ -462,6 +464,7 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
@ -581,6 +584,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
@ -639,6 +644,8 @@ github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vbauerster/mpb/v6 v6.0.4 h1:h6J5zM/2wimP5Hj00unQuV8qbo5EPcj6wbkCqgj7KcY=
github.com/vbauerster/mpb/v6 v6.0.4/go.mod h1:a/+JT57gqh6Du0Ay5jSR+uBMfXGdlR7VQlGP52fJxLM=
github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho=
@ -857,6 +864,7 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=