From bb1bd541da2629ec52efe82aa7f067a0766aab73 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Michal=20Tich=C3=A1k?=
Date: Fri, 24 Oct 2025 18:55:38 +0200
Subject: [PATCH] attempts to reset offset of whole group

---
 common/event/reader.go | 72 ++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 69 insertions(+), 3 deletions(-)

diff --git a/common/event/reader.go b/common/event/reader.go
index 0b348caf..ad8c6bd1 100644
--- a/common/event/reader.go
+++ b/common/event/reader.go
@@ -27,13 +27,14 @@ package event
 import (
 	"context"
 	"fmt"
+	"sync"
+
 	"github.com/AliceO2Group/Control/common/event/topic"
 	"github.com/AliceO2Group/Control/common/logger/infologger"
 	pb "github.com/AliceO2Group/Control/common/protos"
 	"github.com/segmentio/kafka-go"
 	"github.com/spf13/viper"
 	"google.golang.org/protobuf/proto"
-	"sync"
 )
 
 // Reader interface provides methods to read events.
@@ -64,6 +65,8 @@ type KafkaReader struct {
 // NewReaderWithTopic creates a KafkaReader for the provided topic and starts it.
 // If latestOnly is true the reader attempts to seek to the latest offsets on start so that
 // only new messages (produced after creation) are consumed.
+// WARNING: creating more than one reader with a non-empty groupID and latestOnly == true
+// may lead to inconsistent reads.
 func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *KafkaReader {
 	cfg := kafka.ReaderConfig{
 		Brokers: viper.GetStringSlice("kafkaEndpoints"),
@@ -73,13 +76,21 @@ func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *Kaf
 		MaxBytes: 10e7,
 	}
 
+	// kafka-go can only reset offsets for a consumer group as a whole, which must be done before the consumer is created.
+	// WARNING: this might cause issues if more than one reader is created for the same group.
+	if latestOnly && groupID != "" {
+		if err := resetGroupToLatest(context.Background(), viper.GetStringSlice("kafkaEndpoints"), string(topic), groupID); err != nil {
+			log.WithField(infologger.Level, infologger.IL_Devel).
+				Errorf("failed to set offset to last offset for whole consumer group %s: %v", groupID, err)
+		}
+	}
+
 	rk := &KafkaReader{
 		Reader: kafka.NewReader(cfg),
 		topic:  string(topic),
 	}
 
-	if latestOnly {
-		// best-effort: set offset to last so we don't replay older messages
+	if latestOnly && groupID == "" {
 		if err := rk.SetOffset(kafka.LastOffset); err != nil {
 			log.WithField(infologger.Level, infologger.IL_Devel).
Warnf("failed to set offset to last offset: %v", err) @@ -130,3 +141,105 @@ func kafkaMessageToEvent(m kafka.Message) (*pb.Event, error) { } return &evt, nil } + +func resetGroupToLatest(ctx context.Context, brokers []string, topic, groupID string) error { + if len(brokers) == 0 { + return fmt.Errorf("cannot reset offset with 0 brokers") + } + + client := kafka.Client{Addr: kafka.TCP(brokers[0])} + + meta, err := client.Metadata(ctx, &kafka.MetadataRequest{Topics: []string{topic}}) + if err != nil { + return fmt.Errorf("metadata request: %w", err) + } + if len(meta.Topics) == 0 { + return fmt.Errorf("topic %q not found", topic) + } + partitions := meta.Topics[0].Partitions + if len(partitions) == 0 { + return fmt.Errorf("topic %q has no partitions", topic) + } + + offsetsMap, err := getLastOffsetsFromPartitions(ctx, partitions, brokers, topic) + if err != nil { + return err + } + + log.Info("new group") + consumerGroup, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{ID: groupID, Brokers: brokers, Topics: []string{topic}}) + if err != nil { + return fmt.Errorf("failed to create consumer group: %w", err) + } + log.Info("next gen") + gen, err := consumerGroup.Next(ctx) + if err != nil { + return fmt.Errorf("failed to get next Generation: %w", err) + } + log.Info("commiting offsets") + err = gen.CommitOffsets(offsetsMap) + if err != nil { + return fmt.Errorf("failed to commit offsets: %w", err) + } + + // req := &kafka.OffsetCommitRequest{ + // GroupID: groupID, + // GenerationID: -1, + // MemberID: "", + // Topics: offsetsMap, + // } + // + // resp, err := client.OffsetCommit(ctx, req) + // if err != nil { + // return fmt.Errorf("offset commit failed: %w", err) + // } + // for t, parts := range resp.Topics { + // for _, p := range parts { + // if p.Error != nil { + // return fmt.Errorf("commit error topic=%s partition=%d: %v", t, p.Partition, p.Error) + // } + // } + // } + + log.WithField(infologger.Level, infologger.IL_Devel).Debugf("Committed group=%s offsets to latest for topic=%s", groupID, topic) + return nil +} + +// func getLastOffsetsFromPartitions(ctx context.Context, partitions []kafka.Partition, brokers []string, topic string) (map[string][]kafka.OffsetCommit, error) { +func getLastOffsetsFromPartitions(ctx context.Context, partitions []kafka.Partition, brokers []string, topic string) (map[string]map[int]int64, error) { + // topicsMap := map[string][]kafka.OffsetCommit{} + topicsMap := map[string]map[int]int64{} + topicsMap[topic] = map[int]int64{} + for _, p := range partitions { + topicsMap[topic][int(p.ID)] = kafka.LastOffset + + // topicsMap[topic] = append(topicsMap[topic], kafka.OffsetCommit{ + // Partition: int(p.ID), + // Offset: kafka.LastOffset, + // Metadata: "reset", + // }) + + // partition := int(p.ID) + // + // conn, err := kafka.DialLeader(ctx, "tcp", brokers[0], topic, partition) + // if err != nil { + // return nil, fmt.Errorf("dial leader partition %d: %w", partition, err) + // } + // _ = conn.SetDeadline(time.Now().Add(5 * time.Second)) + // + // last, err := conn.ReadLastOffset() + // conn.Close() + // if err != nil { + // return nil, fmt.Errorf("read last offset partition %d: %w", partition, err) + // } + // + // topicsMap[topic] = append(topicsMap[topic], kafka.OffsetCommit{ + // Partition: partition, + // Offset: last, + // Metadata: "reset", + // }) + // + // log.WithField(infologger.Level, infologger.IL_Devel).Debugf("last offset (%d) from partition %d:", last, partition) + } + return topicsMap, nil +}