Merge "BUG 1734 - Applying a recovered journal to Shard does not happen during recovery"
authorTom Pantelis <tpanteli@brocade.com>
Wed, 10 Sep 2014 16:54:59 +0000 (16:54 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 10 Sep 2014 16:54:59 +0000 (16:54 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java

index 8135d837d3ad86563ad0246022941bfc6134b59d..c8cbcca6e8609834500af3c5511273b49a8ca329 100644 (file)
@@ -144,7 +144,13 @@ public abstract class RaftActor extends UntypedPersistentActor {
             applySnapshot(ByteString.copyFrom(snapshot.getState()));
 
         } else if (message instanceof ReplicatedLogEntry) {
-            replicatedLog.append((ReplicatedLogEntry) message);
+            ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message;
+
+            // Apply State immediately
+            replicatedLog.append(logEntry);
+            applyState(null, "recovery", logEntry.getData());
+            context.setLastApplied(logEntry.getIndex());
+            context.setCommitIndex(logEntry.getIndex());
         } else if (message instanceof DeleteEntries) {
             replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex());
         } else if (message instanceof UpdateElectionTerm) {
@@ -152,7 +158,8 @@ public abstract class RaftActor extends UntypedPersistentActor {
         } else if (message instanceof RecoveryCompleted) {
             LOG.debug(
                 "RecoveryCompleted - Switching actor to Follower - " +
-                    "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
+                    "Persistence Id =  " + persistenceId() +
+                    " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " +
                     "journal-size={}",
                 replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
                 replicatedLog.snapshotTerm, replicatedLog.size());
index e6ddd8fa198497cbf1d86f221ecb2ef82b2df5c8..0737d2020bcde75125eafebbfe963fc19f2258f7 100644 (file)
@@ -210,6 +210,9 @@ public class Shard extends RaftActor {
                 createTransaction(CreateTransaction.fromSerializable(message));
             } else if (getLeader() != null) {
                 getLeader().forward(message, getContext());
+            } else {
+                getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
+                    "Could not find leader so transaction cannot be created")), getSelf());
             }
         } else if (message instanceof PeerAddressResolved) {
             PeerAddressResolved resolved = (PeerAddressResolved) message;
@@ -326,6 +329,9 @@ public class Shard extends RaftActor {
                 modification);
             DOMStoreWriteTransaction transaction =
                 store.newWriteOnlyTransaction();
+
+            LOG.debug("Created new transaction {}", transaction.getIdentifier().toString());
+
             modification.apply(transaction);
             try {
                 syncCommitTransaction(transaction);
@@ -339,6 +345,12 @@ public class Shard extends RaftActor {
             return;
         }
 
+
+        if(sender == null){
+            LOG.error("Commit failed. Sender cannot be null");
+            return;
+        }
+
         final ListenableFuture<Void> future = cohort.commit();
         final ActorRef self = getSelf();