cmd/ido2icingadb: actually migrate

This commit is contained in:
Alexander A. Klimov 2021-10-08 15:19:13 +02:00
parent 6b583242eb
commit 5396afe32a
5 changed files with 1137 additions and 43 deletions

View file

@ -13,8 +13,8 @@ var eventTimeCacheSchema = []string{
// Icinga DB's flapping_history#start_time per flapping_end row (IDO's icinga_flappinghistory#flappinghistory_id).
`CREATE TABLE IF NOT EXISTS end_start_time (
history_id INT PRIMARY KEY,
event_time INT,
event_time_usec INT
event_time INT NOT NULL,
event_time_usec INT NOT NULL
)`,
// Helper table, the last start_time per icinga_statehistory#object_id.
`CREATE TABLE IF NOT EXISTS last_start_time (
@ -48,7 +48,7 @@ func buildEventTimeCache(ht *historyType, idoColumns []string) {
"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
ht.idoIdColumn+" > ? ORDER BY xh."+ht.idoIdColumn+" LIMIT ?",
checkpoint.MaxId.Int64,
nil, checkpoint.MaxId.Int64,
func(idoRows []struct {
Id uint64
EventTime int64
@ -77,12 +77,6 @@ func buildEventTimeCache(ht *historyType, idoColumns []string) {
onDeleted(cacheExec(
*tx, false, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId,
))
} else {
cacheExec(
*tx, false,
"INSERT INTO end_start_time(history_id, event_time, event_time_usec) "+
"VALUES (?, NULL, NULL)", idoRow.Id,
)
}
} else {
onDeleted(cacheExec(
@ -179,7 +173,7 @@ func buildPreviousHardStateCache(ht *historyType, idoColumns []string) {
"SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+
" xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+
ht.idoIdColumn+" < ? ORDER BY xh."+ht.idoIdColumn+" DESC LIMIT ?",
checkpoint,
nil, checkpoint,
func(idoRows []struct {
Id uint64
ObjectId uint64
@ -293,8 +287,10 @@ func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, onDeleted func(sql.Resul
}
}
func cacheGet(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) {
if err := cacheTx.Get(dest, query, args...); err != nil {
func cacheGet(cache interface {
Get(dest interface{}, query string, args ...interface{}) error
}, dest interface{}, query string, args ...interface{}) {
if err := cache.Get(dest, query, args...); err != nil {
log.With("backend", "cache", "query", query, "args", args).
Errorf("%+v", errors.Wrap(err, "can't perform query"))
}

861
cmd/ido2icingadb/convert.go Normal file
View file

@ -0,0 +1,861 @@
package main
import (
"database/sql"
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/icingadb/v1/history"
icingadbTypes "github.com/icinga/icingadb/pkg/types"
"github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"strings"
"time"
)
const acknowledgementMigrationQuery = "SELECT ah.acknowledgement_id, UNIX_TIMESTAMP(ah.entry_time) entry_time, " +
"ah.entry_time_usec, ah.acknowledgement_type, ah.author_name, ah.comment_data, ah.is_sticky, " +
"ah.persistent_comment, UNIX_TIMESTAMP(ah.end_time) end_time, o.objecttype_id, o.name1, " +
"IFNULL(o.name2, '') name2 " +
"FROM icinga_acknowledgements ah USE INDEX (PRIMARY) " +
"INNER JOIN icinga_objects o ON o.object_id=ah.object_id " +
"WHERE ah.acknowledgement_id > ? " + // where we were interrupted
"ORDER BY ah.acknowledgement_id " + // allows computeProgress() not to check all IDO rows for whether migrated
"LIMIT ?"
// AckClear updates an already migrated ack event with the clear event info.
type AckClear struct {
Id icingadbTypes.Binary
ClearTime icingadbTypes.UnixMilli
}
// Assert interface compliance.
var _ contracts.TableNamer = (*AckClear)(nil)
// TableName implements the contracts.TableNamer interface.
func (*AckClear) TableName() string {
return "acknowledgement_history"
}
type acknowledgementRow = struct {
AcknowledgementId uint64
EntryTime int64
EntryTimeUsec uint32
AcknowledgementType uint8
AuthorName sql.NullString
CommentData sql.NullString
IsSticky uint8
PersistentComment uint8
EndTime sql.NullInt64
ObjecttypeId uint8
Name1 string
Name2 string
}
func convertAcknowledgementRows(
env string, envId, endpointId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []acknowledgementRow,
) (icingaDbUpdates, icingaDbInserts [][]interface{}, checkpoint interface{}) {
if len(idoRows) < 1 {
return
}
var cached []struct {
HistoryId uint64
EventTime int64
EventTimeUsec uint32
}
selectCache(
&cached, "SELECT history_id, event_time, event_time_usec FROM end_start_time WHERE history_id BETWEEN ? AND ?",
idoRows[0].AcknowledgementId, idoRows[len(idoRows)-1].AcknowledgementId,
)
// Needed for set time (see below).
cachedById := make(map[uint64]icingadbTypes.UnixMilli, len(cached))
for _, c := range cached {
cachedById[c.HistoryId] = convertTime(c.EventTime, c.EventTimeUsec)
}
var acknowledgementHistory, acknowledgementHistoryUpdates, allHistory []interface{}
for _, row := range idoRows {
ts := convertTime(row.EntryTime, row.EntryTimeUsec)
// Needed for ID (see below).
var set icingadbTypes.UnixMilli
if row.AcknowledgementType == 0 { // clear
var ok bool
set, ok = cachedById[row.AcknowledgementId]
if !ok {
continue
}
} else {
set = ts
}
name := row.Name1
if row.Name2 != "" {
name += "!" + row.Name2
}
id := mkDeterministicUuid('a', row.AcknowledgementId)
typ := objectTypes[row.ObjecttypeId]
hostId := calcObjectId(env, row.Name1)
serviceId := calcServiceId(env, row.Name1, row.Name2)
acknowledgementHistoryId := hashAny([]interface{}{
env, strings.Title(typ), name, float64(utils.UnixMilli(set.Time())),
})
if row.AcknowledgementType == 0 { // clear
// The set counterpart should already have been inserted.
acknowledgementHistoryUpdates = append(acknowledgementHistoryUpdates, &AckClear{
acknowledgementHistoryId, ts,
})
h := &history.HistoryAck{
HistoryMeta: history.HistoryMeta{
HistoryEntity: history.HistoryEntity{Id: id},
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
EventType: "ack_clear",
},
AcknowledgementHistoryId: acknowledgementHistoryId,
SetTime: set,
ClearTime: ts,
}
h.EventTime.History = h
allHistory = append(allHistory, h)
} else { // set
acknowledgementHistory = append(acknowledgementHistory, &history.AcknowledgementHistory{
EntityWithoutChecksum: v1.EntityWithoutChecksum{
IdMeta: v1.IdMeta{Id: acknowledgementHistoryId},
},
HistoryTableMeta: history.HistoryTableMeta{
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
},
SetTime: set,
Author: icingadbTypes.String{NullString: row.AuthorName},
Comment: icingadbTypes.String{NullString: row.CommentData},
ExpireTime: convertTime(row.EndTime.Int64, 0),
IsPersistent: icingadbTypes.Bool{Bool: row.PersistentComment != 0, Valid: true},
IsSticky: icingadbTypes.Bool{Bool: row.IsSticky != 0, Valid: true},
})
h := &history.HistoryAck{
HistoryMeta: history.HistoryMeta{
HistoryEntity: history.HistoryEntity{Id: id},
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
EventType: "ack_set",
},
AcknowledgementHistoryId: acknowledgementHistoryId,
SetTime: set,
}
h.EventTime.History = h
allHistory = append(allHistory, h)
}
checkpoint = row.AcknowledgementId
}
icingaDbUpdates = [][]interface{}{acknowledgementHistoryUpdates}
icingaDbInserts = [][]interface{}{acknowledgementHistory, allHistory}
return
}
const commentMigrationQuery = "SELECT ch.commenthistory_id, UNIX_TIMESTAMP(ch.entry_time) entry_time, " +
"ch.entry_time_usec, ch.entry_type, ch.author_name, ch.comment_data, ch.is_persistent, " +
"IFNULL(UNIX_TIMESTAMP(ch.expiration_time), 0) expiration_time, " +
"IFNULL(UNIX_TIMESTAMP(ch.deletion_time), 0) deletion_time, ch.deletion_time_usec, ch.name, " +
"o.objecttype_id, o.name1, IFNULL(o.name2, '') name2 " +
"FROM icinga_commenthistory ch USE INDEX (PRIMARY) " +
"INNER JOIN icinga_objects o ON o.object_id=ch.object_id " +
"WHERE ch.commenthistory_id > ? " + // where we were interrupted
"ORDER BY ch.commenthistory_id " + // allows computeProgress() not to check all IDO rows for whether migrated
"LIMIT ?"
type commentRow = struct {
CommenthistoryId uint64
EntryTime int64
EntryTimeUsec uint32
EntryType uint8
AuthorName string
CommentData string
IsPersistent uint8
ExpirationTime int64
DeletionTime int64
DeletionTimeUsec uint32
Name string
ObjecttypeId uint8
Name1 string
Name2 string
}
func convertCommentRows(
env string, envId, endpointId icingadbTypes.Binary,
_ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []commentRow,
) (_, icingaDbInserts [][]interface{}, checkpoint interface{}) {
var commentHistory, allHistory []interface{}
for _, row := range idoRows {
id := calcObjectId(env, row.Name)
typ := objectTypes[row.ObjecttypeId]
hostId := calcObjectId(env, row.Name1)
serviceId := calcServiceId(env, row.Name1, row.Name2)
entryTime := convertTime(row.EntryTime, row.EntryTimeUsec)
removeTime := convertTime(row.DeletionTime, row.DeletionTimeUsec)
expireTime := convertTime(row.ExpirationTime, 0)
commentHistory = append(commentHistory, &history.CommentHistory{
CommentHistoryEntity: history.CommentHistoryEntity{CommentId: id},
HistoryTableMeta: history.HistoryTableMeta{
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
},
CommentHistoryUpserter: history.CommentHistoryUpserter{
RemoveTime: removeTime,
HasBeenRemoved: icingadbTypes.Bool{Bool: !removeTime.Time().IsZero(), Valid: true},
},
EntryTime: entryTime,
Author: row.AuthorName,
Comment: row.CommentData,
EntryType: icingadbTypes.CommentType(row.EntryType),
IsPersistent: icingadbTypes.Bool{Bool: row.IsPersistent != 0, Valid: true},
IsSticky: icingadbTypes.Bool{Bool: false, Valid: true},
ExpireTime: expireTime,
})
h1 := &history.HistoryComment{
HistoryMeta: history.HistoryMeta{
HistoryEntity: history.HistoryEntity{Id: randomUuid()},
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
EventType: "comment_add",
},
CommentHistoryId: id,
EntryTime: entryTime,
}
h1.EventTime.History = h1
allHistory = append(allHistory, h1)
if !removeTime.Time().IsZero() { // remove
h2 := &history.HistoryComment{
HistoryMeta: history.HistoryMeta{
HistoryEntity: history.HistoryEntity{Id: randomUuid()},
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
EventType: "comment_remove",
},
CommentHistoryId: id,
EntryTime: entryTime,
RemoveTime: removeTime,
ExpireTime: expireTime,
}
h2.EventTime.History = h2
allHistory = append(allHistory, h2)
}
checkpoint = row.CommenthistoryId
}
icingaDbInserts = [][]interface{}{commentHistory, allHistory}
return
}
const downtimeMigrationQuery = "SELECT dh.downtimehistory_id, UNIX_TIMESTAMP(dh.entry_time) entry_time, " +
"dh.author_name, dh.comment_data, dh.is_fixed, dh.duration, " +
"UNIX_TIMESTAMP(dh.scheduled_start_time) scheduled_start_time, " +
"IFNULL(UNIX_TIMESTAMP(dh.scheduled_end_time), 0) scheduled_end_time, " +
"IFNULL(UNIX_TIMESTAMP(dh.actual_start_time), 0) actual_start_time, dh.actual_start_time_usec, " +
"IFNULL(UNIX_TIMESTAMP(dh.actual_end_time), 0) actual_end_time, dh.actual_end_time_usec, dh.was_cancelled, " +
"IFNULL(UNIX_TIMESTAMP(dh.trigger_time), 0) trigger_time, dh.name, o.objecttype_id, o.name1, " +
"IFNULL(o.name2, '') name2, IFNULL(sd.name, '') triggered_by " +
"FROM icinga_downtimehistory dh USE INDEX (PRIMARY) " +
"INNER JOIN icinga_objects o ON o.object_id=dh.object_id " +
"LEFT JOIN icinga_scheduleddowntime sd ON sd.scheduleddowntime_id=dh.triggered_by_id " +
"WHERE dh.downtimehistory_id > ? " + // where we were interrupted
"ORDER BY dh.downtimehistory_id " + // allows computeProgress() not to check all IDO rows for whether migrated
"LIMIT ?"
type downtimeRow = struct {
DowntimehistoryId uint64
EntryTime int64
AuthorName string
CommentData string
IsFixed uint8
Duration int64
ScheduledStartTime int64
ScheduledEndTime int64
ActualStartTime int64
ActualStartTimeUsec uint32
ActualEndTime int64
ActualEndTimeUsec uint32
WasCancelled uint8
TriggerTime int64
Name string
ObjecttypeId uint8
Name1 string
Name2 string
TriggeredBy string
}
func convertDowntimeRows(
env string, envId, endpointId icingadbTypes.Binary,
_ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []downtimeRow,
) (_, icingaDbInserts [][]interface{}, checkpoint interface{}) {
var downtimeHistory, allHistory []interface{}
for _, row := range idoRows {
id := calcObjectId(env, row.Name)
typ := objectTypes[row.ObjecttypeId]
hostId := calcObjectId(env, row.Name1)
serviceId := calcServiceId(env, row.Name1, row.Name2)
scheduledStart := convertTime(row.ScheduledStartTime, 0)
scheduledEnd := convertTime(row.ScheduledEndTime, 0)
triggerTime := convertTime(row.TriggerTime, 0)
actualStart := convertTime(row.ActualStartTime, row.ActualStartTimeUsec)
actualEnd := convertTime(row.ActualEndTime, row.ActualEndTimeUsec)
var startTime, endTime, cancelTime icingadbTypes.UnixMilli
if scheduledEnd.Time().IsZero() {
scheduledEnd = icingadbTypes.UnixMilli(scheduledStart.Time().Add(time.Duration(row.Duration) * time.Second))
}
if actualStart.Time().IsZero() {
startTime = scheduledStart
} else {
startTime = actualStart
}
if actualEnd.Time().IsZero() {
endTime = scheduledEnd
} else {
endTime = actualEnd
}
if triggerTime.Time().IsZero() {
triggerTime = startTime
}
if row.WasCancelled != 0 {
cancelTime = actualEnd
}
downtimeHistory = append(downtimeHistory, &history.DowntimeHistory{
DowntimeHistoryEntity: history.DowntimeHistoryEntity{DowntimeId: id},
HistoryTableMeta: history.HistoryTableMeta{
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
},
DowntimeHistoryUpserter: history.DowntimeHistoryUpserter{
HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true},
CancelTime: cancelTime,
},
TriggeredById: calcObjectId(env, row.TriggeredBy),
EntryTime: convertTime(row.EntryTime, 0),
Author: row.AuthorName,
Comment: row.CommentData,
IsFlexible: icingadbTypes.Bool{Bool: row.IsFixed == 0, Valid: true},
FlexibleDuration: uint64(row.Duration) * 1000,
ScheduledStartTime: scheduledStart,
ScheduledEndTime: scheduledEnd,
StartTime: startTime,
EndTime: endTime,
TriggerTime: triggerTime,
})
h1 := &history.HistoryDowntime{
HistoryMeta: history.HistoryMeta{
HistoryEntity: history.HistoryEntity{Id: randomUuid()},
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
EventType: "downtime_start",
},
DowntimeHistoryId: id,
StartTime: startTime,
}
h1.EventTime.History = h1
allHistory = append(allHistory, h1)
if !actualEnd.Time().IsZero() { // remove
h2 := &history.HistoryDowntime{
HistoryMeta: history.HistoryMeta{
HistoryEntity: history.HistoryEntity{Id: randomUuid()},
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
EventType: "downtime_end",
},
DowntimeHistoryId: id,
StartTime: startTime,
CancelTime: cancelTime,
EndTime: endTime,
HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true},
}
h2.EventTime.History = h2
allHistory = append(allHistory, h2)
}
checkpoint = row.DowntimehistoryId
}
icingaDbInserts = [][]interface{}{downtimeHistory, allHistory}
return
}
const flappingMigrationQuery = "SELECT fh.flappinghistory_id, UNIX_TIMESTAMP(fh.event_time) event_time, " +
"fh.event_time_usec, fh.event_type, fh.percent_state_change, fh.low_threshold, " +
"fh.high_threshold, o.objecttype_id, o.name1, IFNULL(o.name2, '') name2 " +
"FROM icinga_flappinghistory fh USE INDEX (PRIMARY) " +
"INNER JOIN icinga_objects o ON o.object_id=fh.object_id " +
"WHERE fh.flappinghistory_id > ? " + // where we were interrupted
"ORDER BY fh.flappinghistory_id " + // allows computeProgress() not to check all IDO rows for whether migrated
"LIMIT ?"
// FlappingEnd updates an already migrated start event with the end event info.
type FlappingEnd struct {
Id icingadbTypes.Binary
EndTime icingadbTypes.UnixMilli
}
// Assert interface compliance.
var _ contracts.TableNamer = (*FlappingEnd)(nil)
// TableName implements the contracts.TableNamer interface.
func (*FlappingEnd) TableName() string {
return "flapping_history"
}
type flappingRow = struct {
FlappinghistoryId uint64
EventTime int64
EventTimeUsec uint32
EventType uint16
PercentStateChange sql.NullFloat64
LowThreshold float64
HighThreshold float64
ObjecttypeId uint8
Name1 string
Name2 string
}
func convertFlappingRows(
env string, envId, endpointId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []flappingRow,
) (icingaDbUpdates, icingaDbInserts [][]interface{}, checkpoint interface{}) {
if len(idoRows) < 1 {
return
}
var cached []struct {
HistoryId uint64
EventTime int64
EventTimeUsec uint32
}
selectCache(
&cached, "SELECT history_id, event_time, event_time_usec FROM end_start_time WHERE history_id BETWEEN ? AND ?",
idoRows[0].FlappinghistoryId, idoRows[len(idoRows)-1].FlappinghistoryId,
)
// Needed for start time (see below).
cachedById := make(map[uint64]icingadbTypes.UnixMilli, len(cached))
for _, c := range cached {
cachedById[c.HistoryId] = convertTime(c.EventTime, c.EventTimeUsec)
}
var flappingHistory, flappingHistoryUpdates, allHistory []interface{}
for _, row := range idoRows {
ts := convertTime(row.EventTime, row.EventTimeUsec)
// Needed for ID (see below).
var start icingadbTypes.UnixMilli
if row.EventType == 1001 { // end
var ok bool
start, ok = cachedById[row.FlappinghistoryId]
if !ok {
continue
}
} else {
start = ts
}
name := row.Name1
if row.Name2 != "" {
name += "!" + row.Name2
}
id := mkDeterministicUuid('f', row.FlappinghistoryId)
typ := objectTypes[row.ObjecttypeId]
hostId := calcObjectId(env, row.Name1)
serviceId := calcServiceId(env, row.Name1, row.Name2)
flappingHistoryId := hashAny([]interface{}{
env, strings.Title(typ), name, float64(utils.UnixMilli(start.Time())),
})
if row.EventType == 1001 { // end
// The start counterpart should already have been inserted.
flappingHistoryUpdates = append(flappingHistoryUpdates, &FlappingEnd{flappingHistoryId, ts})
h := &history.HistoryFlapping{
HistoryMeta: history.HistoryMeta{
HistoryEntity: history.HistoryEntity{Id: id},
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
EventType: "flapping_end",
},
FlappingHistoryId: flappingHistoryId,
StartTime: start,
EndTime: ts,
}
h.EventTime.History = h
allHistory = append(allHistory, h)
} else { // end
flappingHistory = append(flappingHistory, &history.FlappingHistory{
EntityWithoutChecksum: v1.EntityWithoutChecksum{
IdMeta: v1.IdMeta{Id: flappingHistoryId},
},
HistoryTableMeta: history.HistoryTableMeta{
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
},
FlappingHistoryUpserter: history.FlappingHistoryUpserter{
FlappingThresholdLow: float32(row.LowThreshold),
FlappingThresholdHigh: float32(row.HighThreshold),
},
StartTime: start,
PercentStateChangeStart: icingadbTypes.Float{NullFloat64: row.PercentStateChange},
})
h := &history.HistoryFlapping{
HistoryMeta: history.HistoryMeta{
HistoryEntity: history.HistoryEntity{Id: id},
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
EventType: "flapping_start",
},
FlappingHistoryId: flappingHistoryId,
StartTime: start,
}
h.EventTime.History = h
allHistory = append(allHistory, h)
}
checkpoint = row.FlappinghistoryId
}
icingaDbUpdates = [][]interface{}{flappingHistoryUpdates}
icingaDbInserts = [][]interface{}{flappingHistory, allHistory}
return
}
const notificationMigrationQuery = "SELECT n.notification_id, n.notification_reason, " +
"UNIX_TIMESTAMP(n.end_time) end_time, n.end_time_usec, n.state, IFNULL(n.output, '') output, " +
"n.long_output, n.contacts_notified, o.objecttype_id, o.name1, IFNULL(o.name2, '') name2 " +
"FROM icinga_notifications n USE INDEX (PRIMARY) " +
"INNER JOIN icinga_objects o ON o.object_id=n.object_id " +
"WHERE n.notification_id <= ? AND " +
"n.notification_id > ? " + // where we were interrupted
"ORDER BY n.notification_id " + // allows computeProgress() not to check all IDO rows for whether migrated
"LIMIT ?"
// zeroHash is a NULL alternative for NOT NULL columns.
var zeroHash = make(icingadbTypes.Binary, 20)
// notificationTypes maps IDO values to Icinga DB ones.
var notificationTypes = map[uint8]icingadbTypes.NotificationType{5: 1, 6: 2, 7: 4, 8: 8, 1: 16, 2: 128, 3: 256}
type notificationRow = struct {
NotificationId uint64
NotificationReason uint8
EndTime int64
EndTimeUsec uint32
State uint8
Output string
LongOutput sql.NullString
ContactsNotified uint16
ObjecttypeId uint8
Name1 string
Name2 string
}
func convertNotificationRows(
env string, envId, endpointId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []notificationRow,
) (_, icingaDbInserts [][]interface{}, checkpoint interface{}) {
if len(idoRows) < 1 {
return
}
var cached []struct {
HistoryId uint64
PreviousHardState uint8
}
selectCache(
&cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? AND ?",
idoRows[0].NotificationId, idoRows[len(idoRows)-1].NotificationId,
)
cachedById := make(map[uint64]uint8, len(cached))
for _, c := range cached {
cachedById[c.HistoryId] = c.PreviousHardState
}
var contacts []struct {
NotificationId uint64
Name1 string
}
{
const query = "SELECT c.notification_id, o.name1 FROM icinga_contactnotifications c " +
"INNER JOIN icinga_objects o ON o.object_id=c.contact_object_id WHERE c.notification_id BETWEEN ? AND ?"
err := ido.Select(&contacts, query, idoRows[0].NotificationId, idoRows[len(idoRows)-1].NotificationId)
if err != nil {
log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
}
contactsById := map[uint64]map[string]struct{}{}
for _, contact := range contacts {
perId, ok := contactsById[contact.NotificationId]
if !ok {
perId = map[string]struct{}{}
contactsById[contact.NotificationId] = perId
}
perId[contact.Name1] = struct{}{}
}
var notificationHistory, userNotificationHistory, allHistory []interface{}
for _, row := range idoRows {
previousHardState, ok := cachedById[row.NotificationId]
if !ok {
continue
}
id := mkDeterministicUuid('n', row.NotificationId)
typ := objectTypes[row.ObjecttypeId]
hostId := calcObjectId(env, row.Name1)
serviceId := calcServiceId(env, row.Name1, row.Name2)
ts := convertTime(row.EndTime, row.EndTimeUsec)
var nt icingadbTypes.NotificationType
if row.NotificationReason == 0 {
if row.State == 0 {
nt = 64 // recovery
} else {
nt = 32 // problem
}
} else {
nt = notificationTypes[row.NotificationReason]
}
text := row.Output
if row.LongOutput.Valid {
text += "\n\n" + row.LongOutput.String
}
notificationHistory = append(notificationHistory, &history.NotificationHistory{
HistoryTableEntity: history.HistoryTableEntity{Id: id},
HistoryTableMeta: history.HistoryTableMeta{
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
},
NotificationId: zeroHash,
Type: nt,
SendTime: ts,
State: row.State,
PreviousHardState: previousHardState,
Author: "-",
Text: text,
UsersNotified: row.ContactsNotified,
})
allHistory = append(allHistory, &history.HistoryNotification{
HistoryMeta: history.HistoryMeta{
HistoryEntity: history.HistoryEntity{Id: id},
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
EventType: "notification",
},
NotificationHistoryId: id,
EventTime: ts,
})
for contact := range contactsById[row.NotificationId] {
userId := calcObjectId(env, contact)
userNotificationHistory = append(userNotificationHistory, &history.UserNotificationHistory{
EntityWithoutChecksum: v1.EntityWithoutChecksum{
IdMeta: v1.IdMeta{Id: utils.Checksum(append(append([]byte(nil), id.UUID[:]...), userId...))},
},
EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: envId},
NotificationHistoryId: id,
UserId: userId,
})
}
checkpoint = row.NotificationId
}
icingaDbInserts = [][]interface{}{notificationHistory, userNotificationHistory, allHistory}
return
}
const stateMigrationQuery = "SELECT sh.statehistory_id, UNIX_TIMESTAMP(sh.state_time) state_time, " +
"sh.state_time_usec, sh.state, sh.state_type, sh.current_check_attempt, " +
"sh.max_check_attempts, sh.last_state, sh.last_hard_state, sh.output, sh.long_output, " +
"sh.check_source, o.objecttype_id, o.name1, IFNULL(o.name2, '') name2 " +
"FROM icinga_statehistory sh USE INDEX (PRIMARY) " +
"INNER JOIN icinga_objects o ON o.object_id=sh.object_id " +
"WHERE sh.statehistory_id <= ? AND " +
"sh.statehistory_id > ? " + // where we were interrupted
"ORDER BY sh.statehistory_id " + // allows computeProgress() not to check all IDO rows for whether migrated
"LIMIT ?"
type stateRow = struct {
StatehistoryId uint64
StateTime int64
StateTimeUsec uint32
State uint8
StateType uint8
CurrentCheckAttempt uint16
MaxCheckAttempts uint16
LastState uint8
LastHardState uint8
Output sql.NullString
LongOutput sql.NullString
CheckSource sql.NullString
ObjecttypeId uint8
Name1 string
Name2 string
}
func convertStateRows(
env string, envId, endpointId icingadbTypes.Binary,
selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []stateRow,
) (_, icingaDbInserts [][]interface{}, checkpoint interface{}) {
if len(idoRows) < 1 {
return
}
var cached []struct {
HistoryId uint64
PreviousHardState uint8
}
selectCache(
&cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? AND ?",
idoRows[0].StatehistoryId, idoRows[len(idoRows)-1].StatehistoryId,
)
cachedById := make(map[uint64]uint8, len(cached))
for _, c := range cached {
cachedById[c.HistoryId] = c.PreviousHardState
}
var stateHistory, allHistory []interface{}
for _, row := range idoRows {
previousHardState, ok := cachedById[row.StatehistoryId]
if !ok {
continue
}
id := mkDeterministicUuid('s', row.StatehistoryId)
typ := objectTypes[row.ObjecttypeId]
hostId := calcObjectId(env, row.Name1)
serviceId := calcServiceId(env, row.Name1, row.Name2)
ts := convertTime(row.StateTime, row.StateTimeUsec)
stateHistory = append(stateHistory, &history.StateHistory{
HistoryTableEntity: history.HistoryTableEntity{Id: id},
HistoryTableMeta: history.HistoryTableMeta{
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
},
EventTime: ts,
StateType: icingadbTypes.StateType(row.StateType),
SoftState: row.State,
HardState: row.LastHardState,
PreviousSoftState: row.LastState,
PreviousHardState: previousHardState,
Attempt: uint8(row.CurrentCheckAttempt),
Output: icingadbTypes.String{NullString: row.Output},
LongOutput: icingadbTypes.String{NullString: row.LongOutput},
MaxCheckAttempts: uint32(row.MaxCheckAttempts),
CheckSource: icingadbTypes.String{NullString: row.CheckSource},
})
allHistory = append(allHistory, &history.HistoryState{
HistoryMeta: history.HistoryMeta{
HistoryEntity: history.HistoryEntity{Id: id},
EnvironmentId: envId,
EndpointId: endpointId,
ObjectType: typ,
HostId: hostId,
ServiceId: serviceId,
EventType: "state_change",
},
StateHistoryId: id,
EventTime: ts,
})
checkpoint = row.StatehistoryId
}
icingaDbInserts = [][]interface{}{stateHistory, allHistory}
return
}

View file

@ -3,12 +3,14 @@ package main
import (
"bytes"
"context"
"crypto/sha1"
"database/sql"
"fmt"
"github.com/goccy/go-yaml"
"github.com/icinga/icingadb/cmd/internal"
"github.com/icinga/icingadb/pkg/config"
"github.com/icinga/icingadb/pkg/icingadb"
icingadbTypes "github.com/icinga/icingadb/pkg/types"
"github.com/jessevdk/go-flags"
"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/reflectx"
@ -18,6 +20,7 @@ import (
"golang.org/x/sync/errgroup"
"os"
"path"
"reflect"
"strings"
"time"
)
@ -35,7 +38,8 @@ type Config struct {
IDO config.Database `yaml:"ido"`
IcingaDB config.Database `yaml:"icingadb"`
Icinga2 struct {
Env string `yaml:"env"`
Env string `yaml:"env"`
Endpoint string `yaml:"endpoint"`
} `yaml:"icinga2"`
}
@ -71,6 +75,9 @@ func run() int {
log.Info("Filling cache")
fillCache()
log.Info("Actually migrating")
migrate(c, idb)
return internal.ExitSuccess
}
@ -206,7 +213,7 @@ func computeProgress(c *Config, idb *icingadb.DB) {
}
}()
sliceIdoHistory(ht.snapshot, query, 0, func(rows []ProgressRow) (checkpoint interface{}) {
sliceIdoHistory(ht.snapshot, query, nil, 0, func(rows []ProgressRow) (checkpoint interface{}) {
if len(rows) != lastRowsLen {
if lastStmt != nil {
lastStmt.Close()
@ -289,3 +296,154 @@ func fillCache() {
progress.Wait()
}
var tAny = reflect.TypeOf((*interface{})(nil)). // *interface{}
Elem() // interface{}
func migrate(c *Config, idb *icingadb.DB) {
envId := sha1.Sum([]byte(c.Icinga2.Env))
endpointId := sha1.Sum([]byte(c.Icinga2.Endpoint))
progress := mpb.New()
for i := range types {
types[i].setupBar(progress)
}
types.forEach(func(ht *historyType) {
vConvertRows := reflect.ValueOf(ht.convertRows) // TODO: make historyType#convertRows generic[T] one nice day
tRows := vConvertRows.Type(). // func(env string, envId, endpointId Binary, selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []T) (icingaDbUpdates, icingaDbInserts [][]interface{}, checkpoint interface{})
In(5) // []T
var lastQuery string
var lastStmt *sqlx.Stmt
defer func() {
if lastStmt != nil {
_ = lastStmt.Close()
}
}()
vConvertRowsArgs := [6]reflect.Value{
reflect.ValueOf(c.Icinga2.Env), reflect.ValueOf(icingadbTypes.Binary(envId[:])),
reflect.ValueOf(icingadbTypes.Binary(endpointId[:])),
reflect.ValueOf(func(dest interface{}, query string, args ...interface{}) {
if query != lastQuery {
if lastStmt != nil {
_ = lastStmt.Close()
}
var err error
lastStmt, err = ht.cache.Preparex(query)
if err != nil {
log.With("backend", "cache", "query", query).
Fatalf("%+v", errors.Wrap(err, "can't prepare query"))
}
lastQuery = query
}
if err := lastStmt.Select(dest, args...); err != nil {
log.With("backend", "cache", "query", query, "args", args).
Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
}),
reflect.ValueOf(ht.snapshot),
}
var args []interface{}
if ht.cacheLimitQuery != "" {
var limit uint64
cacheGet(ht.cache, &limit, ht.cacheLimitQuery)
args = append(args, limit)
}
icingaDbInserts := map[reflect.Type]string{}
icingaDbUpdates := map[reflect.Type]string{}
inc := barIncrementer{ht.bar, time.Now()}
sliceIdoHistory(
ht.snapshot, ht.migrationQuery, args, ht.lastId,
reflect.MakeFunc(reflect.FuncOf([]reflect.Type{tRows}, []reflect.Type{tAny}, false),
func(args []reflect.Value) []reflect.Value {
vConvertRowsArgs[5] = args[0]
res := vConvertRows.Call(vConvertRowsArgs[:])
tx, err := idb.Beginx()
if err != nil {
log.With("backend", "Icinga DB").Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
}
for _, table := range res[0].Interface().([][]interface{}) {
if len(table) < 1 {
continue
}
tRow := reflect.TypeOf(table[0])
update, ok := icingaDbUpdates[tRow]
if !ok {
update, _ = idb.BuildUpdateStmt(table[0])
icingaDbUpdates[tRow] = update
}
stmt, err := tx.PrepareNamed(update)
if err != nil {
log.With("backend", "Icinga DB", "dml", update).
Fatalf("%+v", errors.Wrap(err, "can't prepare DML"))
}
for _, row := range table {
if _, err := stmt.Exec(row); err != nil {
log.With("backend", "Icinga DB", "dml", update, "args", row).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}
_ = stmt.Close()
}
for _, table := range res[1].Interface().([][]interface{}) {
if len(table) < 1 {
continue
}
tRow := reflect.TypeOf(table[0])
insert, ok := icingaDbInserts[tRow]
if !ok {
insert, _ = idb.BuildInsertStmt(table[0])
insert = "REPLACE " + strings.TrimPrefix(insert, "INSERT ")
icingaDbInserts[tRow] = insert
}
for len(table) > 0 {
slice := table
if len(slice) > 1000 {
slice = slice[:1000]
table = table[1000:]
} else {
table = nil
}
if _, err := tx.NamedExec(insert, slice); err != nil {
log.With("backend", "Icinga DB", "dml", insert, "args", slice).
Fatalf("%+v", errors.Wrap(err, "can't perform DML"))
}
}
}
if err := tx.Commit(); err != nil {
log.With("backend", "Icinga DB").Fatalf("%+v", errors.Wrap(err, "can't commit transaction"))
}
inc.inc(args[0].Len())
return res[2:]
},
).Interface(),
)
})
progress.Wait()
}

View file

@ -1,12 +1,15 @@
package main
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"crypto/sha1"
"encoding/binary"
"github.com/google/uuid"
"github.com/icinga/icingadb/pkg/icingadb/objectpacker"
icingadbTypes "github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/vbauerster/mpb/v6"
@ -14,6 +17,7 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"reflect"
"sync"
"time"
)
@ -52,6 +56,8 @@ var log = func() *zap.SugaredLogger {
return logger.Sugar()
}()
var objectTypes = map[uint8]string{1: "host", 2: "service"}
// mkDeterministicUuid returns a formally random UUID (v4) as follows: 11111122-3300-4455-4455-555555555555
//
// 0: zeroed
@ -60,7 +66,7 @@ var log = func() *zap.SugaredLogger {
// 3: "h" (for "history")
// 4: the new UUID's formal version (unused bits zeroed)
// 5: the ID of the row the new UUID is for in the IDO (big endian)
func mkDeterministicUuid(table byte, rowId uint64) []byte {
func mkDeterministicUuid(table byte, rowId uint64) icingadbTypes.UUID {
uid := uuidTemplate
uid[3] = table
@ -73,7 +79,7 @@ func mkDeterministicUuid(table byte, rowId uint64) []byte {
uid[7] = bEId[0]
copy(uid[9:], bEId[1:])
return uid[:]
return icingadbTypes.UUID{UUID: uid}
}
// uuidTemplate is for mkDeterministicUuid.
@ -90,6 +96,42 @@ var uuidTemplate = func() uuid.UUID {
return uid
}()
// randomUuid generates a new UUIDv4.
func randomUuid() icingadbTypes.UUID {
var rander *bufio.Reader
massRanders.Lock()
for r := range massRanders.pool {
rander = r
delete(massRanders.pool, r)
break
}
massRanders.Unlock()
if rander == nil {
rander = bufio.NewReader(rand.Reader)
}
id, err := uuid.NewRandomFromReader(rander)
if err != nil {
log.Fatalf("%+v", errors.Wrap(err, "can't generate random UUID"))
}
massRanders.Lock()
massRanders.pool[rander] = struct{}{}
massRanders.Unlock()
return icingadbTypes.UUID{UUID: id}
}
var massRanders = struct {
sync.Mutex
pool map[*bufio.Reader]struct{}
}{
sync.Mutex{},
map[*bufio.Reader]struct{}{},
}
// hashAny combines PackAny and SHA1 hashing.
func hashAny(in interface{}) []byte {
hash := sha1.New()
@ -100,12 +142,34 @@ func hashAny(in interface{}) []byte {
return hash.Sum(nil)
}
// convertTime converts *nix timestamps from the IDO for Icinga DB.
func convertTime(ts int64, tsUs uint32) icingadbTypes.UnixMilli {
if ts == 0 && tsUs == 0 {
return icingadbTypes.UnixMilli{}
}
return icingadbTypes.UnixMilli(time.Unix(ts, int64(tsUs)*int64(time.Microsecond/time.Nanosecond)))
}
// calcObjectId calculates the ID of the config object named name1 for Icinga DB.
func calcObjectId(env, name1 string) []byte {
if name1 == "" {
return nil
}
return hashAny([2]string{env, name1})
}
func sliceIdoHistory(snapshot *sqlx.Tx, query string, checkpoint, onRows interface{}) {
// calcServiceId calculates the ID of the service name2 of the host name1 for Icinga DB.
func calcServiceId(env, name1, name2 string) []byte {
if name2 == "" {
return nil
}
return hashAny([2]string{env, name1 + "!" + name2})
}
func sliceIdoHistory(snapshot *sqlx.Tx, query string, args []interface{}, checkpoint, onRows interface{}) {
vOnRows := reflect.ValueOf(onRows) // TODO: make onRows generic[T] one nice day
tRows := vOnRows.Type(). // func(rows []T) (checkpoint interface{})
@ -116,15 +180,10 @@ func sliceIdoHistory(snapshot *sqlx.Tx, query string, checkpoint, onRows interfa
vRows := vNewRows.Elem()
onRowsArgs := [1]reflect.Value{vRows}
vZeroRows := reflect.Zero(tRows)
stmt, err := snapshot.Preparex(query)
if err != nil {
log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't prepare query"))
}
defer stmt.Close()
args = append(append([]interface{}(nil), args...), checkpoint, bulk)
for {
if err := stmt.Select(rowsPtr, checkpoint, bulk); err != nil {
if err := snapshot.Select(rowsPtr, query, args...); err != nil {
log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query"))
}
@ -137,19 +196,23 @@ func sliceIdoHistory(snapshot *sqlx.Tx, query string, checkpoint, onRows interfa
}
vRows.Set(vZeroRows)
args[len(args)-2] = checkpoint
}
}
type historyType struct {
name string
idoTable string
idoIdColumn string
idoColumns []string
idbTable string
idbIdColumn string
convertId func(row ProgressRow, env string) []byte
cacheSchema []string
cacheFiller func(*historyType)
name string
idoTable string
idoIdColumn string
idoColumns []string
idbTable string
idbIdColumn string
convertId func(row ProgressRow, env string) []byte
cacheSchema []string
cacheFiller func(*historyType)
cacheLimitQuery string
migrationQuery string
convertRows interface{}
cache *sqlx.DB
snapshot *sqlx.Tx
@ -193,7 +256,7 @@ var types = historyTypes{
nil,
"history",
"id",
func(row ProgressRow, _ string) []byte { return mkDeterministicUuid('a', row.Id) },
func(row ProgressRow, _ string) []byte { u := mkDeterministicUuid('a', row.Id); return u.UUID[:] },
eventTimeCacheSchema,
func(ht *historyType) {
buildEventTimeCache(ht, []string{
@ -201,6 +264,9 @@ var types = historyTypes{
"xh.entry_time_usec event_time_usec", "xh.acknowledgement_type event_is_start", "xh.object_id",
})
},
"",
acknowledgementMigrationQuery,
convertAcknowledgementRows,
nil, nil, 0, nil, 0,
},
{
@ -211,7 +277,10 @@ var types = historyTypes{
"comment_history",
"comment_id",
func(row ProgressRow, env string) []byte { return calcObjectId(env, row.Name) },
nil, nil, nil, nil, 0, nil, 0,
nil, nil, "",
commentMigrationQuery,
convertCommentRows,
nil, nil, 0, nil, 0,
},
{
"downtime",
@ -221,7 +290,10 @@ var types = historyTypes{
"downtime_history",
"downtime_id",
func(row ProgressRow, env string) []byte { return calcObjectId(env, row.Name) },
nil, nil, nil, nil, 0, nil, 0,
nil, nil, "",
downtimeMigrationQuery,
convertDowntimeRows,
nil, nil, 0, nil, 0,
},
{
"flapping",
@ -230,7 +302,7 @@ var types = historyTypes{
nil,
"history",
"id",
func(row ProgressRow, _ string) []byte { return mkDeterministicUuid('f', row.Id) },
func(row ProgressRow, _ string) []byte { u := mkDeterministicUuid('f', row.Id); return u.UUID[:] },
eventTimeCacheSchema,
func(ht *historyType) {
buildEventTimeCache(ht, []string{
@ -238,6 +310,9 @@ var types = historyTypes{
"xh.event_time_usec", "xh.event_type-1000 event_is_start", "xh.object_id",
})
},
"",
flappingMigrationQuery,
convertFlappingRows,
nil, nil, 0, nil, 0,
},
{
@ -247,13 +322,16 @@ var types = historyTypes{
nil,
"notification_history",
"id",
func(row ProgressRow, _ string) []byte { return mkDeterministicUuid('n', row.Id) },
func(row ProgressRow, _ string) []byte { u := mkDeterministicUuid('n', row.Id); return u.UUID[:] },
previousHardStateCacheSchema,
func(ht *historyType) {
buildPreviousHardStateCache(ht, []string{
"xh.notification_id id", "xh.object_id", "xh.state last_hard_state",
})
},
"SELECT MAX(history_id) FROM previous_hard_state",
notificationMigrationQuery,
convertNotificationRows,
nil, nil, 0, nil, 0,
},
{
@ -263,11 +341,14 @@ var types = historyTypes{
nil,
"state_history",
"id",
func(row ProgressRow, _ string) []byte { return mkDeterministicUuid('s', row.Id) },
func(row ProgressRow, _ string) []byte { u := mkDeterministicUuid('s', row.Id); return u.UUID[:] },
previousHardStateCacheSchema,
func(ht *historyType) {
buildPreviousHardStateCache(ht, []string{"xh.statehistory_id id", "xh.object_id", "xh.last_hard_state"})
},
"SELECT MAX(history_id) FROM previous_hard_state",
stateMigrationQuery,
convertStateRows,
nil, nil, 0, nil, 0,
},
}

View file

@ -6,10 +6,8 @@ import (
)
func TestMkDeterministicUuid(t *testing.T) {
if !bytes.Equal(
mkDeterministicUuid('s', 0x0102030405060708),
[]byte{'I', 'D', 'O', 's', 'h', 0, 0x40, 1, 0x80, 2, 3, 4, 5, 6, 7, 8},
) {
id := mkDeterministicUuid('s', 0x0102030405060708).UUID
if !bytes.Equal(id[:], []byte{'I', 'D', 'O', 's', 'h', 0, 0x40, 1, 0x80, 2, 3, 4, 5, 6, 7, 8}) {
t.Error("got wrong UUID from mkDeterministicUuid(stateHistory, 0x0102030405060708)")
}
}