当前位置: 首页 > news >正文

网站备案在哪儿c2c模式类型

网站备案在哪儿,c2c模式类型,diywap手机微网站内容管理系统,目前常用的搜索引擎有哪些目录 消息的发送与接收生产者消费者 SpringBoot 集成kafka服务端参数配置 消息的发送与接收 生产者 生产者主要的对象有#xff1a; KafkaProducer #xff0c; ProducerRecord 。 其中 KafkaProducer 是用于发送消息的类#xff0c; ProducerRecord 类用于封装Kafka的消息… 目录 消息的发送与接收生产者消费者 SpringBoot 集成kafka服务端参数配置 消息的发送与接收 生产者 生产者主要的对象有 KafkaProducer ProducerRecord 。 其中 KafkaProducer 是用于发送消息的类 ProducerRecord 类用于封装Kafka的消息。 KafkaProducer 的创建需要指定的参数和含义 bootstrap.servers配置生产者如何与broker建立连接。该参数设置的是初始化参数。如果生产者需要连接的是Kafka集群则这里配置集群中几个部分broker的地址而不是全部当生产者连接上此处指定的broker之后在通过该连接发现集群中的其他节点。key.serializer要发送信息的key数据的序列化类。设置的时候可以写类名也可以使用该类的Class对象。value.serializer要发送消息的value数据的序列化类。设置的时候可以写类名也可以使用该类的Class对象。acks默认值all。 acks0生产者不等待broker对消息的确认只要将消息放到缓冲区就认为消息已经发送完成。该情形不能保证broker是否真的收到了消息retries配置也不会生效。发送的消息的返回的消息偏移量永远是-1。acks1表示消息只需要写到主分区即可然后就响应客户端而不等待副本分区的确认。在该情形下如果主分区收到消息确认之后就宕机了而副本分区还没来得及同步该消息则该消息丢失。acksallleader分区会等待所有的ISR副本分区确认记录。该处理保证了只要有一个ISR副本分区存活消息就不会丢失。这是Kafka最强的可靠性保证等效于 acks-1。 retriesretries重试次数。当消息发送出现错误的时候系统会重发消息。跟客户端收到错误时重发一样。如果设置了重试还想保证消息的有序性需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION1否则在重试此失败消息的时候其他的消息可能发送成功了。 其他参数可以从 org.apache.kafka.clients.producer.ProducerConfig 中找到。后面的内容会介绍到。 消费者生产消息后需要broker端的确认可以同步确认也可以异步确认。同步确认效率低异步确认效率高但是需要设置回调对象。 示例如下 import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {MapString, Object configs new HashMap();// 指定初始连接用到的broker地址configs.put(bootstrap.servers, 192.168.100.101:9092);// 指定key的序列化类configs.put(key.serializer, IntegerSerializer.class);// 指定value的序列化类configs.put(value.serializer, StringSerializer.class);// configs.put(acks, all); // configs.put(reties, 3);KafkaProducerInteger, String producer new KafkaProducerInteger, String(configs);// 用于设置用户自定义的消息头字段ListHeader headers new ArrayList();headers.add(new RecordHeader(biz.name, producer.demo.getBytes()));ProducerRecordInteger, String record new ProducerRecordInteger, String(topic_1, // topic0, // 分区0, // keyhello lagou 0, // valueheaders // headers);// 消息的同步确认final FutureRecordMetadata future producer.send(record);final RecordMetadata metadata future.get();System.out.println(消息的主题 metadata.topic());System.out.println(消息的分区号 metadata.partition());System.out.println(消息的偏移量 metadata.offset());// 关闭生产者producer.close();} }如果需要异步发送如下 package com.lagou.kafka.demo.producer;import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer;import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;public class MyProducer1 {public static void main(String[] args) throws ExecutionException, InterruptedException {MapString, Object configs new HashMap();// 指定初始连接用到的broker地址configs.put(bootstrap.servers, 192.168.100.101:9092);// 指定key的序列化类configs.put(key.serializer, IntegerSerializer.class);// 指定value的序列化类configs.put(value.serializer, StringSerializer.class);// configs.put(acks, all); // configs.put(reties, 3);KafkaProducerInteger, String producer new KafkaProducerInteger, String(configs);// 用于设置用户自定义的消息头字段ListHeader headers new ArrayList();headers.add(new RecordHeader(biz.name, producer.demo.getBytes()));ProducerRecordInteger, String record new ProducerRecordInteger, String(topic_1, // topic0, // 分区0, // keyhello lagou 0, // valueheaders // headers);// 消息的异步确认producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null) {System.out.println(消息的主题 metadata.topic());System.out.println(消息的分区号 metadata.partition());System.out.println(消息的偏移量 metadata.offset());} else {System.out.println(异常消息 exception.getMessage());}}});// 关闭生产者producer.close();} }消费者 kafka不支持消息的推送当然可以自己已实现采用的消息的拉取poll方法。 消费者主要的对象是kafkaConsumer用于消费消息的类。 其主要参数 bootstrap.servers与kafka建立初始连接的broker地址列表key.deserializerkey的反序列化器value.deserializervalue的反序列化器group.id指定消费者组id用于标识该消费者属于哪个消费者组auto.offset.reset当kafka中没有初始化偏移量或当前偏移量在服务器中不存在如数据被删除了处理办法 earliest自动重置偏移量到最早的偏移量latest自动重置偏移量到最新的偏移量none如果消费者组原来的偏移量previous不存在向消费者抛出异常anything向消费者抛异常 ConsumerConfig类中包含了所有的可以给kafkaConsumer的参数。 示例 import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.function.Consumer;public class MyConsumer1 {public static void main(String[] args) {MapString, Object configs new HashMap();// node1对应于192.168.100.101windows的hosts文件中手动配置域名解析configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, node1:9092);// 使用常量代替手写的字符串配置key的反序列化器configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);// 配置value的反序列化器configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 配置消费组IDconfigs.put(ConsumerConfig.GROUP_ID_CONFIG, consumer_demo1);// 如果找不到当前消费者的有效偏移量则自动重置到最开始// latest表示直接重置到消息偏移量的最后一个configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);KafkaConsumerInteger, String consumer new KafkaConsumerInteger, String(configs);// 先订阅再消费consumer.subscribe(Arrays.asList(topic_1));// 如果主题中没有可以消费的消息则该方法可以放到while循环中每过3秒重新拉取一次// 如果还没有拉取到过3秒再次拉取防止while循环太密集的poll调用。// 批量从主题的分区拉取消息final ConsumerRecordsInteger, String consumerRecords consumer.poll(3_000);// 遍历本次从主题的分区拉取的批量消息consumerRecords.forEach(new ConsumerConsumerRecordInteger, String() {Overridepublic void accept(ConsumerRecordInteger, String record) {System.out.println(record.topic() \t record.partition() \t record.offset() \t record.key() \t record.value());}});consumer.close();} }SpringBoot 集成kafka 这里把生产者和消费者放在一个项目中实际可能是在两个里的。 1、引入依赖 dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka/artifactId /dependency2、 配置 spring.application.namespringboot-kafka-02 server.port8080 # 用于建立初始连接的broker地址 spring.kafka.bootstrap-serversnode1:9092 # producer用到的key和value的序列化类 spring.kafka.producer.key- serializerorg.apache.kafka.common.serialization.IntegerSerializer spring.kafka.producer.value- serializerorg.apache.kafka.common.serialization.StringSerializer # 默认的批处理记录数 spring.kafka.producer.batch-size16384 # 32MB的总发送缓存 spring.kafka.producer.buffer-memory33554432 # consumer用到的key和value的反序列化类 spring.kafka.consumer.key- deserializerorg.apache.kafka.common.serialization.IntegerDeserializer spring.kafka.consumer.value- deserializerorg.apache.kafka.common.serialization.StringDeserializer # consumer的消费组id spring.kafka.consumer.group-idspring-kafka-02-consumer # 是否自动提交消费者偏移量 spring.kafka.consumer.enable-auto-committrue # 每隔100ms向broker提交一次偏移量 spring.kafka.consumer.auto-commit-interval100 # 如果该消费者的偏移量不存在则自动设置为最早的偏移量 spring.kafka.consumer.auto-offset-resetearliest3、启动类 import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;SpringBootApplication public class Demo02SpringbootKafkaApplication {public static void main(String[] args) {SpringApplication.run(Demo02SpringbootKafkaApplication.class, args);}}4、生产者 这里我们就写在Controller里就好如下 import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.ExecutionException;RestController public class KafkaSyncProducerController {Autowiredprivate KafkaTemplateInteger, String template;RequestMapping(send/sync/{message})public String send(PathVariable String message) {final ListenableFutureSendResultInteger, String future template.send(topic-spring-01, 0, 0, message);// 同步发送消息try {final SendResultInteger, String sendResult future.get();final RecordMetadata metadata sendResult.getRecordMetadata();System.out.println(metadata.topic() \t metadata.partition() \t metadata.offset());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}return success;}}上面是同步发送消息如果异步发送消息可改为如下 import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;RestController public class KafkaAsyncProducerController {Autowiredprivate KafkaTemplateInteger, String template;RequestMapping(send/async/{message})public String send(PathVariable String message) {final ListenableFutureSendResultInteger, String future this.template.send(topic-spring-01, 0, 1, message);// 设置回调函数异步等待broker端的返回结果future.addCallback(new ListenableFutureCallbackSendResultInteger, String() {Overridepublic void onFailure(Throwable throwable) {System.out.println(发送消息失败 throwable.getMessage());}Overridepublic void onSuccess(SendResultInteger, String result) {final RecordMetadata metadata result.getRecordMetadata();System.out.println(发送消息成功 metadata.topic() \t metadata.partition() \t metadata.offset());}});return success;}}5、消费者 import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;Component public class MyConsumer {KafkaListener(topics topic-spring-01)public void onMessage(ConsumerRecordInteger, String record) {System.out.println(消费者收到的消息 record.topic() \t record.partition() \t record.offset() \t record.key() \t record.value());}}6、kafka配置类 上面当我们启动生产者和消费者时kafka会自动为我们创建好topic和分区等。那是因为kafka的KafkaAutoConfigration里有个KafkaAdmin他负责自动检测需要创建的topic和分区等。如果我们想自己创建或者自定义KafkaTemplate一般不会这么做可以使用配置类如下 import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap; import java.util.Map;Configuration public class KafkaConfig {Beanpublic NewTopic topic1() {return new NewTopic(nptc-01, 3, (short) 1);}Beanpublic NewTopic topic2() {return new NewTopic(nptc-02, 5, (short) 1);}Beanpublic KafkaAdmin kafkaAdmin() {MapString, Object configs new HashMap();configs.put(bootstrap.servers, node1:9092);KafkaAdmin admin new KafkaAdmin(configs);return admin;}BeanAutowiredpublic KafkaTemplateInteger, String kafkaTemplate(ProducerFactoryInteger, String producerFactory) {// 覆盖ProducerFactory原有设置MapString, Object configsOverride new HashMap();configsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);KafkaTemplateInteger, String template new KafkaTemplateInteger, String(producerFactory, configsOverride);return template;}}服务端参数配置 $KAFKA_HOME/config/server.properties文件中的一些配置。 1、zookeeper.connect 该参数用于配置Kafka要连接的Zookeeper/集群的地址。 它的值是一个字符串使用逗号分隔Zookeeper的多个地址。Zookeeper的单个地址是 host:port形式的可以在最后添加Kafka在Zookeeper中的根节点路径。 如 zookeeper.connectnode2:2181,node3:2181,node4:2181/myKafka 12、listeners 用于指定当前Broker向外发布服务的地址和端口。 配置项为 listenersPLAINTEXT://:9092如下 PLAINTEXT是一种协议名称上面ip地址没写可以配置成listenersPLAINTEXT://0.0.0.0:9092则只有本机可以访问。也可以是其他配置。 可以配置多个逗号分割。但是多个listener的协议名称不能相同且端口号不能相同。如果想用一个协议则需要在listener.security.protocol.map维护听器名称和协议的map。 可以与 advertised.listeners 配合用于做内外网隔离比如创建topic和分区的等管理方面的使用一个地址发送和消费消息则使用另一个地址即管理和使用分开。 内外网隔离配置 listener.security.protocol.map 监听器名称和安全协议的映射配置。比如可以将内外网隔离即使它们都使用SSL。 listener.security.protocol.mapINTERNAL:SSL,EXTERNAL:SSL冒号前面代表监听器名称后面代表真正的协议。每个监听器的名称只能在map中出现一次。 listeners 用于配置broker监听的URI以及监听器名称列表使用逗号隔开多个URI及监听器名称。如果监听器名称代表的不是安全协议必须配置listener.security.protocol.map。每个监听器必须使用不同的网络端口。 advertised.listeners 需要将该地址发布到zookeeper供客户端使用。 可以在zookeeper的 get /myKafka/brokers/ids/broker.id 中找到。 在IaaS环境该条目的网络接口得与broker绑定的网络接口不同。 如果不设置此条目就使用listeners的配置。跟listeners不同该条目不能使用0.0.0.0网络端口。 advertised.listeners的地址必须是listeners中配置的或配置的一部分。 inter.broker.listener.name 用于配置broker之间通信使用的监听器名称该名称必须在advertised.listeners列表中。 inter.broker.listener.nameEXTERNAL典型配置如下 3、 broker.id 该属性用于唯一标记一个Kafka的Broker它的值是一个任意integer值。当Kafka以分布式集群运行的时候尤为重要。 最好该值跟该Broker所在的物理主机有关的如主机名为 host1.lagou.com 则 broker.id1 如果主机名为 192.168.100.101 则 broker.id101 等等。 4、 log.dir 通过该属性的值指定Kafka在磁盘上保存消息的日志片段的目录。它是一组用逗号分隔的本地文件系统路径。 如果指定了多个路径那么broker 会根据“最少使用”原则把同一个分区的日志片段保存到同一个路径下。 broker 会往拥有最少数目分区的路径新增分区而不是往拥有最小磁盘空间的路径新增分区。
http://www.yingshimen.cn/news/34702/

