Merge "Initial clustering feature"
authorDevin Avery <devin.avery@brocade.com>
Thu, 4 Sep 2014 22:41:44 +0000 (22:41 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 4 Sep 2014 22:41:44 +0000 (22:41 +0000)
1  2 
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index d8af74c84c7a0acd9fdaa106873196b49fd4850e,fbb457178f6ffeabae1a23fb82a7e5df510e288a..6a6a181b6c03ac744d02a3e8e815011d2cf99c3f
@@@ -14,6 -14,7 +14,7 @@@ import akka.actor.Props
  import akka.event.Logging;
  import akka.event.LoggingAdapter;
  import akka.japi.Creator;
+ import akka.persistence.RecoveryFailure;
  import akka.serialization.Serialization;
  
  import com.google.common.base.Optional;
@@@ -21,7 -22,7 +22,7 @@@ import com.google.common.base.Precondit
  import com.google.common.util.concurrent.FutureCallback;
  import com.google.common.util.concurrent.Futures;
  import com.google.common.util.concurrent.ListenableFuture;
 -
 +import com.google.protobuf.ByteString;
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
  import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@@ -144,8 -145,19 +145,19 @@@ public class Shard extends RaftActor 
          return Props.create(new ShardCreator(name, peerAddresses, datastoreContext));
      }
  
+     @Override public void onReceiveRecover(Object message) {
+         LOG.debug("onReceiveRecover: Received message {} from {}", message.getClass().toString(),
+             getSender());
+         if (message instanceof RecoveryFailure){
+             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+         } else {
+             super.onReceiveRecover(message);
+         }
+     }
      @Override public void onReceiveCommand(Object message) {
-         LOG.debug("Received message {} from {}", message.getClass().toString(),
+         LOG.debug("onReceiveCommand: Received message {} from {}", message.getClass().toString(),
              getSender());
  
          if (message.getClass()
              .tell(new CreateTransactionReply(
                      Serialization.serializedActorPath(transactionActor),
                      createTransaction.getTransactionId()).toSerializable(),
-                 getSelf());
+                 getSelf()
+             );
      }
  
      private void commit(final ActorRef sender, Object serialized) {
          Futures.addCallback(future, new FutureCallback<Void>() {
              @Override
              public void onSuccess(Void v) {
-                sender.tell(new CommitTransactionReply().toSerializable(),self);
-                shardMBean.incrementCommittedTransactionCount();
-                shardMBean.setLastCommittedTransactionTime(new Date());
+                 sender.tell(new CommitTransactionReply().toSerializable(), self);
+                 shardMBean.incrementCommittedTransactionCount();
+                 shardMBean.setLastCommittedTransactionTime(new Date());
              }
  
              @Override
                      identifier, clientActor.path().toString());
              }
  
 -
          } else {
              LOG.error("Unknown state received {}", data);
          }
  
      }
  
 -    @Override protected Object createSnapshot() {
 +    @Override protected void createSnapshot() {
          throw new UnsupportedOperationException("createSnapshot");
      }
  
 -    @Override protected void applySnapshot(Object snapshot) {
 +    @Override protected void applySnapshot(ByteString snapshot) {
          throw new UnsupportedOperationException("applySnapshot");
      }