- try {
- if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
- handleCreateTransaction(message);
- } else if (BatchedModifications.class.isInstance(message)) {
- handleBatchedModifications((BatchedModifications)message);
- } else if (message instanceof ForwardedReadyTransaction) {
- handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
- } else if (message instanceof ReadyLocalTransaction) {
- handleReadyLocalTransaction((ReadyLocalTransaction)message);
- } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
- handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
- } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
- handleCommitTransaction(CommitTransaction.fromSerializable(message));
- } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
- handleAbortTransaction(AbortTransaction.fromSerializable(message));
- } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
- closeTransactionChain(CloseTransactionChain.fromSerializable(message));
- } else if (message instanceof RegisterChangeListener) {
- changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
- } else if (message instanceof RegisterDataTreeChangeListener) {
- treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
- } else if (message instanceof PeerAddressResolved) {
- PeerAddressResolved resolved = (PeerAddressResolved) message;
- setPeerAddress(resolved.getPeerId().toString(),
- resolved.getPeerAddress());
- } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
- handleTransactionCommitTimeoutCheck();
- } else if(message instanceof DatastoreContext) {
- onDatastoreContext((DatastoreContext)message);
- } else if(message instanceof RegisterRoleChangeListener){
- roleChangeNotifier.get().forward(message, context());
- } else if (message instanceof FollowerInitialSyncUpStatus) {
- shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
+ store.resetTransactionBatch();
+
+ if (message instanceof RequestEnvelope request) {
+ handleRequestEnvelope(request);
+ } else if (MessageAssembler.isHandledMessage(message)) {
+ handleRequestAssemblerMessage(message);
+ } else if (message instanceof ConnectClientRequest request) {
+ handleConnectClient(request);
+ } else if (message instanceof DataTreeChangedReply) {
+ // Ignore reply
+ } else if (message instanceof RegisterDataTreeChangeListener request) {
+ treeChangeSupport.onMessage(request, isLeader(), hasLeader());
+ } else if (message instanceof UpdateSchemaContext request) {
+ updateSchemaContext(request);
+ } else if (message instanceof PeerAddressResolved resolved) {
+ setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
+ } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
+ commitTimeoutCheck();
+ } else if (message instanceof DatastoreContext request) {
+ onDatastoreContext(request);
+ } else if (message instanceof RegisterRoleChangeListener) {
+ roleChangeNotifier.orElseThrow().forward(message, context());
+ } else if (message instanceof FollowerInitialSyncUpStatus request) {
+ shardMBean.setFollowerInitialSyncStatus(request.isInitialSyncDone());