相关文章:

  • 公司网站未备案智能建站系统开发
  • 校园网站平台建设推广营销策划
  • 做网站有哪些流程网络优化工作应该怎么做
  • 株洲专业网站建设品牌gif8.net基于wordpress
  • 营销型的物流网站什么网站可以做单词书
  • 个人网站开发用什么语言天津网站建设 熊掌号
  • 通辽建设网站招聘网站入职分析表怎么做
  • 网站的百度百科怎么做网站建设APP的软件
  • 网站分站的实现方法网站后台编辑怎么做
  • 网站制作详细流程广东搜索引擎优化
  • 学校网站建设评分标准山东省建设厅网站首页
  • 无水印视频素材下载免费网站国家企业公示网入口官网登录
  • 吉利汽车网站开发环境分析萧山seo
  • 网站建设改版公司专做农产品的网站有哪些
  • 银川哪家网络公司做网站做得好网页制造基础课程
  • 网站页面网络维护合同
  • 做团购网站视频做二维码签到的网站
  • 贵池网站建设河南今天发生的重大新闻
  • 购买网站域名空间建设互联网站机房需要哪些设备
  • 宝安附近做网站公司百度网页入口
  • 广东茂名网站建设wordpress插件免费分享
  • 网站建设需求单郑州企业建设网站服务
  • 浙江汉农建设有限公司网站网站开发用什么技术做好
  • 接设计网站威海网站建设哪家好
  • 中建八局第一建设公司网站胖咯科技 网站建设
  • 怎么自己做投票网站淘宝客api采集发布到wordpress
  • 郑州做网站价格建筑公司发展规划
  • 做旅游网站需要引进哪些技术人才做网站用asp和html
  • 加盟网网站建设网页制作与设计实验报告总结
  • 网站都有后台吗wordpress 插件列表