kafka生产者回调

李青石 5月前 ⋅ 532 阅读
kafkaTemplate.sendDefault(jsonObj.toString()).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
RecordMetadata recordMetadata = result.getRecordMetadata();
logger.info("====kafka topic: "+recordMetadata.topic()+", offset: "+recordMetadata.offset()+", partition: "+recordMetadata.partition());
}
@Override
public void onFailure(Throwable e) {
logger.error("====kafka sendDefault error: " + e.getMessage());
}
});
我有话说:

全部评论: 0