+
+ try {
+ if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ handleCreateTransaction(message);
+ } else if (message instanceof ForwardedReadyTransaction) {
+ handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+ } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
+ } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ handleCommitTransaction(CommitTransaction.fromSerializable(message));
+ } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ handleAbortTransaction(AbortTransaction.fromSerializable(message));
+ } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
+ closeTransactionChain(CloseTransactionChain.fromSerializable(message));
+ } else if (message instanceof RegisterChangeListener) {
+ registerChangeListener((RegisterChangeListener) message);
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof PeerAddressResolved) {
+ PeerAddressResolved resolved = (PeerAddressResolved) message;
+ setPeerAddress(resolved.getPeerId().toString(),
+ resolved.getPeerAddress());
+ } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ handleTransactionCommitTimeoutCheck();
+ } else if(message instanceof DatastoreContext) {
+ onDatastoreContext((DatastoreContext)message);
+ } else if(message instanceof RegisterRoleChangeListener){
+ roleChangeNotifier.get().forward(message, context());
+ } else if (message instanceof FollowerInitialSyncUpStatus){
+ shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
+ context().parent().tell(message, self());
+ } else {
+ super.onReceiveCommand(message);
+ }
+ } finally {
+ context.done();
+ }
+ }
+
+ @Override
+ protected Optional<ActorRef> getRoleChangeNotifier() {
+ return roleChangeNotifier;
+ }
+
+ private void onDatastoreContext(DatastoreContext context) {
+ datastoreContext = context;
+
+ commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
+
+ setTransactionCommitTimeout();
+
+ if(datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
+ dataPersistenceProvider = new PersistentDataProvider();
+ } else if(!datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof PersistentDataProvider) {
+ dataPersistenceProvider = new NonPersistentRaftDataProvider();
+ }
+
+ updateConfigParams(datastoreContext.getShardRaftConfig());