From 5396afe32afd5e6aff4fdb73286310665a3bcd29 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 8 Oct 2021 15:19:13 +0200 Subject: [PATCH] cmd/ido2icingadb: actually migrate --- cmd/ido2icingadb/cache.go | 20 +- cmd/ido2icingadb/convert.go | 861 ++++++++++++++++++++++++++++++++++ cmd/ido2icingadb/main.go | 162 ++++++- cmd/ido2icingadb/misc.go | 131 +++++- cmd/ido2icingadb/misc_test.go | 6 +- 5 files changed, 1137 insertions(+), 43 deletions(-) create mode 100644 cmd/ido2icingadb/convert.go diff --git a/cmd/ido2icingadb/cache.go b/cmd/ido2icingadb/cache.go index 3ecc1ea9..82cc4acf 100644 --- a/cmd/ido2icingadb/cache.go +++ b/cmd/ido2icingadb/cache.go @@ -13,8 +13,8 @@ var eventTimeCacheSchema = []string{ // Icinga DB's flapping_history#start_time per flapping_end row (IDO's icinga_flappinghistory#flappinghistory_id). `CREATE TABLE IF NOT EXISTS end_start_time ( history_id INT PRIMARY KEY, - event_time INT, - event_time_usec INT + event_time INT NOT NULL, + event_time_usec INT NOT NULL )`, // Helper table, the last start_time per icinga_statehistory#object_id. `CREATE TABLE IF NOT EXISTS last_start_time ( @@ -48,7 +48,7 @@ func buildEventTimeCache(ht *historyType, idoColumns []string) { "SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+ " xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ ht.idoIdColumn+" > ? ORDER BY xh."+ht.idoIdColumn+" LIMIT ?", - checkpoint.MaxId.Int64, + nil, checkpoint.MaxId.Int64, func(idoRows []struct { Id uint64 EventTime int64 @@ -77,12 +77,6 @@ func buildEventTimeCache(ht *historyType, idoColumns []string) { onDeleted(cacheExec( *tx, false, "DELETE FROM last_start_time WHERE object_id=?", idoRow.ObjectId, )) - } else { - cacheExec( - *tx, false, - "INSERT INTO end_start_time(history_id, event_time, event_time_usec) "+ - "VALUES (?, NULL, NULL)", idoRow.Id, - ) } } else { onDeleted(cacheExec( @@ -179,7 +173,7 @@ func buildPreviousHardStateCache(ht *historyType, idoColumns []string) { "SELECT "+strings.Join(idoColumns, ", ")+" FROM "+ht.idoTable+ " xh USE INDEX (PRIMARY) INNER JOIN icinga_objects o ON o.object_id=xh.object_id WHERE xh."+ ht.idoIdColumn+" < ? ORDER BY xh."+ht.idoIdColumn+" DESC LIMIT ?", - checkpoint, + nil, checkpoint, func(idoRows []struct { Id uint64 ObjectId uint64 @@ -293,8 +287,10 @@ func chunkCacheTx(cache *sqlx.DB, do func(tx **sqlx.Tx, onDeleted func(sql.Resul } } -func cacheGet(cacheTx *sqlx.Tx, dest interface{}, query string, args ...interface{}) { - if err := cacheTx.Get(dest, query, args...); err != nil { +func cacheGet(cache interface { + Get(dest interface{}, query string, args ...interface{}) error +}, dest interface{}, query string, args ...interface{}) { + if err := cache.Get(dest, query, args...); err != nil { log.With("backend", "cache", "query", query, "args", args). Errorf("%+v", errors.Wrap(err, "can't perform query")) } diff --git a/cmd/ido2icingadb/convert.go b/cmd/ido2icingadb/convert.go new file mode 100644 index 00000000..e7642060 --- /dev/null +++ b/cmd/ido2icingadb/convert.go @@ -0,0 +1,861 @@ +package main + +import ( + "database/sql" + "github.com/icinga/icingadb/pkg/contracts" + v1 "github.com/icinga/icingadb/pkg/icingadb/v1" + "github.com/icinga/icingadb/pkg/icingadb/v1/history" + icingadbTypes "github.com/icinga/icingadb/pkg/types" + "github.com/icinga/icingadb/pkg/utils" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "strings" + "time" +) + +const acknowledgementMigrationQuery = "SELECT ah.acknowledgement_id, UNIX_TIMESTAMP(ah.entry_time) entry_time, " + + "ah.entry_time_usec, ah.acknowledgement_type, ah.author_name, ah.comment_data, ah.is_sticky, " + + "ah.persistent_comment, UNIX_TIMESTAMP(ah.end_time) end_time, o.objecttype_id, o.name1, " + + "IFNULL(o.name2, '') name2 " + + "FROM icinga_acknowledgements ah USE INDEX (PRIMARY) " + + "INNER JOIN icinga_objects o ON o.object_id=ah.object_id " + + "WHERE ah.acknowledgement_id > ? " + // where we were interrupted + "ORDER BY ah.acknowledgement_id " + // allows computeProgress() not to check all IDO rows for whether migrated + "LIMIT ?" + +// AckClear updates an already migrated ack event with the clear event info. +type AckClear struct { + Id icingadbTypes.Binary + ClearTime icingadbTypes.UnixMilli +} + +// Assert interface compliance. +var _ contracts.TableNamer = (*AckClear)(nil) + +// TableName implements the contracts.TableNamer interface. +func (*AckClear) TableName() string { + return "acknowledgement_history" +} + +type acknowledgementRow = struct { + AcknowledgementId uint64 + EntryTime int64 + EntryTimeUsec uint32 + AcknowledgementType uint8 + AuthorName sql.NullString + CommentData sql.NullString + IsSticky uint8 + PersistentComment uint8 + EndTime sql.NullInt64 + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertAcknowledgementRows( + env string, envId, endpointId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []acknowledgementRow, +) (icingaDbUpdates, icingaDbInserts [][]interface{}, checkpoint interface{}) { + if len(idoRows) < 1 { + return + } + + var cached []struct { + HistoryId uint64 + EventTime int64 + EventTimeUsec uint32 + } + selectCache( + &cached, "SELECT history_id, event_time, event_time_usec FROM end_start_time WHERE history_id BETWEEN ? AND ?", + idoRows[0].AcknowledgementId, idoRows[len(idoRows)-1].AcknowledgementId, + ) + + // Needed for set time (see below). + cachedById := make(map[uint64]icingadbTypes.UnixMilli, len(cached)) + for _, c := range cached { + cachedById[c.HistoryId] = convertTime(c.EventTime, c.EventTimeUsec) + } + + var acknowledgementHistory, acknowledgementHistoryUpdates, allHistory []interface{} + for _, row := range idoRows { + ts := convertTime(row.EntryTime, row.EntryTimeUsec) + + // Needed for ID (see below). + var set icingadbTypes.UnixMilli + if row.AcknowledgementType == 0 { // clear + var ok bool + set, ok = cachedById[row.AcknowledgementId] + + if !ok { + continue + } + } else { + set = ts + } + + name := row.Name1 + if row.Name2 != "" { + name += "!" + row.Name2 + } + + id := mkDeterministicUuid('a', row.AcknowledgementId) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + + acknowledgementHistoryId := hashAny([]interface{}{ + env, strings.Title(typ), name, float64(utils.UnixMilli(set.Time())), + }) + + if row.AcknowledgementType == 0 { // clear + // The set counterpart should already have been inserted. + acknowledgementHistoryUpdates = append(acknowledgementHistoryUpdates, &AckClear{ + acknowledgementHistoryId, ts, + }) + + h := &history.HistoryAck{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: id}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "ack_clear", + }, + AcknowledgementHistoryId: acknowledgementHistoryId, + SetTime: set, + ClearTime: ts, + } + + h.EventTime.History = h + allHistory = append(allHistory, h) + } else { // set + acknowledgementHistory = append(acknowledgementHistory, &history.AcknowledgementHistory{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: acknowledgementHistoryId}, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + SetTime: set, + Author: icingadbTypes.String{NullString: row.AuthorName}, + Comment: icingadbTypes.String{NullString: row.CommentData}, + ExpireTime: convertTime(row.EndTime.Int64, 0), + IsPersistent: icingadbTypes.Bool{Bool: row.PersistentComment != 0, Valid: true}, + IsSticky: icingadbTypes.Bool{Bool: row.IsSticky != 0, Valid: true}, + }) + + h := &history.HistoryAck{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: id}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "ack_set", + }, + AcknowledgementHistoryId: acknowledgementHistoryId, + SetTime: set, + } + + h.EventTime.History = h + allHistory = append(allHistory, h) + } + + checkpoint = row.AcknowledgementId + } + + icingaDbUpdates = [][]interface{}{acknowledgementHistoryUpdates} + icingaDbInserts = [][]interface{}{acknowledgementHistory, allHistory} + return +} + +const commentMigrationQuery = "SELECT ch.commenthistory_id, UNIX_TIMESTAMP(ch.entry_time) entry_time, " + + "ch.entry_time_usec, ch.entry_type, ch.author_name, ch.comment_data, ch.is_persistent, " + + "IFNULL(UNIX_TIMESTAMP(ch.expiration_time), 0) expiration_time, " + + "IFNULL(UNIX_TIMESTAMP(ch.deletion_time), 0) deletion_time, ch.deletion_time_usec, ch.name, " + + "o.objecttype_id, o.name1, IFNULL(o.name2, '') name2 " + + "FROM icinga_commenthistory ch USE INDEX (PRIMARY) " + + "INNER JOIN icinga_objects o ON o.object_id=ch.object_id " + + "WHERE ch.commenthistory_id > ? " + // where we were interrupted + "ORDER BY ch.commenthistory_id " + // allows computeProgress() not to check all IDO rows for whether migrated + "LIMIT ?" + +type commentRow = struct { + CommenthistoryId uint64 + EntryTime int64 + EntryTimeUsec uint32 + EntryType uint8 + AuthorName string + CommentData string + IsPersistent uint8 + ExpirationTime int64 + DeletionTime int64 + DeletionTimeUsec uint32 + Name string + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertCommentRows( + env string, envId, endpointId icingadbTypes.Binary, + _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []commentRow, +) (_, icingaDbInserts [][]interface{}, checkpoint interface{}) { + var commentHistory, allHistory []interface{} + for _, row := range idoRows { + id := calcObjectId(env, row.Name) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + entryTime := convertTime(row.EntryTime, row.EntryTimeUsec) + removeTime := convertTime(row.DeletionTime, row.DeletionTimeUsec) + expireTime := convertTime(row.ExpirationTime, 0) + + commentHistory = append(commentHistory, &history.CommentHistory{ + CommentHistoryEntity: history.CommentHistoryEntity{CommentId: id}, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + CommentHistoryUpserter: history.CommentHistoryUpserter{ + RemoveTime: removeTime, + HasBeenRemoved: icingadbTypes.Bool{Bool: !removeTime.Time().IsZero(), Valid: true}, + }, + EntryTime: entryTime, + Author: row.AuthorName, + Comment: row.CommentData, + EntryType: icingadbTypes.CommentType(row.EntryType), + IsPersistent: icingadbTypes.Bool{Bool: row.IsPersistent != 0, Valid: true}, + IsSticky: icingadbTypes.Bool{Bool: false, Valid: true}, + ExpireTime: expireTime, + }) + + h1 := &history.HistoryComment{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: randomUuid()}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "comment_add", + }, + CommentHistoryId: id, + EntryTime: entryTime, + } + + h1.EventTime.History = h1 + allHistory = append(allHistory, h1) + + if !removeTime.Time().IsZero() { // remove + h2 := &history.HistoryComment{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: randomUuid()}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "comment_remove", + }, + CommentHistoryId: id, + EntryTime: entryTime, + RemoveTime: removeTime, + ExpireTime: expireTime, + } + + h2.EventTime.History = h2 + allHistory = append(allHistory, h2) + } + + checkpoint = row.CommenthistoryId + } + + icingaDbInserts = [][]interface{}{commentHistory, allHistory} + return +} + +const downtimeMigrationQuery = "SELECT dh.downtimehistory_id, UNIX_TIMESTAMP(dh.entry_time) entry_time, " + + "dh.author_name, dh.comment_data, dh.is_fixed, dh.duration, " + + "UNIX_TIMESTAMP(dh.scheduled_start_time) scheduled_start_time, " + + "IFNULL(UNIX_TIMESTAMP(dh.scheduled_end_time), 0) scheduled_end_time, " + + "IFNULL(UNIX_TIMESTAMP(dh.actual_start_time), 0) actual_start_time, dh.actual_start_time_usec, " + + "IFNULL(UNIX_TIMESTAMP(dh.actual_end_time), 0) actual_end_time, dh.actual_end_time_usec, dh.was_cancelled, " + + "IFNULL(UNIX_TIMESTAMP(dh.trigger_time), 0) trigger_time, dh.name, o.objecttype_id, o.name1, " + + "IFNULL(o.name2, '') name2, IFNULL(sd.name, '') triggered_by " + + "FROM icinga_downtimehistory dh USE INDEX (PRIMARY) " + + "INNER JOIN icinga_objects o ON o.object_id=dh.object_id " + + "LEFT JOIN icinga_scheduleddowntime sd ON sd.scheduleddowntime_id=dh.triggered_by_id " + + "WHERE dh.downtimehistory_id > ? " + // where we were interrupted + "ORDER BY dh.downtimehistory_id " + // allows computeProgress() not to check all IDO rows for whether migrated + "LIMIT ?" + +type downtimeRow = struct { + DowntimehistoryId uint64 + EntryTime int64 + AuthorName string + CommentData string + IsFixed uint8 + Duration int64 + ScheduledStartTime int64 + ScheduledEndTime int64 + ActualStartTime int64 + ActualStartTimeUsec uint32 + ActualEndTime int64 + ActualEndTimeUsec uint32 + WasCancelled uint8 + TriggerTime int64 + Name string + ObjecttypeId uint8 + Name1 string + Name2 string + TriggeredBy string +} + +func convertDowntimeRows( + env string, envId, endpointId icingadbTypes.Binary, + _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []downtimeRow, +) (_, icingaDbInserts [][]interface{}, checkpoint interface{}) { + var downtimeHistory, allHistory []interface{} + for _, row := range idoRows { + id := calcObjectId(env, row.Name) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + scheduledStart := convertTime(row.ScheduledStartTime, 0) + scheduledEnd := convertTime(row.ScheduledEndTime, 0) + triggerTime := convertTime(row.TriggerTime, 0) + actualStart := convertTime(row.ActualStartTime, row.ActualStartTimeUsec) + actualEnd := convertTime(row.ActualEndTime, row.ActualEndTimeUsec) + var startTime, endTime, cancelTime icingadbTypes.UnixMilli + + if scheduledEnd.Time().IsZero() { + scheduledEnd = icingadbTypes.UnixMilli(scheduledStart.Time().Add(time.Duration(row.Duration) * time.Second)) + } + + if actualStart.Time().IsZero() { + startTime = scheduledStart + } else { + startTime = actualStart + } + + if actualEnd.Time().IsZero() { + endTime = scheduledEnd + } else { + endTime = actualEnd + } + + if triggerTime.Time().IsZero() { + triggerTime = startTime + } + + if row.WasCancelled != 0 { + cancelTime = actualEnd + } + + downtimeHistory = append(downtimeHistory, &history.DowntimeHistory{ + DowntimeHistoryEntity: history.DowntimeHistoryEntity{DowntimeId: id}, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + DowntimeHistoryUpserter: history.DowntimeHistoryUpserter{ + HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true}, + CancelTime: cancelTime, + }, + TriggeredById: calcObjectId(env, row.TriggeredBy), + EntryTime: convertTime(row.EntryTime, 0), + Author: row.AuthorName, + Comment: row.CommentData, + IsFlexible: icingadbTypes.Bool{Bool: row.IsFixed == 0, Valid: true}, + FlexibleDuration: uint64(row.Duration) * 1000, + ScheduledStartTime: scheduledStart, + ScheduledEndTime: scheduledEnd, + StartTime: startTime, + EndTime: endTime, + TriggerTime: triggerTime, + }) + + h1 := &history.HistoryDowntime{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: randomUuid()}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "downtime_start", + }, + DowntimeHistoryId: id, + StartTime: startTime, + } + + h1.EventTime.History = h1 + allHistory = append(allHistory, h1) + + if !actualEnd.Time().IsZero() { // remove + h2 := &history.HistoryDowntime{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: randomUuid()}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "downtime_end", + }, + DowntimeHistoryId: id, + StartTime: startTime, + CancelTime: cancelTime, + EndTime: endTime, + HasBeenCancelled: icingadbTypes.Bool{Bool: row.WasCancelled != 0, Valid: true}, + } + + h2.EventTime.History = h2 + allHistory = append(allHistory, h2) + } + + checkpoint = row.DowntimehistoryId + } + + icingaDbInserts = [][]interface{}{downtimeHistory, allHistory} + return +} + +const flappingMigrationQuery = "SELECT fh.flappinghistory_id, UNIX_TIMESTAMP(fh.event_time) event_time, " + + "fh.event_time_usec, fh.event_type, fh.percent_state_change, fh.low_threshold, " + + "fh.high_threshold, o.objecttype_id, o.name1, IFNULL(o.name2, '') name2 " + + "FROM icinga_flappinghistory fh USE INDEX (PRIMARY) " + + "INNER JOIN icinga_objects o ON o.object_id=fh.object_id " + + "WHERE fh.flappinghistory_id > ? " + // where we were interrupted + "ORDER BY fh.flappinghistory_id " + // allows computeProgress() not to check all IDO rows for whether migrated + "LIMIT ?" + +// FlappingEnd updates an already migrated start event with the end event info. +type FlappingEnd struct { + Id icingadbTypes.Binary + EndTime icingadbTypes.UnixMilli +} + +// Assert interface compliance. +var _ contracts.TableNamer = (*FlappingEnd)(nil) + +// TableName implements the contracts.TableNamer interface. +func (*FlappingEnd) TableName() string { + return "flapping_history" +} + +type flappingRow = struct { + FlappinghistoryId uint64 + EventTime int64 + EventTimeUsec uint32 + EventType uint16 + PercentStateChange sql.NullFloat64 + LowThreshold float64 + HighThreshold float64 + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertFlappingRows( + env string, envId, endpointId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []flappingRow, +) (icingaDbUpdates, icingaDbInserts [][]interface{}, checkpoint interface{}) { + if len(idoRows) < 1 { + return + } + + var cached []struct { + HistoryId uint64 + EventTime int64 + EventTimeUsec uint32 + } + selectCache( + &cached, "SELECT history_id, event_time, event_time_usec FROM end_start_time WHERE history_id BETWEEN ? AND ?", + idoRows[0].FlappinghistoryId, idoRows[len(idoRows)-1].FlappinghistoryId, + ) + + // Needed for start time (see below). + cachedById := make(map[uint64]icingadbTypes.UnixMilli, len(cached)) + for _, c := range cached { + cachedById[c.HistoryId] = convertTime(c.EventTime, c.EventTimeUsec) + } + + var flappingHistory, flappingHistoryUpdates, allHistory []interface{} + for _, row := range idoRows { + ts := convertTime(row.EventTime, row.EventTimeUsec) + + // Needed for ID (see below). + var start icingadbTypes.UnixMilli + if row.EventType == 1001 { // end + var ok bool + start, ok = cachedById[row.FlappinghistoryId] + + if !ok { + continue + } + } else { + start = ts + } + + name := row.Name1 + if row.Name2 != "" { + name += "!" + row.Name2 + } + + id := mkDeterministicUuid('f', row.FlappinghistoryId) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + + flappingHistoryId := hashAny([]interface{}{ + env, strings.Title(typ), name, float64(utils.UnixMilli(start.Time())), + }) + + if row.EventType == 1001 { // end + // The start counterpart should already have been inserted. + flappingHistoryUpdates = append(flappingHistoryUpdates, &FlappingEnd{flappingHistoryId, ts}) + + h := &history.HistoryFlapping{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: id}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "flapping_end", + }, + FlappingHistoryId: flappingHistoryId, + StartTime: start, + EndTime: ts, + } + + h.EventTime.History = h + allHistory = append(allHistory, h) + } else { // end + flappingHistory = append(flappingHistory, &history.FlappingHistory{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: flappingHistoryId}, + }, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + FlappingHistoryUpserter: history.FlappingHistoryUpserter{ + FlappingThresholdLow: float32(row.LowThreshold), + FlappingThresholdHigh: float32(row.HighThreshold), + }, + StartTime: start, + PercentStateChangeStart: icingadbTypes.Float{NullFloat64: row.PercentStateChange}, + }) + + h := &history.HistoryFlapping{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: id}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "flapping_start", + }, + FlappingHistoryId: flappingHistoryId, + StartTime: start, + } + + h.EventTime.History = h + allHistory = append(allHistory, h) + } + + checkpoint = row.FlappinghistoryId + } + + icingaDbUpdates = [][]interface{}{flappingHistoryUpdates} + icingaDbInserts = [][]interface{}{flappingHistory, allHistory} + return +} + +const notificationMigrationQuery = "SELECT n.notification_id, n.notification_reason, " + + "UNIX_TIMESTAMP(n.end_time) end_time, n.end_time_usec, n.state, IFNULL(n.output, '') output, " + + "n.long_output, n.contacts_notified, o.objecttype_id, o.name1, IFNULL(o.name2, '') name2 " + + "FROM icinga_notifications n USE INDEX (PRIMARY) " + + "INNER JOIN icinga_objects o ON o.object_id=n.object_id " + + "WHERE n.notification_id <= ? AND " + + "n.notification_id > ? " + // where we were interrupted + "ORDER BY n.notification_id " + // allows computeProgress() not to check all IDO rows for whether migrated + "LIMIT ?" + +// zeroHash is a NULL alternative for NOT NULL columns. +var zeroHash = make(icingadbTypes.Binary, 20) + +// notificationTypes maps IDO values to Icinga DB ones. +var notificationTypes = map[uint8]icingadbTypes.NotificationType{5: 1, 6: 2, 7: 4, 8: 8, 1: 16, 2: 128, 3: 256} + +type notificationRow = struct { + NotificationId uint64 + NotificationReason uint8 + EndTime int64 + EndTimeUsec uint32 + State uint8 + Output string + LongOutput sql.NullString + ContactsNotified uint16 + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertNotificationRows( + env string, envId, endpointId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []notificationRow, +) (_, icingaDbInserts [][]interface{}, checkpoint interface{}) { + if len(idoRows) < 1 { + return + } + + var cached []struct { + HistoryId uint64 + PreviousHardState uint8 + } + selectCache( + &cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? AND ?", + idoRows[0].NotificationId, idoRows[len(idoRows)-1].NotificationId, + ) + + cachedById := make(map[uint64]uint8, len(cached)) + for _, c := range cached { + cachedById[c.HistoryId] = c.PreviousHardState + } + + var contacts []struct { + NotificationId uint64 + Name1 string + } + + { + const query = "SELECT c.notification_id, o.name1 FROM icinga_contactnotifications c " + + "INNER JOIN icinga_objects o ON o.object_id=c.contact_object_id WHERE c.notification_id BETWEEN ? AND ?" + + err := ido.Select(&contacts, query, idoRows[0].NotificationId, idoRows[len(idoRows)-1].NotificationId) + if err != nil { + log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } + } + + contactsById := map[uint64]map[string]struct{}{} + for _, contact := range contacts { + perId, ok := contactsById[contact.NotificationId] + if !ok { + perId = map[string]struct{}{} + contactsById[contact.NotificationId] = perId + } + + perId[contact.Name1] = struct{}{} + } + + var notificationHistory, userNotificationHistory, allHistory []interface{} + for _, row := range idoRows { + previousHardState, ok := cachedById[row.NotificationId] + if !ok { + continue + } + + id := mkDeterministicUuid('n', row.NotificationId) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + ts := convertTime(row.EndTime, row.EndTimeUsec) + + var nt icingadbTypes.NotificationType + if row.NotificationReason == 0 { + if row.State == 0 { + nt = 64 // recovery + } else { + nt = 32 // problem + } + } else { + nt = notificationTypes[row.NotificationReason] + } + + text := row.Output + if row.LongOutput.Valid { + text += "\n\n" + row.LongOutput.String + } + + notificationHistory = append(notificationHistory, &history.NotificationHistory{ + HistoryTableEntity: history.HistoryTableEntity{Id: id}, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + NotificationId: zeroHash, + Type: nt, + SendTime: ts, + State: row.State, + PreviousHardState: previousHardState, + Author: "-", + Text: text, + UsersNotified: row.ContactsNotified, + }) + + allHistory = append(allHistory, &history.HistoryNotification{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: id}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "notification", + }, + NotificationHistoryId: id, + EventTime: ts, + }) + + for contact := range contactsById[row.NotificationId] { + userId := calcObjectId(env, contact) + + userNotificationHistory = append(userNotificationHistory, &history.UserNotificationHistory{ + EntityWithoutChecksum: v1.EntityWithoutChecksum{ + IdMeta: v1.IdMeta{Id: utils.Checksum(append(append([]byte(nil), id.UUID[:]...), userId...))}, + }, + EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: envId}, + NotificationHistoryId: id, + UserId: userId, + }) + } + + checkpoint = row.NotificationId + } + + icingaDbInserts = [][]interface{}{notificationHistory, userNotificationHistory, allHistory} + return +} + +const stateMigrationQuery = "SELECT sh.statehistory_id, UNIX_TIMESTAMP(sh.state_time) state_time, " + + "sh.state_time_usec, sh.state, sh.state_type, sh.current_check_attempt, " + + "sh.max_check_attempts, sh.last_state, sh.last_hard_state, sh.output, sh.long_output, " + + "sh.check_source, o.objecttype_id, o.name1, IFNULL(o.name2, '') name2 " + + "FROM icinga_statehistory sh USE INDEX (PRIMARY) " + + "INNER JOIN icinga_objects o ON o.object_id=sh.object_id " + + "WHERE sh.statehistory_id <= ? AND " + + "sh.statehistory_id > ? " + // where we were interrupted + "ORDER BY sh.statehistory_id " + // allows computeProgress() not to check all IDO rows for whether migrated + "LIMIT ?" + +type stateRow = struct { + StatehistoryId uint64 + StateTime int64 + StateTimeUsec uint32 + State uint8 + StateType uint8 + CurrentCheckAttempt uint16 + MaxCheckAttempts uint16 + LastState uint8 + LastHardState uint8 + Output sql.NullString + LongOutput sql.NullString + CheckSource sql.NullString + ObjecttypeId uint8 + Name1 string + Name2 string +} + +func convertStateRows( + env string, envId, endpointId icingadbTypes.Binary, + selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []stateRow, +) (_, icingaDbInserts [][]interface{}, checkpoint interface{}) { + if len(idoRows) < 1 { + return + } + + var cached []struct { + HistoryId uint64 + PreviousHardState uint8 + } + selectCache( + &cached, "SELECT history_id, previous_hard_state FROM previous_hard_state WHERE history_id BETWEEN ? AND ?", + idoRows[0].StatehistoryId, idoRows[len(idoRows)-1].StatehistoryId, + ) + + cachedById := make(map[uint64]uint8, len(cached)) + for _, c := range cached { + cachedById[c.HistoryId] = c.PreviousHardState + } + + var stateHistory, allHistory []interface{} + for _, row := range idoRows { + previousHardState, ok := cachedById[row.StatehistoryId] + if !ok { + continue + } + + id := mkDeterministicUuid('s', row.StatehistoryId) + typ := objectTypes[row.ObjecttypeId] + hostId := calcObjectId(env, row.Name1) + serviceId := calcServiceId(env, row.Name1, row.Name2) + ts := convertTime(row.StateTime, row.StateTimeUsec) + + stateHistory = append(stateHistory, &history.StateHistory{ + HistoryTableEntity: history.HistoryTableEntity{Id: id}, + HistoryTableMeta: history.HistoryTableMeta{ + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + }, + EventTime: ts, + StateType: icingadbTypes.StateType(row.StateType), + SoftState: row.State, + HardState: row.LastHardState, + PreviousSoftState: row.LastState, + PreviousHardState: previousHardState, + Attempt: uint8(row.CurrentCheckAttempt), + Output: icingadbTypes.String{NullString: row.Output}, + LongOutput: icingadbTypes.String{NullString: row.LongOutput}, + MaxCheckAttempts: uint32(row.MaxCheckAttempts), + CheckSource: icingadbTypes.String{NullString: row.CheckSource}, + }) + + allHistory = append(allHistory, &history.HistoryState{ + HistoryMeta: history.HistoryMeta{ + HistoryEntity: history.HistoryEntity{Id: id}, + EnvironmentId: envId, + EndpointId: endpointId, + ObjectType: typ, + HostId: hostId, + ServiceId: serviceId, + EventType: "state_change", + }, + StateHistoryId: id, + EventTime: ts, + }) + + checkpoint = row.StatehistoryId + } + + icingaDbInserts = [][]interface{}{stateHistory, allHistory} + return +} diff --git a/cmd/ido2icingadb/main.go b/cmd/ido2icingadb/main.go index e7a03604..cb33a6af 100644 --- a/cmd/ido2icingadb/main.go +++ b/cmd/ido2icingadb/main.go @@ -3,12 +3,14 @@ package main import ( "bytes" "context" + "crypto/sha1" "database/sql" "fmt" "github.com/goccy/go-yaml" "github.com/icinga/icingadb/cmd/internal" "github.com/icinga/icingadb/pkg/config" "github.com/icinga/icingadb/pkg/icingadb" + icingadbTypes "github.com/icinga/icingadb/pkg/types" "github.com/jessevdk/go-flags" "github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx/reflectx" @@ -18,6 +20,7 @@ import ( "golang.org/x/sync/errgroup" "os" "path" + "reflect" "strings" "time" ) @@ -35,7 +38,8 @@ type Config struct { IDO config.Database `yaml:"ido"` IcingaDB config.Database `yaml:"icingadb"` Icinga2 struct { - Env string `yaml:"env"` + Env string `yaml:"env"` + Endpoint string `yaml:"endpoint"` } `yaml:"icinga2"` } @@ -71,6 +75,9 @@ func run() int { log.Info("Filling cache") fillCache() + log.Info("Actually migrating") + migrate(c, idb) + return internal.ExitSuccess } @@ -206,7 +213,7 @@ func computeProgress(c *Config, idb *icingadb.DB) { } }() - sliceIdoHistory(ht.snapshot, query, 0, func(rows []ProgressRow) (checkpoint interface{}) { + sliceIdoHistory(ht.snapshot, query, nil, 0, func(rows []ProgressRow) (checkpoint interface{}) { if len(rows) != lastRowsLen { if lastStmt != nil { lastStmt.Close() @@ -289,3 +296,154 @@ func fillCache() { progress.Wait() } + +var tAny = reflect.TypeOf((*interface{})(nil)). // *interface{} + Elem() // interface{} + +func migrate(c *Config, idb *icingadb.DB) { + envId := sha1.Sum([]byte(c.Icinga2.Env)) + endpointId := sha1.Sum([]byte(c.Icinga2.Endpoint)) + + progress := mpb.New() + for i := range types { + types[i].setupBar(progress) + } + + types.forEach(func(ht *historyType) { + vConvertRows := reflect.ValueOf(ht.convertRows) // TODO: make historyType#convertRows generic[T] one nice day + + tRows := vConvertRows.Type(). // func(env string, envId, endpointId Binary, selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []T) (icingaDbUpdates, icingaDbInserts [][]interface{}, checkpoint interface{}) + In(5) // []T + + var lastQuery string + var lastStmt *sqlx.Stmt + + defer func() { + if lastStmt != nil { + _ = lastStmt.Close() + } + }() + + vConvertRowsArgs := [6]reflect.Value{ + reflect.ValueOf(c.Icinga2.Env), reflect.ValueOf(icingadbTypes.Binary(envId[:])), + reflect.ValueOf(icingadbTypes.Binary(endpointId[:])), + reflect.ValueOf(func(dest interface{}, query string, args ...interface{}) { + if query != lastQuery { + if lastStmt != nil { + _ = lastStmt.Close() + } + + var err error + + lastStmt, err = ht.cache.Preparex(query) + if err != nil { + log.With("backend", "cache", "query", query). + Fatalf("%+v", errors.Wrap(err, "can't prepare query")) + } + + lastQuery = query + } + + if err := lastStmt.Select(dest, args...); err != nil { + log.With("backend", "cache", "query", query, "args", args). + Fatalf("%+v", errors.Wrap(err, "can't perform query")) + } + }), + reflect.ValueOf(ht.snapshot), + } + + var args []interface{} + if ht.cacheLimitQuery != "" { + var limit uint64 + cacheGet(ht.cache, &limit, ht.cacheLimitQuery) + args = append(args, limit) + } + + icingaDbInserts := map[reflect.Type]string{} + icingaDbUpdates := map[reflect.Type]string{} + inc := barIncrementer{ht.bar, time.Now()} + + sliceIdoHistory( + ht.snapshot, ht.migrationQuery, args, ht.lastId, + reflect.MakeFunc(reflect.FuncOf([]reflect.Type{tRows}, []reflect.Type{tAny}, false), + func(args []reflect.Value) []reflect.Value { + vConvertRowsArgs[5] = args[0] + res := vConvertRows.Call(vConvertRowsArgs[:]) + + tx, err := idb.Beginx() + if err != nil { + log.With("backend", "Icinga DB").Fatalf("%+v", errors.Wrap(err, "can't begin transaction")) + } + + for _, table := range res[0].Interface().([][]interface{}) { + if len(table) < 1 { + continue + } + + tRow := reflect.TypeOf(table[0]) + + update, ok := icingaDbUpdates[tRow] + if !ok { + update, _ = idb.BuildUpdateStmt(table[0]) + icingaDbUpdates[tRow] = update + } + + stmt, err := tx.PrepareNamed(update) + if err != nil { + log.With("backend", "Icinga DB", "dml", update). + Fatalf("%+v", errors.Wrap(err, "can't prepare DML")) + } + + for _, row := range table { + if _, err := stmt.Exec(row); err != nil { + log.With("backend", "Icinga DB", "dml", update, "args", row). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } + } + + _ = stmt.Close() + } + + for _, table := range res[1].Interface().([][]interface{}) { + if len(table) < 1 { + continue + } + + tRow := reflect.TypeOf(table[0]) + + insert, ok := icingaDbInserts[tRow] + if !ok { + insert, _ = idb.BuildInsertStmt(table[0]) + insert = "REPLACE " + strings.TrimPrefix(insert, "INSERT ") + icingaDbInserts[tRow] = insert + } + + for len(table) > 0 { + slice := table + if len(slice) > 1000 { + slice = slice[:1000] + table = table[1000:] + } else { + table = nil + } + + if _, err := tx.NamedExec(insert, slice); err != nil { + log.With("backend", "Icinga DB", "dml", insert, "args", slice). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } + } + } + + if err := tx.Commit(); err != nil { + log.With("backend", "Icinga DB").Fatalf("%+v", errors.Wrap(err, "can't commit transaction")) + } + + inc.inc(args[0].Len()) + return res[2:] + }, + ).Interface(), + ) + }) + + progress.Wait() +} diff --git a/cmd/ido2icingadb/misc.go b/cmd/ido2icingadb/misc.go index d6600564..6ef26142 100644 --- a/cmd/ido2icingadb/misc.go +++ b/cmd/ido2icingadb/misc.go @@ -1,12 +1,15 @@ package main import ( + "bufio" "bytes" "context" + "crypto/rand" "crypto/sha1" "encoding/binary" "github.com/google/uuid" "github.com/icinga/icingadb/pkg/icingadb/objectpacker" + icingadbTypes "github.com/icinga/icingadb/pkg/types" "github.com/jmoiron/sqlx" "github.com/pkg/errors" "github.com/vbauerster/mpb/v6" @@ -14,6 +17,7 @@ import ( "go.uber.org/zap" "golang.org/x/sync/errgroup" "reflect" + "sync" "time" ) @@ -52,6 +56,8 @@ var log = func() *zap.SugaredLogger { return logger.Sugar() }() +var objectTypes = map[uint8]string{1: "host", 2: "service"} + // mkDeterministicUuid returns a formally random UUID (v4) as follows: 11111122-3300-4455-4455-555555555555 // // 0: zeroed @@ -60,7 +66,7 @@ var log = func() *zap.SugaredLogger { // 3: "h" (for "history") // 4: the new UUID's formal version (unused bits zeroed) // 5: the ID of the row the new UUID is for in the IDO (big endian) -func mkDeterministicUuid(table byte, rowId uint64) []byte { +func mkDeterministicUuid(table byte, rowId uint64) icingadbTypes.UUID { uid := uuidTemplate uid[3] = table @@ -73,7 +79,7 @@ func mkDeterministicUuid(table byte, rowId uint64) []byte { uid[7] = bEId[0] copy(uid[9:], bEId[1:]) - return uid[:] + return icingadbTypes.UUID{UUID: uid} } // uuidTemplate is for mkDeterministicUuid. @@ -90,6 +96,42 @@ var uuidTemplate = func() uuid.UUID { return uid }() +// randomUuid generates a new UUIDv4. +func randomUuid() icingadbTypes.UUID { + var rander *bufio.Reader + + massRanders.Lock() + for r := range massRanders.pool { + rander = r + delete(massRanders.pool, r) + break + } + massRanders.Unlock() + + if rander == nil { + rander = bufio.NewReader(rand.Reader) + } + + id, err := uuid.NewRandomFromReader(rander) + if err != nil { + log.Fatalf("%+v", errors.Wrap(err, "can't generate random UUID")) + } + + massRanders.Lock() + massRanders.pool[rander] = struct{}{} + massRanders.Unlock() + + return icingadbTypes.UUID{UUID: id} +} + +var massRanders = struct { + sync.Mutex + pool map[*bufio.Reader]struct{} +}{ + sync.Mutex{}, + map[*bufio.Reader]struct{}{}, +} + // hashAny combines PackAny and SHA1 hashing. func hashAny(in interface{}) []byte { hash := sha1.New() @@ -100,12 +142,34 @@ func hashAny(in interface{}) []byte { return hash.Sum(nil) } +// convertTime converts *nix timestamps from the IDO for Icinga DB. +func convertTime(ts int64, tsUs uint32) icingadbTypes.UnixMilli { + if ts == 0 && tsUs == 0 { + return icingadbTypes.UnixMilli{} + } + + return icingadbTypes.UnixMilli(time.Unix(ts, int64(tsUs)*int64(time.Microsecond/time.Nanosecond))) +} + // calcObjectId calculates the ID of the config object named name1 for Icinga DB. func calcObjectId(env, name1 string) []byte { + if name1 == "" { + return nil + } + return hashAny([2]string{env, name1}) } -func sliceIdoHistory(snapshot *sqlx.Tx, query string, checkpoint, onRows interface{}) { +// calcServiceId calculates the ID of the service name2 of the host name1 for Icinga DB. +func calcServiceId(env, name1, name2 string) []byte { + if name2 == "" { + return nil + } + + return hashAny([2]string{env, name1 + "!" + name2}) +} + +func sliceIdoHistory(snapshot *sqlx.Tx, query string, args []interface{}, checkpoint, onRows interface{}) { vOnRows := reflect.ValueOf(onRows) // TODO: make onRows generic[T] one nice day tRows := vOnRows.Type(). // func(rows []T) (checkpoint interface{}) @@ -116,15 +180,10 @@ func sliceIdoHistory(snapshot *sqlx.Tx, query string, checkpoint, onRows interfa vRows := vNewRows.Elem() onRowsArgs := [1]reflect.Value{vRows} vZeroRows := reflect.Zero(tRows) - - stmt, err := snapshot.Preparex(query) - if err != nil { - log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't prepare query")) - } - defer stmt.Close() + args = append(append([]interface{}(nil), args...), checkpoint, bulk) for { - if err := stmt.Select(rowsPtr, checkpoint, bulk); err != nil { + if err := snapshot.Select(rowsPtr, query, args...); err != nil { log.With("query", query).Fatalf("%+v", errors.Wrap(err, "can't perform query")) } @@ -137,19 +196,23 @@ func sliceIdoHistory(snapshot *sqlx.Tx, query string, checkpoint, onRows interfa } vRows.Set(vZeroRows) + args[len(args)-2] = checkpoint } } type historyType struct { - name string - idoTable string - idoIdColumn string - idoColumns []string - idbTable string - idbIdColumn string - convertId func(row ProgressRow, env string) []byte - cacheSchema []string - cacheFiller func(*historyType) + name string + idoTable string + idoIdColumn string + idoColumns []string + idbTable string + idbIdColumn string + convertId func(row ProgressRow, env string) []byte + cacheSchema []string + cacheFiller func(*historyType) + cacheLimitQuery string + migrationQuery string + convertRows interface{} cache *sqlx.DB snapshot *sqlx.Tx @@ -193,7 +256,7 @@ var types = historyTypes{ nil, "history", "id", - func(row ProgressRow, _ string) []byte { return mkDeterministicUuid('a', row.Id) }, + func(row ProgressRow, _ string) []byte { u := mkDeterministicUuid('a', row.Id); return u.UUID[:] }, eventTimeCacheSchema, func(ht *historyType) { buildEventTimeCache(ht, []string{ @@ -201,6 +264,9 @@ var types = historyTypes{ "xh.entry_time_usec event_time_usec", "xh.acknowledgement_type event_is_start", "xh.object_id", }) }, + "", + acknowledgementMigrationQuery, + convertAcknowledgementRows, nil, nil, 0, nil, 0, }, { @@ -211,7 +277,10 @@ var types = historyTypes{ "comment_history", "comment_id", func(row ProgressRow, env string) []byte { return calcObjectId(env, row.Name) }, - nil, nil, nil, nil, 0, nil, 0, + nil, nil, "", + commentMigrationQuery, + convertCommentRows, + nil, nil, 0, nil, 0, }, { "downtime", @@ -221,7 +290,10 @@ var types = historyTypes{ "downtime_history", "downtime_id", func(row ProgressRow, env string) []byte { return calcObjectId(env, row.Name) }, - nil, nil, nil, nil, 0, nil, 0, + nil, nil, "", + downtimeMigrationQuery, + convertDowntimeRows, + nil, nil, 0, nil, 0, }, { "flapping", @@ -230,7 +302,7 @@ var types = historyTypes{ nil, "history", "id", - func(row ProgressRow, _ string) []byte { return mkDeterministicUuid('f', row.Id) }, + func(row ProgressRow, _ string) []byte { u := mkDeterministicUuid('f', row.Id); return u.UUID[:] }, eventTimeCacheSchema, func(ht *historyType) { buildEventTimeCache(ht, []string{ @@ -238,6 +310,9 @@ var types = historyTypes{ "xh.event_time_usec", "xh.event_type-1000 event_is_start", "xh.object_id", }) }, + "", + flappingMigrationQuery, + convertFlappingRows, nil, nil, 0, nil, 0, }, { @@ -247,13 +322,16 @@ var types = historyTypes{ nil, "notification_history", "id", - func(row ProgressRow, _ string) []byte { return mkDeterministicUuid('n', row.Id) }, + func(row ProgressRow, _ string) []byte { u := mkDeterministicUuid('n', row.Id); return u.UUID[:] }, previousHardStateCacheSchema, func(ht *historyType) { buildPreviousHardStateCache(ht, []string{ "xh.notification_id id", "xh.object_id", "xh.state last_hard_state", }) }, + "SELECT MAX(history_id) FROM previous_hard_state", + notificationMigrationQuery, + convertNotificationRows, nil, nil, 0, nil, 0, }, { @@ -263,11 +341,14 @@ var types = historyTypes{ nil, "state_history", "id", - func(row ProgressRow, _ string) []byte { return mkDeterministicUuid('s', row.Id) }, + func(row ProgressRow, _ string) []byte { u := mkDeterministicUuid('s', row.Id); return u.UUID[:] }, previousHardStateCacheSchema, func(ht *historyType) { buildPreviousHardStateCache(ht, []string{"xh.statehistory_id id", "xh.object_id", "xh.last_hard_state"}) }, + "SELECT MAX(history_id) FROM previous_hard_state", + stateMigrationQuery, + convertStateRows, nil, nil, 0, nil, 0, }, } diff --git a/cmd/ido2icingadb/misc_test.go b/cmd/ido2icingadb/misc_test.go index 76c5aec4..e3266e88 100644 --- a/cmd/ido2icingadb/misc_test.go +++ b/cmd/ido2icingadb/misc_test.go @@ -6,10 +6,8 @@ import ( ) func TestMkDeterministicUuid(t *testing.T) { - if !bytes.Equal( - mkDeterministicUuid('s', 0x0102030405060708), - []byte{'I', 'D', 'O', 's', 'h', 0, 0x40, 1, 0x80, 2, 3, 4, 5, 6, 7, 8}, - ) { + id := mkDeterministicUuid('s', 0x0102030405060708).UUID + if !bytes.Equal(id[:], []byte{'I', 'D', 'O', 's', 'h', 0, 0x40, 1, 0x80, 2, 3, 4, 5, 6, 7, 8}) { t.Error("got wrong UUID from mkDeterministicUuid(stateHistory, 0x0102030405060708)") } }