Merge "Bug 2055: Handle shard not initialized resiliently"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index fef7e228737b9cb0ab63db21596c4bba10230065..d831b7c1844006fca5bea575ee20d656c4b616c4 100644 (file)
@@ -34,9 +34,9 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
@@ -74,6 +74,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.Nonnull;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -81,7 +83,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 
 /**
  * A Shard represents a portion of the logical data tree <br/>
@@ -216,6 +217,10 @@ public class Shard extends RaftActor {
 
         if (message instanceof RecoveryFailure){
             LOG.error(((RecoveryFailure) message).cause(), "Recovery failed because of this cause");
+
+            // Even though recovery failed, we still need to finish our recovery, eg send the
+            // ActorInitialized message and start the txCommitTimeoutCheckSchedule.
+            onRecoveryComplete();
         } else {
             super.onReceiveRecover(message);
         }
@@ -383,8 +388,11 @@ public class Shard extends RaftActor {
                 ready.getModification());
 
         // Return our actor path as we'll handle the three phase commit.
-        getSender().tell(new ReadyTransactionReply(Serialization.serializedActorPath(self())).
-                toSerializable(), getSelf());
+        ReadyTransactionReply readyTransactionReply =
+            new ReadyTransactionReply(Serialization.serializedActorPath(self()));
+        getSender().tell(
+            ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
+            getSelf());
     }
 
     private void handleAbortTransaction(AbortTransaction abort) {
@@ -695,12 +703,15 @@ public class Shard extends RaftActor {
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
 
-        // Schedule a message to be periodically sent to check if the current in-progress
-        // transaction should be expired and aborted.
-        FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
-        txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
-                period, period, getSelf(),
-                TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+        // Being paranoid here - this method should only be called once but just in case...
+        if(txCommitTimeoutCheckSchedule == null) {
+            // Schedule a message to be periodically sent to check if the current in-progress
+            // transaction should be expired and aborted.
+            FiniteDuration period = Duration.create(transactionCommitTimeout / 3, TimeUnit.MILLISECONDS);
+            txCommitTimeoutCheckSchedule = getContext().system().scheduler().schedule(
+                    period, period, getSelf(),
+                    TX_COMMIT_TIMEOUT_CHECK_MESSAGE, getContext().dispatcher(), ActorRef.noSender());
+        }
     }
 
     @Override