+ try {
+ final RequestSuccess<?, ?> success = handleRequest(envelope, now);
+ if (success != null) {
+ envelope.sendSuccess(success, ticker().read() - now);
+ }
+ } catch (RequestException e) {
+ LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
+ envelope.sendFailure(e, ticker().read() - now);
+ } catch (Exception e) {
+ LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
+ envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+ ticker().read() - now);
+ }
+ } else if (message instanceof ConnectClientRequest) {
+ handleConnectClient((ConnectClientRequest)message);
+ } else if (CreateTransaction.isSerializedType(message)) {
+ handleCreateTransaction(message);
+ } else if (message instanceof BatchedModifications) {
+ handleBatchedModifications((BatchedModifications)message);
+ } else if (message instanceof ForwardedReadyTransaction) {
+ handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+ } else if (message instanceof ReadyLocalTransaction) {
+ handleReadyLocalTransaction((ReadyLocalTransaction)message);
+ } else if (CanCommitTransaction.isSerializedType(message)) {
+ handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
+ } else if (CommitTransaction.isSerializedType(message)) {
+ handleCommitTransaction(CommitTransaction.fromSerializable(message));
+ } else if (AbortTransaction.isSerializedType(message)) {
+ handleAbortTransaction(AbortTransaction.fromSerializable(message));
+ } else if (CloseTransactionChain.isSerializedType(message)) {
+ closeTransactionChain(CloseTransactionChain.fromSerializable(message));
+ } else if (message instanceof RegisterChangeListener) {
+ changeSupport.onMessage((RegisterChangeListener) message, isLeader(), hasLeader());
+ } else if (message instanceof RegisterDataTreeChangeListener) {
+ treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader(), hasLeader());
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof PeerAddressResolved) {
+ PeerAddressResolved resolved = (PeerAddressResolved) message;
+ setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
+ } else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
+ store.checkForExpiredTransactions(transactionCommitTimeout);
+ commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ } else if (message instanceof DatastoreContext) {
+ onDatastoreContext((DatastoreContext)message);
+ } else if (message instanceof RegisterRoleChangeListener) {
+ roleChangeNotifier.get().forward(message, context());
+ } else if (message instanceof FollowerInitialSyncUpStatus) {
+ shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
+ context().parent().tell(message, self());
+ } else if (GET_SHARD_MBEAN_MESSAGE.equals(message)) {
+ sender().tell(getShardMBean(), self());
+ } else if (message instanceof GetShardDataTree) {
+ sender().tell(store.getDataTree(), self());
+ } else if (message instanceof ServerRemoved) {
+ context().parent().forward(message, context());
+ } else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
+ messageRetrySupport.onTimerMessage(message);
+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+ store.processCohortRegistryCommand(getSender(),
+ (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
+ } else if (message instanceof PersistAbortTransactionPayload) {
+ final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
+ persistPayload(txId, AbortTransactionPayload.create(txId), true);
+ } else if (message instanceof MakeLeaderLocal) {
+ onMakeLeaderLocal();
+ } else {
+ super.handleNonRaftCommand(message);