+ store.resetTransactionBatch();
+
+ if (message instanceof RequestEnvelope request) {
+ handleRequestEnvelope(request);
+ } else if (MessageAssembler.isHandledMessage(message)) {
+ handleRequestAssemblerMessage(message);
+ } else if (message instanceof ConnectClientRequest request) {
+ handleConnectClient(request);
+ } else if (message instanceof DataTreeChangedReply) {
+ // Ignore reply
+ } else if (message instanceof RegisterDataTreeChangeListener request) {
+ treeChangeSupport.onMessage(request, isLeader(), hasLeader());
+ } else if (message instanceof UpdateSchemaContext request) {
+ updateSchemaContext(request);
+ } else if (message instanceof PeerAddressResolved resolved) {
+ setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
+ } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
+ commitTimeoutCheck();
+ } else if (message instanceof DatastoreContext request) {
+ onDatastoreContext(request);
+ } else if (message instanceof RegisterRoleChangeListener) {
+ roleChangeNotifier.orElseThrow().forward(message, context());
+ } else if (message instanceof FollowerInitialSyncUpStatus request) {
+ shardMBean.setFollowerInitialSyncStatus(request.isInitialSyncDone());
+ context().parent().tell(message, self());
+ } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)) {
+ sender().tell(getShardMBean(), self());
+ } else if (message instanceof GetShardDataTree) {
+ sender().tell(store.getDataTree(), self());
+ } else if (message instanceof ServerRemoved) {
+ context().parent().forward(message, context());
+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand request) {
+ store.processCohortRegistryCommand(getSender(), request);
+ } else if (message instanceof MakeLeaderLocal) {
+ onMakeLeaderLocal();
+ } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
+ store.resumeNextPendingTransaction();
+ } else if (GetKnownClients.INSTANCE.equals(message)) {
+ handleGetKnownClients();
+ } else if (!responseMessageSlicer.handleMessage(message)) {
+ // Ask-based protocol messages
+ if (CreateTransaction.isSerializedType(message)) {
+ handleCreateTransaction(message);
+ } else if (message instanceof BatchedModifications request) {
+ handleBatchedModifications(request);
+ } else if (message instanceof ForwardedReadyTransaction request) {
+ handleForwardedReadyTransaction(request);
+ } else if (message instanceof ReadyLocalTransaction request) {
+ handleReadyLocalTransaction(request);
+ } else if (CanCommitTransaction.isSerializedType(message)) {
+ handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
+ } else if (CommitTransaction.isSerializedType(message)) {
+ handleCommitTransaction(CommitTransaction.fromSerializable(message));
+ } else if (AbortTransaction.isSerializedType(message)) {
+ handleAbortTransaction(AbortTransaction.fromSerializable(message));
+ } else if (CloseTransactionChain.isSerializedType(message)) {
+ closeTransactionChain(CloseTransactionChain.fromSerializable(message));
+ } else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
+ messageRetrySupport.onTimerMessage(message);
+ } else {
+ super.handleNonRaftCommand(message);
+ }
+ }