mirror of
https://github.com/Icinga/icingadb.git
synced 2026-06-08 16:34:29 -04:00
parent
908bb42004
commit
5b87fd94ee
2 changed files with 4 additions and 21 deletions
|
|
@ -19,8 +19,6 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
|
@ -104,7 +102,7 @@ func (db *DB) BuildColumns(subject interface{}) []string {
|
|||
|
||||
// BuildDeleteStmt returns a DELETE statement for the given struct.
|
||||
func (db *DB) BuildDeleteStmt(from interface{}) string {
|
||||
return db.PostProcessPlaceholders(fmt.Sprintf(
|
||||
return db.Rebind(fmt.Sprintf(
|
||||
`DELETE FROM "%s" WHERE id IN (?)`,
|
||||
utils.TableName(from),
|
||||
))
|
||||
|
|
@ -572,21 +570,6 @@ func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {
|
|||
}
|
||||
}
|
||||
|
||||
var placeholder = regexp.MustCompile(`\?`)
|
||||
|
||||
// PostProcessPlaceholders returns query with placeholders (?) suitable for db.
|
||||
func (db *DB) PostProcessPlaceholders(query string) string {
|
||||
if db.DriverName() == "icingadb-pgsql" {
|
||||
var i uint64
|
||||
return placeholder.ReplaceAllStringFunc(query, func(string) string {
|
||||
i++
|
||||
return "$" + strconv.FormatUint(i, 10)
|
||||
})
|
||||
}
|
||||
|
||||
return query
|
||||
}
|
||||
|
||||
func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper {
|
||||
return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) {
|
||||
if count := counter.Reset(); count > 0 {
|
||||
|
|
|
|||
|
|
@ -236,7 +236,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type
|
|||
return errors.Wrap(errBegin, "can't start transaction")
|
||||
}
|
||||
|
||||
query := h.db.PostProcessPlaceholders("SELECT id, heartbeat FROM icingadb_instance " +
|
||||
query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance " +
|
||||
"WHERE environment_id = ? AND responsible = ? AND id <> ? AND heartbeat > ?")
|
||||
|
||||
instance := &v1.IcingadbInstance{}
|
||||
|
|
@ -339,7 +339,7 @@ func (h *HA) insertEnvironment() error {
|
|||
func (h *HA) removeInstance(ctx context.Context) {
|
||||
h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId)))
|
||||
// Intentionally not using h.ctx here as it's already cancelled.
|
||||
query := h.db.PostProcessPlaceholders("DELETE FROM icingadb_instance WHERE id = ?")
|
||||
query := h.db.Rebind("DELETE FROM icingadb_instance WHERE id = ?")
|
||||
_, err := h.db.ExecContext(ctx, query, h.instanceId)
|
||||
if err != nil {
|
||||
h.logger.Warnw("Could not remove instance from database", zap.Error(err), zap.String("query", query))
|
||||
|
|
@ -351,7 +351,7 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar
|
|||
case <-h.ctx.Done():
|
||||
return
|
||||
case <-time.After(timeout):
|
||||
query := h.db.PostProcessPlaceholders("DELETE FROM icingadb_instance " +
|
||||
query := h.db.Rebind("DELETE FROM icingadb_instance " +
|
||||
"WHERE id <> ? AND environment_id = ? AND endpoint_id = ? AND heartbeat < ?")
|
||||
heartbeat := types.UnixMilli(time.Now().Add(-timeout))
|
||||
result, err := h.db.ExecContext(h.ctx, query, h.instanceId, envId,
|
||||
|
|
|
|||
Loading…
Reference in a new issue