Welcome to 16892 Developer Community-Open, Learning,Share
Welcome To Ask or Share your Answers For Others

I have a topology that does the following:

  • reads from a source topic
  • adds a header containing the current timestamp to each record
  • accumulates incoming records in a state store until some limit is reached, and once it is reached floods the downstream topology with the batch of accumulated messages
  • sends records to a sink topic

So it looks something like this:

sourceTopic ->
transformer (context.headers().add("timeStamp", System.currentTimeMillis())) ->
transformer (
  stateStore.put(key, value)
  if (stateStore.approximateNumEntries() >= limit) {
    stateStore.all().forEachRemaining(keyValue -> processorContext.forward(keyValue.key, keyValue.value))
  }
) ->
sinkTopic

The problem is that after the second transformer (the one that accumulates events), the header added in the first transformer is no longer populated. I have a custom implementation of ProducerInterceptor that relies on this header, and it throws a NullPointerException:

  @Override
  public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
      long currentTime = System.currentTimeMillis();
      // lastHeader(...) returns null when the header is absent, so .value() is where the NullPointerException is thrown
      byte[] startTimestampByte =
          record.headers().lastHeader(timeStamp).value();
      long startTimestampLong = ByteBuffer.wrap(startTimestampByte).getLong();
      log.info("Duration of event processing {}", currentTime - startTimestampLong);
      return record;
  }
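
The interceptor reads the header value as an 8-byte long and assumes the header is always present. A minimal stdlib-only sketch of that encoding, with a decode that tolerates a missing header, might look like the following. The class and method names (TimestampHeaderCodec, encode, decode) are hypothetical and are not part of the Kafka API; the 8-byte big-endian layout is an assumption inferred from the ByteBuffer.wrap(...).getLong() call.

```java
import java.nio.ByteBuffer;
import java.util.OptionalLong;

// Hypothetical helper for illustration only; not a Kafka class.
final class TimestampHeaderCodec {
    private TimestampHeaderCodec() {}

    // Encodes a millisecond timestamp as 8 bytes in ByteBuffer's default (big-endian) order.
    static byte[] encode(long timestampMillis) {
        return ByteBuffer.allocate(Long.BYTES).putLong(timestampMillis).array();
    }

    // Decodes a header value; returns empty when the value is missing or not 8 bytes long.
    static OptionalLong decode(byte[] headerValue) {
        if (headerValue == null || headerValue.length != Long.BYTES) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(ByteBuffer.wrap(headerValue).getLong());
    }

    public static void main(String[] args) {
        byte[] encoded = encode(System.currentTimeMillis());
        System.out.println("header present: " + decode(encoded).isPresent());
        System.out.println("missing header present: " + decode(null).isPresent());
    }
}
```

In the interceptor, the Header returned by lastHeader would need its own null check before value() is called; the sketch only covers the byte-level part.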

So the question is: is there any way to propagate custom headers when calling ProcessorContext.forward? Thanks in advance.
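
One possible direction, offered as a sketch rather than a confirmed answer: since the state store above keeps only key and value, the timestamp could be stored together with the value and restored when the batch is flushed. The plain-Java simulation below illustrates only that idea. BufferingSketch, Stamped, and Emitted are hypothetical names, the map stands in for the state store, and clearing the buffer after a flush is an assumption that the pseudocode above does not show. Writing the restored timestamp back into real record headers depends on the Kafka Streams version and is not shown.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the buffering step; none of these types are Kafka classes.
final class BufferingSketch {

    // A buffered value together with the timestamp that arrived with it.
    static final class Stamped {
        final String value;
        final long timestampMillis;

        Stamped(String value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    // What is sent downstream: key, value, and the restored timestamp.
    static final class Emitted {
        final String key;
        final String value;
        final long timestampMillis;

        Emitted(String key, String value, long timestampMillis) {
            this.key = key;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    // Stand-in for the state store; insertion order is kept so the output is predictable.
    private final Map<String, Stamped> store = new LinkedHashMap<>();
    private final int limit;

    BufferingSketch(int limit) {
        this.limit = limit;
    }

    // Buffers one record; returns the flushed batch once the limit is reached, otherwise an empty list.
    List<Emitted> process(String key, String value, long timestampMillis) {
        store.put(key, new Stamped(value, timestampMillis));
        List<Emitted> batch = new ArrayList<>();
        if (store.size() >= limit) {
            for (Map.Entry<String, Stamped> entry : store.entrySet()) {
                Stamped stamped = entry.getValue();
                batch.add(new Emitted(entry.getKey(), stamped.value, stamped.timestampMillis));
            }
            store.clear(); // assumption: the buffer is emptied after each flush
        }
        return batch;
    }
}
```

With a limit of 2, the first call to process returns an empty list and the second returns both records, each carrying the timestamp it was buffered with rather than the timestamp of the record that triggered the flush.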



1 Answer

Waiting for an expert to answer.
