spring-cloud-stream: Management "/actuator/bindings" endpoint only returning last configured ConsumerChannel for Kafka

I am using Spring Cloud Stream with Kafka and have many bindings configured. When I try to view their details by requesting “/actuator/bindings”, only the last one listed in @EnableBinding(value = { OneConsumerChannel.class, TwoConsumerChannel.class, ThreeConsumerChannel.class}) — in this case ThreeConsumerChannel.class — is displayed. I debugged BindingsEndpoint, which returns the bindings from the inputBindingLifecycles list; that list is always populated with only the latest binding.

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 16 (7 by maintainers)

Most upvoted comments

I created a customBindings actuator endpoint and it worked pretty well; it can serve as a workaround for now.

package aero.sita.aggregator.springbugfix;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.actuate.endpoint.annotation.Endpoint;
import org.springframework.boot.actuate.endpoint.annotation.ReadOperation;
import org.springframework.boot.actuate.endpoint.annotation.Selector;
import org.springframework.boot.actuate.endpoint.annotation.WriteOperation;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
 * Actuator endpoint, registered under the id {@code custombindings}, that lists and controls
 * the producer and consumer bindings held by a {@link BindingService}.
 *
 * <p>The bindings are read through a {@link DirectFieldAccessor} from the service's
 * {@code producerBindings} and {@code consumerBindings} fields on every request.
 *
 * <p>NOTE(review): this relies on the names of fields inside {@code BindingService}; confirm
 * they still exist when upgrading the library.
 */
@Endpoint(id = "custombindings")
@Service
public class CustomBindingEndpoint {

  private final BindingService bindingService;
  private final ObjectMapper objectMapper;

  /**
   * Creates the endpoint.
   *
   * @param bindingService the service whose bindings are exposed
   */
  public CustomBindingEndpoint(BindingService bindingService) {
    this.bindingService = bindingService;
    this.objectMapper = new ObjectMapper();
  }

  /**
   * Returns all producer and consumer bindings, converted to a list by the endpoint's
   * {@link ObjectMapper}.
   *
   * @return the converted bindings, producers first, then consumers
   */
  @ReadOperation
  public List<?> queryStates() {
    return objectMapper.convertValue(getBindings(), List.class);
  }

  /**
   * Looks up a single binding by name.
   *
   * @param name the binding name; must not be {@code null}
   * @return the first binding with that name, or {@code null} if there is none
   * @throws IllegalArgumentException if {@code name} is {@code null}
   */
  @ReadOperation
  public Binding<?> queryState(@Selector String name) {
    Assert.notNull(name, "'name' must not be null");
    return locateBinding(name);
  }

  /**
   * Starts, stops, pauses or resumes the binding with the given name. Does nothing when no
   * binding with that name exists.
   *
   * @param name the binding name; must not be {@code null}
   * @param state the requested lifecycle transition; must not be {@code null}
   * @throws IllegalArgumentException if {@code name} or {@code state} is {@code null}
   */
  @WriteOperation
  public void changeState(@Selector String name, State state) {
    // Validate up front, as queryState does, rather than failing later with a bare
    // NullPointerException from name.equals(...) or switch (state).
    Assert.notNull(name, "'name' must not be null");
    Assert.notNull(state, "'state' must not be null");

    Binding<?> binding = locateBinding(name);
    if (binding == null) {
      // Unknown binding names are deliberately ignored.
      return;
    }
    switch (state) {
      case STARTED:
        binding.start();
        break;
      case STOPPED:
        binding.stop();
        break;
      case PAUSED:
        binding.pause();
        break;
      case RESUMED:
        binding.resume();
        break;
      default:
        break;
    }
  }

  /**
   * Finds the first binding whose name equals {@code name}.
   *
   * @param name the binding name to match; must not be {@code null}
   * @return the matching binding, or {@code null} if none matches
   */
  private Binding<?> locateBinding(String name) {
    // An empty collection simply yields an empty stream, so no emptiness guard is needed.
    return getBindings().stream()
        .filter(binding -> name.equals(binding.getName()))
        .findFirst()
        .orElse(null);
  }

  /**
   * Collects every binding currently held by the {@link BindingService}.
   *
   * @return a new list containing the producer bindings followed by the consumer bindings
   */
  private Collection<Binding<?>> getBindings() {
    // One accessor is enough for both field reads.
    DirectFieldAccessor serviceFields = new DirectFieldAccessor(bindingService);

    // The accessor returns Object; the casts mirror the generic types this class expects
    // the two fields to have.
    @SuppressWarnings("unchecked")
    Map<String, Binding<?>> producerBindings =
        (Map<String, Binding<?>>) serviceFields.getPropertyValue("producerBindings");
    @SuppressWarnings("unchecked")
    Map<String, List<Binding<?>>> consumerBindings =
        (Map<String, List<Binding<?>>>) serviceFields.getPropertyValue("consumerBindings");

    List<Binding<?>> bindings = new ArrayList<>(producerBindings.values());
    // A single consumer key maps to a list of bindings, so flatten them.
    consumerBindings.values().forEach(bindings::addAll);
    return bindings;
  }

  /** Lifecycle transitions accepted by {@link #changeState(String, State)}. */
  private enum State {
    STARTED,
    STOPPED,
    PAUSED,
    RESUMED;
  }
}

You also need to include it in the management settings:

# Actuator settings that expose the custom endpoint over the web.
management:
  endpoints:
    web:
      # Web endpoints are rooted at /management in this configuration.
      base-path: /management
      exposure:
        # "custombindings" is the id declared by @Endpoint on CustomBindingEndpoint;
        # it has to be listed here alongside the other exposed endpoints.
        include: ["configprops", "env", "health", "info", "threaddump", "logfile", "bindings", "custombindings" ]