- if(message.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
- // This must be for install snapshot. Don't want to open this up and trigger
- // deSerialization
- self()
- .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
- self());
-
- createSnapshotTransaction = null;
- // Send a PoisonPill instead of sending close transaction because we do not really need
- // a response
- getSender().tell(PoisonPill.getInstance(), self());
-
- } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
- closeTransactionChain(CloseTransactionChain.fromSerializable(message));
- } else if (message instanceof RegisterChangeListener) {
- registerChangeListener((RegisterChangeListener) message);
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
- } else if (message instanceof ForwardedCommitTransaction) {
- handleForwardedCommit((ForwardedCommitTransaction) message);
- } else if (message.getClass()
- .equals(CreateTransaction.SERIALIZABLE_CLASS)) {
- if (isLeader()) {
- createTransaction(CreateTransaction.fromSerializable(message));
- } else if (getLeader() != null) {
- getLeader().forward(message, getContext());
+ try {
+ if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ handleCreateTransaction(message);
+ } else if (BatchedModifications.class.isInstance(message)) {
+ handleBatchedModifications((BatchedModifications)message);
+ } else if (message instanceof ForwardedReadyTransaction) {
+ commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
+ getSender(), this);
+ } else if (message instanceof ReadyLocalTransaction) {
+ handleReadyLocalTransaction((ReadyLocalTransaction)message);
+ } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
+ } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ handleCommitTransaction(CommitTransaction.fromSerializable(message));
+ } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
+ handleAbortTransaction(AbortTransaction.fromSerializable(message));
+ } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
+ closeTransactionChain(CloseTransactionChain.fromSerializable(message));
+ } else if (message instanceof RegisterChangeListener) {
+ changeSupport.onMessage((RegisterChangeListener) message, isLeader());
+ } else if (message instanceof RegisterDataTreeChangeListener) {
+ treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof PeerAddressResolved) {
+ PeerAddressResolved resolved = (PeerAddressResolved) message;
+ setPeerAddress(resolved.getPeerId().toString(),
+ resolved.getPeerAddress());
+ } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ handleTransactionCommitTimeoutCheck();
+ } else if(message instanceof DatastoreContext) {
+ onDatastoreContext((DatastoreContext)message);
+ } else if(message instanceof RegisterRoleChangeListener){
+ roleChangeNotifier.get().forward(message, context());
+ } else if (message instanceof FollowerInitialSyncUpStatus) {
+ shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
+ context().parent().tell(message, self());
+ } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
+ sender().tell(getShardMBean(), self());