>> ⾃定义# input 的配置:
spring.cloud.stream.bindings.input1.destination=test-topic1
spring.cloud.stream.t-type=text/plain
spring.cloud.stream.up=test-group1
# output 的配置:
spring.cloud.stream.bindings.output1.destination=test-topic1
spring.cloud.stream.t-type=text/plain
spring.cloud.stream.up=test-group1
SpringCloudStream RocketMQ事务消息 Apache RocketMQ在4.3.0版中已经⽀持分布式事务消息,这⾥RocketMQ采⽤了2PC的思想来实现了提交事务消息,同时增加⼀个补偿逻辑来处理⼆阶段超时或者失败的消息,如下图所⽰: 上图说明了事务消息的⼤致⽅案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程;
1.事务消息发送及提交:
(1) 发送消息(half消息);
(2) 服务端响应消息写⼊结果;
(3) 根据发送结果执⾏本地事务(如果写⼊失败,此时half消息对业务不可见,本地逻辑不执⾏);
(4) 根据本地事务状态执⾏Commit或者Rollback(Commit操作⽣成消息索引,消息对消费者可见) 2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起⼀次“回查”;
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态;
(3) 根据本地事务状态,重新Commit或者Rollback;
其中,补偿阶段⽤于解决消息Commit或者Rollback发⽣超时或者失败的情况;
事务消息⼀共有三种状态:提交状态、回滚状态、中间状态;
TransactionStatus.CommitTransaction: 提交事务,代表消费者可以消费此消息;TransactionStatus.RollbackTransaction: 回滚事务,代表消息将被删除,不能被消费;TransactionStatus.Unknown: 中间状态,代表需要检查消息队列来确定状态;
MQ内部逻辑:
package com.springcloud.stream.stream.Transaction;
import ketmq.spring.annotation.RocketMQTransactionListener;
stc2052import RocketMQLocalTransactionListener;
import RocketMQLocalTransactionState;
import ssaging.Message;
//MQ接收,并根据结果运⾏内部逻辑
@SuppressWarnings("all")
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10) public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
/**
* 执⾏本地事务:也就是执⾏本地业务逻辑
*
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Object num = Headers().get("test");
if ("1".equals(num)) {
System.out.println("executer: " + new String((byte[]) Payload()) + " unknown");
return RocketMQLocalTransactionState.UNKNOWN;
}
else if ("2".equals(num)) {
System.out.println("executer: " + new String((byte[]) Payload()) + " rollback");
return RocketMQLocalTransactionState.ROLLBACK;
}
System.out.println("executer: " + new String((byte[]) Payload()) + " commit");
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 回调检查
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
System.out.println("check: " + new String((byte[]) Payload()));
return RocketMQLocalTransactionState.COMMIT;
}
}
消息发送:
@Component
public class Sender {
@Autowired
private MySource mySource;
public <T> void sendTransactionalMsg(T msg ,int num) throws Exception{
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.setHeader("test",String.valueOf(num));
//.setHeader(RocketMQHeaders.TAGS,"binder");
Message message = builder.build();
mySource.outputTX().send(message);
对旋轴流风机
}
}
⾃定义信道-重写Source
public interface MySource {
String OUTPUTTX = "outputTX";
@Output(MySource.OUTPUTTX)
MessageChannel outputTX();
}
⾃定义信道-重写Sink
public interface MySink {
String INPUTTX = "inputTX";
@Input(MySink.INPUTTX)
SubscribableChannel inputTX();
}
消费者接收消息:
@EnableBinding({MySink.class})
public class ReceiveService {
//spring cloud stream ⾥⾯发消息通过sink发送
@Autowired
private MySink mySink;
//消费者端接收到的消息
@StreamListener("inputTX")
public void receiveTransactionMessage(String receiveMsg) {
System.out.println("Transaction_input 接收到的消息: " + receiveMsg); }
}
Spring Cloud Stream RocketMQ 配置选项RocketMQ Binder Properties
*spring.ketmq.binder.name-server*
RocketMQ NameServer 地址(⽼版本使⽤ namesrv-addr 配置项);Default: 127.0.0.1:9876.
*spring.ketmq.binder.access-key*
阿⾥云账号 AccessKey。
Default: null.
*spring.ketmq.binder.secret-key*
阿⾥云账号 SecretKey。
Default: null.
*spring.able-msg-trace*
是否为 Producer 和 Consumer 开启消息轨迹功能
Default: true.
*spring.ketmq.binder.customized-trace-topic*
mcu解密消息轨迹开启后存储的 topic 名称。
Default: RMQ_SYS_TRACE_TOPIC.
RocketMQ Consumer
Properties