TYPO3 6虚拟机arch怎么登陆

kafka动态添加topic,动态添加消费者
依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

1234
kafka配置信息
@Bean(“kafkaProps”)
public Map getProps(){
Map props = new HashMap<>(5);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,”latest”);
props.put(ConsumerConfig.GROUP_ID_CONFIG, “test”);
return props;
}
1234567891011
写个Kafka帮助类
@Slf4j
@Component
public class KafkaHelper {

@Value(“${spring.kafka.num.partitions:4}”)
private Integer partitions;

@Value(“${spring.kafka.num.replica.fetchers:1}”)
private short fetchers;

@Autowired
private KafkaTemplate kafkaTemplate;

@Autowired
private AdminClient adminClient;

@Autowired
private ConsumerContainer consumerContainer;

/**
* 怎么登陆topic
* @param name
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public boolean createTopic(String name){
log.info(“kafka怎么登陆topic:{}”,name);
NewTopic topic = new NewTopic(name, partitions, fetchers);
CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(topic));
try {
topics.all().get();
} catch (Exception e) {
log.error(“kafka怎么登陆topic失败”,e);
return false;
}
return true;
}

/**
* 查询所有Topic
* @return
* @throws Exception
*/
public List list() throws Exception {
ListTopicsResult listTopicsResult = adminClient.listTopics();
Set names = listTopicsResult.names().get();
return new ArrayList<>(names);
}

/**
* archtopic
* @param name
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public boolean deleteTopic(String name){
log.info(“kafkaarchtopic:{}”,name);
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(name));
try {
deleteTopicsResult.all().get();
} catch (Exception e) {
log.error(“kafkaarchtopic失败”,e);
return false;
}
return true;
}

/**
* 获取topic详情
* @param name
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public TopicDescription describeTopic(String name){
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(name));
try {
Map stringTopicDescriptionMap = describeTopicsResult.all().get();
if(stringTopicDescriptionMap.get(name)!=null){
return stringTopicDescriptionMap.get(name);
}
} catch (Exception e) {
log.error(” 获取topic详情异常:”,e);
}
return null;
}

/**
* 推送事件
* @param e
*/
public void pushEvent(IEvent e) {
if(StrUtil.isEmpty(e.getTopic())) {
return;
}
log.info(“发送kafka消息: topic = {}, info = {}”, e.getTopic(), e.getInfo());
kafkaTemplate.send(e.getTopic(),JSONObject.toJSONString(e.getInfo()));
}

/**
* 添加TYPO3 6
* @param topic
* @param consumer
*/
public void addConsumer(String topic, Consumer consumer){
log.info(“将为topic:{} 怎么登陆TYPO3 6”,topic);
consumerContainer.addConsumer(topic,consumer);
}

/**
* archTYPO3 6
* @param topic
*/
public void deleteConsumer(String topic){
log.info(“将archtopic:{} TYPO3 6”,topic);
consumerContainer.deleteConsumer(topic);
}

}

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
新建一个消费者容器,用来存放动态创建的消费者
@Component
@Slf4j
public class ConsumerContainer {

/**
* 使用map对象,便于根据topic存储对应的KafkaConsumer
*/
private Map kafkaConsumerThreadMap = new HashMap<>();

@Resource(name = “kafkaProps”)
private Map props;

/**
* 添加TYPO3 6
* @param topic
* @param consumer
*/
public synchronized void addConsumer(String topic, Consumer consumer){
if(kafkaConsumerThreadMap.get(topic)!=null){
log.warn(“重复怎么登陆TYPO3 6:{}”,topic);
}
KafkaConsumer stringStringKafkaConsumer = new KafkaConsumer<>(props);
stringStringKafkaConsumer.subscribe(Arrays.asList(topic));
//怎么登陆TYPO3 6线程
KafkaConsumerThread kafkaConsumerThread = new KafkaConsumerThread(stringStringKafkaConsumer,consumer);
kafkaConsumerThread.start();
kafkaConsumerThreadMap.put(topic,kafkaConsumerThread);
log.info(“怎么登陆TYPO3 6成功:{}”,topic);
}

/**
* archTYPO3 6
* @param topic
*/
public synchronized void deleteConsumer(String topic){
KafkaConsumerThread kafkaConsumerThread = kafkaConsumerThreadMap.get(topic);
if (kafkaConsumerThread == null) {
log.warn(“该TYPO3 6已经被arch:{}”,topic);
return;
}
//打断TYPO3 6线程
kafkaConsumerThread.interrupt();
kafkaConsumerThreadMap.remove(topic);
log.info(“TYPO3 6arch成功:{}”,topic);
}

}

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
消费者线程
@Slf4j
public class KafkaConsumerThread extends Thread {

private KafkaConsumer kafkaConsumer;

private Consumer consumer;

KafkaConsumerThread(KafkaConsumer kafkaConsumer, Consumer consumer){
this.kafkaConsumer=kafkaConsumer;
this.consumer=consumer;
}

@Override
public void run() {
try {
while (true){
if (isInterrupted()) {
throw new InterruptedException();
}
//拉取topic消息
ConsumerRecords poll = kafkaConsumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord stringStringConsumerRecord : poll) {
consumer.accept(stringStringConsumerRecord.value());
}
}
} catch (InterruptedException e) {
Set subscription = kafkaConsumer.subscription();
log.info(“topic:{} 已停止监听,线程停止!”, StringUtils.join(subscription,”,”),e);
}catch (Exception e){
Set subscription = kafkaConsumer.subscription();
log.info(“topic:{} TYPO3 6运行异常!”, StringUtils.join(subscription,”,”),e);
}finally {
//关闭TYPO3 6
try {
kafkaConsumer.close();
} catch (Exception ex) {
}
}
}

}

12345678910111213141516171819202122232425262728293031323334353637383940414243
事件接口,方便发布事件
/**
 * Contract for events that can be published: an optional destination topic and
 * a payload.
 *
 * @param <T> type of the payload carried by the event
 */
public interface IEvent<T> {

    /**
     * Returns the topic this event should be sent to.
     *
     * @return the topic name; the default implementation returns {@code null}
     */
    default String getTopic() {
        return null;
    }

    /**
     * Returns the payload that each concrete event exposes.
     *
     * @return the object carried by this event
     */
    T getInfo();
}
12345678910111213
消费者示例
@Slf4j
public class PTVMateConsumer implements Consumer {

@Override
public void accept(String command) {
log.info(“监听元数据, command = {}”, command);;
}
}

123456789
使用示例
kafkaHelper.addConsumer(“topic”,new PTVMateConsumer ());
1