史上最轻便好用的kafka ui界面可视化图形界面工具

   日期:2024-12-27    作者:1nkqt 移动:http://mip.riyuangf.com/mobile/quote/72819.html
package com.jq.kafkaui.util;

史上最轻便好用的kafka ui界面可视化图形界面工具

import com.alibaba.fastjson.JSONObject; import com.jq.kafkaui.domain.Topic; import com.jq.kafkaui.dto.ResponseDto; import com.jq.kafkaui.dto.SourceInfo; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.Field; import org.springframework.util.StringUtils; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @Slf4j public class KafkaUtil { public static AdminClient createAdminClientByProperties(SourceInfo sourceInfo) { Properties prop = getCommonProperties(sourceInfo); prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, sourceInfo.getBroker()); prop.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000"); prop.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000"); return AdminClient.create(prop); } private static Properties getCommonProperties(SourceInfo sourceInfo) { Properties prop = new Properties(); String userName = sourceInfo.getUserName(); String password = sourceInfo.getPassword(); if (!StringUtils.isEmpty(userName) && !StringUtils.isEmpty(password)) { prop.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + userName + " password=" + password + ";"); prop.put("security.protocol", "SASL_PLAINTEXT"); prop.put("sasl.mechanism", "PLAIN"); } return prop; } public static ResponseDto listTopicsWithOptions(SourceInfo sourceInfo, String keyword) { AdminClient adminClient = null; try { // 创建AdminClient客户端对象 adminClient = createAdminClientByProperties(sourceInfo); ListTopicsOptions options = new ListTopicsOptions(); // 
列出内部的Topic options.listInternal(true); // 列出所有的topic ListTopicsResult result = adminClient.listTopics(options); Collection<TopicListing> topicListings = result.listings().get(); List<Topic> collect = topicListings.stream().map(t -> { Topic topic = new Topic(); topic.setName(t.name()); topic.setInternal(t.isInternal()); return topic; }).sorted(Comparator.comparing(t -> t.getName())).collect(Collectors.toList()); if (keyword != null) { collect = collect.stream().filter(t -> t.getName().contains(keyword)).collect(Collectors.toList()); } ResponseDto success = ResponseDto.success(collect); return success; } catch (Exception e) { log.error(e.getMessage(), e); return ResponseDto.fail(e.getMessage()); } finally { adminClient.close(); } } public static void createTopic(SourceInfo sourceInfo, String topic, Integer partition, Integer replica) throws Exception { AdminClient adminClient = null; try { adminClient = createAdminClientByProperties(sourceInfo); List<NewTopic> topicList = new ArrayList(); NewTopic newTopic = new NewTopic(topic, partition, replica.shortValue()); topicList.add(newTopic); CreateTopicsResult result = adminClient.createTopics(topicList); result.all().get(); result.values().forEach((name, future) -> System.out.println("topicName:" + name)); } catch (Exception e) { } finally { adminClient.close(); } } public static Producer<String, String> getProducer(SourceInfo sourceInfo) { Properties props = getCommonProperties(sourceInfo); props.put("bootstrap.servers", sourceInfo.getBroker()); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); return producer; } public static KafkaConsumer<String, String> getConsumer(SourceInfo sourceInfo, String topic, String group, String offset) { Properties props = getCommonProperties(sourceInfo); 
props.setProperty("bootstrap.servers", sourceInfo.getBroker()); props.setProperty("group.id", group); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("auto.offset.reset", offset); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singleton(topic)); return consumer; } public static KafkaConsumer<String, String> getConsumer(SourceInfo sourceInfo, Collection<String> topics, String group, String offset) { Properties props = getCommonProperties(sourceInfo); props.setProperty("bootstrap.servers", sourceInfo.getBroker()); props.setProperty("group.id", group); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("auto.offset.reset", offset); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(topics); return consumer; } public static void main(String[] args) throws Exception { } public static void deleteTopic(SourceInfo sourceInfo, String name) { AdminClient adminClient = createAdminClientByProperties(sourceInfo); List<String> list = new ArrayList<>(); list.add(name); adminClient.deleteTopics(list); adminClient.close(); } public static JSONObject node2Json(Node node) { JSONObject leaderNode = new JSONObject(); leaderNode.put("id", node.id()); leaderNode.put("host", node.host()); leaderNode.put("port", node.port()); leaderNode.put("rack", node.rack());

特别提示:本信息由相关用户自行提供,真实性未证实,仅供参考。请谨慎采用,风险自负。


举报收藏 0评论 0
0相关评论
相关最新动态
推荐最新动态
点击排行
{
网站首页  |  关于我们  |  联系方式  |  使用协议  |  隐私政策  |  版权隐私  |  网站地图  |  排名推广  |  广告服务  |  积分换礼  |  网站留言  |  RSS订阅  |  违规举报  |  鄂ICP备2020018471号