It's a little confusing where the lag is coming from since it appears you are retrieving offsets only.
But this is how it would be structured to get the long values regardless of what they are called (I guessed at the constructor for the OffsetAndLag
class)
Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = ...;
Map<TopicPartition, Long> topicEndOffsets = ...;
- Use the key from
consumerGroupOffsets
for the target key.
- use that key to retrieve the lag (or offset) from
topicEndOffsets
- use the value for that key (which should be
OffsetAndMetadata
) to get the offset (or lag)
Map<TopicPartition, OffsetAndLag> consumerGroupLag =
consumerGroupOffsets.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey,
entry -> new OffsetAndLag(
topicEndOffsets
.get(entry.getKey()),
entry.getValue().offset())));
Expected OffsetAndLag
class (or something similar)
static class OffsetAndLag {
public long offset;
public long lag;
public OffsetAndLag(long offset, long lag) {
this.offset = offset;
this.lag = lag;
}
}
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…