mirror of
https://github.com/Icinga/icingadb.git
synced 2026-06-09 08:56:54 -04:00
cmd/ido2icingadb: remove unnecessary mutex
for the sake of speed.
This commit is contained in:
parent
8bf8a6fe75
commit
aa571f0856
2 changed files with 13 additions and 25 deletions
|
|
@ -11,7 +11,6 @@ import (
|
|||
"github.com/goccy/go-yaml"
|
||||
"github.com/icinga/icingadb/pkg/com"
|
||||
"github.com/icinga/icingadb/pkg/config"
|
||||
"github.com/icinga/icingadb/pkg/driver"
|
||||
"github.com/icinga/icingadb/pkg/icingadb"
|
||||
"github.com/icinga/icingadb/pkg/logging"
|
||||
icingadbTypes "github.com/icinga/icingadb/pkg/types"
|
||||
|
|
@ -25,7 +24,6 @@ import (
|
|||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
|
@ -279,7 +277,6 @@ func fillCache() {
|
|||
// migrate does the actual migration.
|
||||
func migrate(c *Config, idb *icingadb.DB, envId []byte) {
|
||||
endpointId := sha1.Sum([]byte(c.Icinga2.Endpoint))
|
||||
idbTx := &sync.Mutex{}
|
||||
|
||||
progress := mpb.New()
|
||||
for _, ht := range types {
|
||||
|
|
@ -287,7 +284,7 @@ func migrate(c *Config, idb *icingadb.DB, envId []byte) {
|
|||
}
|
||||
|
||||
types.forEach(func(ht *historyType) {
|
||||
ht.migrate(c, idb, envId, endpointId, idbTx, ht)
|
||||
ht.migrate(c, idb, envId, endpointId, ht)
|
||||
})
|
||||
|
||||
progress.Wait()
|
||||
|
|
@ -295,7 +292,7 @@ func migrate(c *Config, idb *icingadb.DB, envId []byte) {
|
|||
|
||||
// migrate does the actual migration for one history type.
|
||||
func migrateOneType[IdoRow any](
|
||||
c *Config, idb *icingadb.DB, envId []byte, endpointId [sha1.Size]byte, idbTx *sync.Mutex, ht *historyType,
|
||||
c *Config, idb *icingadb.DB, envId []byte, endpointId [sha1.Size]byte, ht *historyType,
|
||||
convertRows func(env string, envId, endpointId icingadbTypes.Binary,
|
||||
selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx,
|
||||
idoRows []IdoRow) (icingaDbInserts, icingaDbUpserts [][]any, checkpoint any),
|
||||
|
|
@ -359,13 +356,6 @@ func migrateOneType[IdoRow any](
|
|||
|
||||
// ... and insert them:
|
||||
|
||||
if idb.DriverName() == driver.MySQL {
|
||||
// Avoid MySQL error 1205 (Lock wait timeout exceeded; try restarting transaction)
|
||||
// due to concurrent transactions upsert the same table (history).
|
||||
idbTx.Lock()
|
||||
defer idbTx.Unlock()
|
||||
}
|
||||
|
||||
tx, err := idb.Beginx()
|
||||
if err != nil {
|
||||
log.With("backend", "Icinga DB").Fatalf("%+v", errors.Wrap(err, "can't begin transaction"))
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ import (
|
|||
"go.uber.org/zap"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
|
@ -201,8 +200,7 @@ type historyType struct {
|
|||
// migrationQuery SELECTs source data for actual migration.
|
||||
migrationQuery string
|
||||
// migrate does the actual migration.
|
||||
migrate func(c *Config, idb *icingadb.DB, envId []byte,
|
||||
endpointId [sha1.Size]byte, idbTx *sync.Mutex, ht *historyType)
|
||||
migrate func(c *Config, idb *icingadb.DB, envId []byte, endpointId [sha1.Size]byte, ht *historyType)
|
||||
|
||||
// cacheFile locates <name>.sqlite3.
|
||||
cacheFile string
|
||||
|
|
@ -259,8 +257,8 @@ var types = historyTypes{
|
|||
idoTable: "icinga_commenthistory",
|
||||
idoIdColumn: "commenthistory_id",
|
||||
migrationQuery: commentMigrationQuery,
|
||||
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, idbTx *sync.Mutex, ht *historyType) {
|
||||
migrateOneType(c, idb, envId, endpId, idbTx, ht, convertCommentRows)
|
||||
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) {
|
||||
migrateOneType(c, idb, envId, endpId, ht, convertCommentRows)
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -268,8 +266,8 @@ var types = historyTypes{
|
|||
idoTable: "icinga_downtimehistory",
|
||||
idoIdColumn: "downtimehistory_id",
|
||||
migrationQuery: downtimeMigrationQuery,
|
||||
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, idbTx *sync.Mutex, ht *historyType) {
|
||||
migrateOneType(c, idb, envId, endpId, idbTx, ht, convertDowntimeRows)
|
||||
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) {
|
||||
migrateOneType(c, idb, envId, endpId, ht, convertDowntimeRows)
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -284,8 +282,8 @@ var types = historyTypes{
|
|||
})
|
||||
},
|
||||
migrationQuery: flappingMigrationQuery,
|
||||
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, idbTx *sync.Mutex, ht *historyType) {
|
||||
migrateOneType(c, idb, envId, endpId, idbTx, ht, convertFlappingRows)
|
||||
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) {
|
||||
migrateOneType(c, idb, envId, endpId, ht, convertFlappingRows)
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -300,8 +298,8 @@ var types = historyTypes{
|
|||
},
|
||||
cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state",
|
||||
migrationQuery: notificationMigrationQuery,
|
||||
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, idbTx *sync.Mutex, ht *historyType) {
|
||||
migrateOneType(c, idb, envId, endpId, idbTx, ht, convertNotificationRows)
|
||||
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) {
|
||||
migrateOneType(c, idb, envId, endpId, ht, convertNotificationRows)
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -314,8 +312,8 @@ var types = historyTypes{
|
|||
},
|
||||
cacheLimitQuery: "SELECT MAX(history_id) FROM previous_hard_state",
|
||||
migrationQuery: stateMigrationQuery,
|
||||
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, idbTx *sync.Mutex, ht *historyType) {
|
||||
migrateOneType(c, idb, envId, endpId, idbTx, ht, convertStateRows)
|
||||
migrate: func(c *Config, idb *icingadb.DB, envId []byte, endpId [20]byte, ht *historyType) {
|
||||
migrateOneType(c, idb, envId, endpId, ht, convertStateRows)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue