From: Tom Pantelis Date: Wed, 10 Sep 2014 16:54:59 +0000 (+0000) Subject: Merge "BUG 1734 - Applying a recovered journal to Shard does not happen during recovery" X-Git-Tag: release/helium~112 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=622a595aed303b6a0bdbd39554abddcb46549f42;hp=93bec87a3187c32ed3a2a684523b66db3b46645a Merge "BUG 1734 - Applying a recovered journal to Shard does not happen during recovery" --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 8135d837d3..c8cbcca6e8 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -144,7 +144,13 @@ public abstract class RaftActor extends UntypedPersistentActor { applySnapshot(ByteString.copyFrom(snapshot.getState())); } else if (message instanceof ReplicatedLogEntry) { - replicatedLog.append((ReplicatedLogEntry) message); + ReplicatedLogEntry logEntry = (ReplicatedLogEntry) message; + + // Apply State immediately + replicatedLog.append(logEntry); + applyState(null, "recovery", logEntry.getData()); + context.setLastApplied(logEntry.getIndex()); + context.setCommitIndex(logEntry.getIndex()); } else if (message instanceof DeleteEntries) { replicatedLog.removeFrom(((DeleteEntries) message).getFromIndex()); } else if (message instanceof UpdateElectionTerm) { @@ -152,7 +158,8 @@ public abstract class RaftActor extends UntypedPersistentActor { } else if (message instanceof RecoveryCompleted) { LOG.debug( "RecoveryCompleted - Switching actor to Follower - " + - "Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + + "Persistence Id = " + persistenceId() + + " Last index in log:{}, snapshotIndex={}, snapshotTerm={}, " + "journal-size={}", replicatedLog.lastIndex(), replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, replicatedLog.size()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index e6ddd8fa19..0737d2020b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -210,6 +210,9 @@ public class Shard extends RaftActor { createTransaction(CreateTransaction.fromSerializable(message)); } else if (getLeader() != null) { getLeader().forward(message, getContext()); + } else { + getSender().tell(new akka.actor.Status.Failure(new IllegalStateException( + "Could not find leader so transaction cannot be created")), getSelf()); } } else if (message instanceof PeerAddressResolved) { PeerAddressResolved resolved = (PeerAddressResolved) message; @@ -326,6 +329,9 @@ public class Shard extends RaftActor { modification); DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); + + LOG.debug("Created new transaction {}", transaction.getIdentifier().toString()); + modification.apply(transaction); try { syncCommitTransaction(transaction); @@ -339,6 +345,12 @@ public class Shard extends RaftActor { return; } + + if(sender == null){ + LOG.error("Commit failed. Sender cannot be null"); + return; + } + final ListenableFuture future = cohort.commit(); final ActorRef self = getSelf();