From f530a28f27288126ef708cf76f93e169756d2c8e Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Sun, 7 Sep 2014 17:37:42 -0700 Subject: [PATCH] BUG 1734 - Applying a recovered journal to Shard does not happen during recovery This patch applies the state in the journal to the in-memory state immediately on recovery Change-Id: I0b97f146d31a38cd103d16cde97aaf1c6ce97b07 Signed-off-by: Moiz Raja --- .../controller/cluster/raft/RaftActor.java | 11 +++++++++-- .../controller/cluster/datastore/Shard.java | 12 ++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 8135d837d3..c8cbcca6e8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -144,7 +144,13 @@ public abstract class RaftActor extends UntypedPersistentActor { applySnapshot(ByteString.copyFrom(snapshot.getState())); } else if (message instanceof ReplicatedLogEntry) { - replicatedLog.append((ReplicatedLogEntry) message); + ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message; + + // Apply State immediately + replicatedLog.append(logEntry); + applyState(null, "recovery", logEntry.getData()); + context.setLastApplied(logEntry.getIndex()); + context.setCommitIndex(logEntry.getIndex()); } else if (message instanceof DeleteEntries) { replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); } else if (message instanceof UpdateElectionTerm) { @@ -152,7 +158,8 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof RecoveryCompleted) { LOG.debug( "RecoveryCompleted - Switching actor to Follower - " + - "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + + "Persistence Id = " + persistenceId() + + " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + "journal-size={}", replicatedLog.lastIndex(), replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, replicatedLog.size()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index e6ddd8fa19..0737d2020b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -210,6 +210,9 @@ public class Shard extends RaftActor { createTransaction(CreateTransaction.fromSerializable(message)); } else if (getLeader() != null) { getLeader().forward(message, getContext()); + } else { + getSender().tell(new akka.actor.Status.Failure(new IllegalStateException( + "Could not find leader so transaction cannot be created")), getSelf()); } } else if (message instanceof PeerAddressResolved) { PeerAddressResolved resolved = (PeerAddressResolved) message; @@ -326,6 +329,9 @@ public class Shard extends RaftActor { modification); DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); + + LOG.debug("Created new transaction {}", transaction.getIdentifier().toString()); + modification.apply(transaction); try { syncCommitTransaction(transaction); @@ -339,6 +345,12 @@ public class Shard extends RaftActor { return; } + + if(sender == null){ + LOG.error("Commit failed. Sender cannot be null"); + return; + } + final ListenableFuture future = cohort.commit(); final ActorRef self = getSelf(); -- 2.36.6