怎么做网站劳务中介wordpress essential
怎么做网站劳务中介,wordpress essential ,北京旅行社网站建设公司,网络营销与策划1.生产者和消费者确认机制
确认机制的本质#xff1a;明确告诉对方#xff1a;消息已经安全到达/已经被成功处理
如果没有确认机制#xff1a;生产者不知道消息有没有发成功消费者不知道消息有没有处理成功系统只能“猜”#xff0c;必然丢消息在消息队列中#xff0c;生产…1.生产者和消费者确认机制确认机制的本质明确告诉对方消息已经安全到达/已经被成功处理如果没有确认机制生产者不知道消息有没有发成功消费者不知道消息有没有处理成功系统只能“猜”必然丢消息在消息队列中生产者和消费者确认机制是确保消息可靠传递的关键。主要有两种确认机制生产者确认当消息成功发送到交换器后交换器会返回确认给生产者确保消息的发送成功。消费者确认当消息被成功消费后消费者会向交换器发送确认回执确保消息的消费完成。这两种机制有助于解决消息丢失的问题确保在分布式系统中消息的可靠性和安全性2.RabbitMQ的生产者确认机制一般情况下只要生产者与MQ之间的网络连接顺畅基本不会出现发送消息丢失的情况因此大多数情况下我们无需考虑这种问题。不过在少数情况下也会出现消息发送到MQ之后丢失的现象比如MQ内部处理消息的进程发生了异常生产者发送消息到达MQ后未找到Exchange生产者发送消息到达MQ的Exchange后未找到合适的Queue因此无法路由针对上述情况RabbitMQ提供了生产者消息确认机制包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下当生产者发送消息给MQ后MQ会根据消息处理的情况返回不同的回执。具体如图所示总结如下当消息投递到MQ但是路由失败时通过Publisher Return返回异常信息同时返回ack的确认信息代表投递成功临时消息投递到了MQ并且入队成功返回ACK告知投递成功持久消息投递到了MQ并且入队完成持久化返回ACK 告知投递成功其它情况都会返回NACK告知投递失败其中ack和nack属于Publisher Confirm机制ack是投递成功nack是投递失败。而return则属于Publisher Return机制。默认两种机制都是关闭状态需要通过配置文件来开启。开启生产者确认spring:rabbitmq:publisher-confirm-type:correlated# 开启publisher confirm机制并设置confirm类型publisher-returns:true# 开启publisher return机制publisher-confirm-type 有三种模式none关闭 confirm 机制simple以同步阻塞等待的方式返回 MQ 的回执消息correlated以异步回调方式的方式返回 MQ 的回执消息每个 RabbitTemplate 只能配置一个 ReturnCallbackJava代码实现① Maven 依赖dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.16.0/version/dependency② 生产者代码Confirm Returnimportcom.rabbitmq.client.*;publicclassRabbitProducerConfirmDemo{privatestaticfinalStringEXCHANGEdemo.exchange;privatestaticfinalStringROUTING_KEYdemo.key;publicstaticvoidmain(String[]args)throwsException{ConnectionFactoryfactorynewConnectionFactory();factory.setHost(localhost);Connectionconnectionfactory.newConnection();Channelchannelconnection.createChannel();// 开启 confirm 模式channel.confirmSelect();// Confirm 回调消息是否到达 Brokerchannel.addConfirmListener((deliveryTag,multiple)-System.out.println(消息发送成功tagdeliveryTag),(deliveryTag,multiple)-System.out.println(消息发送失败tagdeliveryTag));// Return 回调路由失败channel.addReturnListener(returnMessage-System.out.println(路由失败newString(returnMessage.getBody())));channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.DIRECT,true);Stringmsghello rabbit confirm;channel.basicPublish(EXCHANGE,ROUTING_KEY,true,// mandatoryMessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());System.out.println(消息已发送);}}confirmSelect()开启生产者确认Confirm保证消息写入 BrokerReturn保证消息路由正确3.RabbitMQ的消费者确认机制为了确认消费者是否成功处理消息RabbitMQ提供了消费者确认机制Consumer Acknowledgement。即当消费者处理消息结束后应该向RabbitMQ发送一个回执告知RabbitMQ自己消息处理状态。回执有三种可选值ack成功处理消息RabbitMQ从队列中删除该消息nack消息处理失败RabbitMQ需要再次投递消息reject消息处理失败并拒绝该消息RabbitMQ从队列中删除该消息一般reject方式用的较少除非是消息格式有问题那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获消息处理成功时返回ack处理失败时返回nack.由于消息回执的处理代码比较统一因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式有三种模式none不处理。即消息投递给消费者后立刻ack消息会立刻从MQ删除。非常不安全不建议使用manual手动模式。需要自己在业务代码中调用api发送ack或reject存在业务入侵但更灵活auto自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强当业务正常执行时则自动返回ack. 当业务出现异常时根据异常判断返回不同结果如果是业务异常会自动返回nack如果是消息处理或校验异常自动返回reject;返回Reject的常见异常有Starting with version 1.3.2, the default ErrorHandler is now a ConditionalRejectingErrorHandler that rejects (and does not requeue) messages that fail with an irrecoverable error. Specifically, it rejects messages that fail with the following errors:o.s.amqp…MessageConversionException: Can be thrown when converting the incoming message payload using a MessageConverter.o.s.messaging…MessageConversionException: Can be thrown by the conversion service if additional conversion is required when mapping to a RabbitListener method.o.s.messaging…MethodArgumentNotValidException: Can be thrown if validation (for example, Valid) is used in the listener and the validation fails.o.s.messaging…MethodArgumentTypeMismatchException: Can be thrown if the inbound message was converted to a type that is not correct for the target method. For example, the parameter is declared as MessageFoo but MessageBar is received.java.lang.NoSuchMethodException: Added in version 1.6.3.java.lang.ClassCastException: Added in version 1.6.3.通过下面的配置可以修改SpringAMQP的ACK处理方式spring:rabbitmq:listener:simple:acknowledge-mode:manual # 手动处理Java代码实现RabbitMQ消费者默认情况下是自动确认这里就不给代码演示了RabbitMQ 消费者确认手动 ACK先处理业务再ACKimportcom.rabbitmq.client.*;publicclassRabbitConsumerAckDemo{privatestaticfinalStringQUEUEdemo.queue;publicstaticvoidmain(String[]args)throwsException{ConnectionFactoryfactorynewConnectionFactory();factory.setHost(localhost);Connectionconnectionfactory.newConnection();Channelchannelconnection.createChannel();channel.queueDeclare(QUEUE,true,false,false,null);// 手动 ACKbooleanautoAckfalse;channel.basicConsume(QUEUE,autoAck,(consumerTag,message)-{try{StringmsgnewString(message.getBody());System.out.println(收到消息msg);// 模拟业务处理if(msg.contains(error)){thrownewRuntimeException(业务异常);}// 业务成功 → ACKchannel.basicAck(message.getEnvelope().getDeliveryTag(),false);}catch(Exceptione){// 业务失败 → NACK重新入队 or DLQchannel.basicNack(message.getEnvelope().getDeliveryTag(),false,true// 是否重新入队);}},consumerTag-{});}}4.kafka的生产者确认机制Kafka 的确认由acks参数控制。ACK0这是最不可靠的模式。生产者在发送消息后不会等待来自服务器的确认。这意味着消息可能会在发送之后丢失而生产者将无法知道它是否成功到达服务器。ACK1这是默认模式也是一种折衷方式。在这种模式下生产者会在消息发送后等待来自分区领导者(leader的确认但不会等待所有副本replicas的确认。这意味着只要消息被写入分区领导者生产者就会收到确认。如果分区领导者成功写入消息但在同步到所有副本之前宕机消息可能会丢失。ACK-1这是最可靠的模式。在这种模式下生产者会在消息发送后等待所有副本的确认。只有在所有副本都成功写入消息后生产者才会收到确认。这确保了消息的可靠性但会导致更长的延迟。Java代码实现① Maven 依赖dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.6.0/version/dependency② Kafka 生产者代码acksallISR 全部写成功才返回幂等生产者防止重试导致重复消息importorg.apache.kafka.clients.producer.*;importjava.util.Properties;publicclassKafkaProducerAckDemo{publicstaticvoidmain(String[]args){Properties propsnewProperties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,localhost:9092);// 关键配置props.put(ProducerConfig.ACKS_CONFIG,all);props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);props.put(ProducerConfig.RETRIES_CONFIG,3);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);KafkaProducerString,StringproducernewKafkaProducer(props);ProducerRecordString,StringrecordnewProducerRecord(demo-topic,order-1,hello kafka);producer.send(record,(metadata,exception)-{if(exceptionnull){System.out.println(发送成功metadata.topic()-metadata.partition());}else{System.out.println(发送失败exception.getMessage());}});producer.close();}}5.kafka的消费者确认机制kafka没有显示ACK而是用offest表示“确认”1自动提交offsetenable.auto.committrue定时提交 offset业务还没处理完offset 已提交宕机 →消息丢失2手动提交 offset推荐enable.auto.commitfalseJava代码实现enable.auto.commitfalse业务成功后再提交 offset提供至少一次的语义importorg.apache.kafka.clients.consumer.*;importjava.time.Duration;importjava.util.Collections;importjava.util.Properties;publicclassKafkaConsumerManualCommitDemo{publicstaticvoidmain(String[]args){Properties propsnewProperties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,localhost:9092);props.put(ConsumerConfig.GROUP_ID_CONFIG,demo-group);// 关键配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString,StringconsumernewKafkaConsumer(props);consumer.subscribe(Collections.singletonList(demo-topic));while(true){ConsumerRecordsString,Stringrecordsconsumer.poll(Duration.ofMillis(1000));for(ConsumerRecordString,Stringrecord:records){System.out.println(消费消息record.value());// 处理业务逻辑}// 业务成功后手动提交 offsetconsumer.commitSync();}}}