discovery/consul: Fix filter parameter not applied to health endpoint

The filter parameter was only being passed to catalog.Services()
but not to health.ServiceMultipleTags(), causing filters on Node
and Node.Meta to be ignored when discovering service instances.

This adds the missing Filter field to QueryOptions in the
consulService.watch() method.

Fixes #16087

Signed-off-by: Mohammad Varmazyar <m.varmazyar@gmail.com>
Signed-off-by: Mohammad Varmazyar <mrvarmazyar@gmail.com>
This commit is contained in:
Mohammad Varmazyar 2025-10-17 00:29:36 +02:00
parent 16a9a827de
commit 5e342d315c
3 changed files with 53 additions and 0 deletions

View file

@ -2,6 +2,8 @@
## main / unreleased
* [BUGFIX] Discovery/Consul: Fix filter parameter not being applied to health service endpoint, causing Node and Node.Meta filters to be ignored. #16087
## 3.7.0 / 2025-10-15
* [CHANGE] Remote-write: the following metrics are deprecated:

View file

@ -499,6 +499,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*targetgroup.Gr
WaitTime: watchTimeout,
AllowStale: srv.discovery.allowStale,
NodeMeta: srv.discovery.watchedNodeMeta,
Filter: srv.discovery.watchedFilter,
}
t0 := time.Now()

View file

@ -240,6 +240,8 @@ func newServer(t *testing.T) (*httptest.Server, *SDConfig) {
response = ServiceTestAnswer
case "/v1/health/service/test?wait=120000ms":
response = ServiceTestAnswer
case "/v1/health/service/test?filter=NodeMeta.rack_name+%3D%3D+%222304%22&wait=120000ms":
response = ServiceTestAnswer
case "/v1/health/service/other?wait=120000ms":
response = `[]`
case "/v1/catalog/services?node-meta=rack_name%3A2304&stale=&wait=120000ms":
@ -392,6 +394,54 @@ func TestFilterOption(t *testing.T) {
cancel()
}
// TestFilterOnHealthEndpoint verifies that filter is passed to health service endpoint.
func TestFilterOnHealthEndpoint(t *testing.T) {
filterReceived := false
stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
response := ""
switch r.URL.Path {
case "/v1/agent/self":
response = AgentAnswer
case "/v1/health/service/test":
// Verify filter parameter is present in the query
filter := r.URL.Query().Get("filter")
if filter == `Node.Meta.rack_name == "2304"` {
filterReceived = true
}
response = ServiceTestAnswer
default:
t.Errorf("Unhandled consul call: %s", r.URL)
}
w.Header().Add("X-Consul-Index", "1")
w.Write([]byte(response))
}))
defer stub.Close()
stuburl, err := url.Parse(stub.URL)
require.NoError(t, err)
config := &SDConfig{
Server: stuburl.Host,
Services: []string{"test"},
Filter: `Node.Meta.rack_name == "2304"`,
RefreshInterval: model.Duration(1 * time.Second),
}
d := newDiscovery(t, config)
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan []*targetgroup.Group)
go func() {
d.Run(ctx, ch)
close(ch)
}()
checkOneTarget(t, <-ch)
cancel()
// Verify the filter was actually sent to the health endpoint
require.True(t, filterReceived, "Filter parameter should be sent to health service endpoint")
}
func TestGetDatacenterShouldReturnError(t *testing.T) {
for _, tc := range []struct {
handler func(http.ResponseWriter, *http.Request)