From: Moiz Raja Date: Fri, 10 Apr 2015 15:20:04 +0000 (+0000) Subject: Merge "Verify BatchedModifications messages sent vs received" X-Git-Tag: release/lithium~288 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=ebaf3d71465066033d5882c61cdd2ec63b29d980;hp=3c13136499bf20d1fee22bed16e6a86a37af13cf Merge "Verify BatchedModifications messages sent vs received" --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index ed19f21ded..eca2949666 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -19,6 +19,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -26,6 +27,7 @@ import org.opendaylight.controller.cluster.example.messages.PrintState; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; +import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -34,7 +36,7 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa /** * A sample actor showing how the RaftActor is to be extended */ -public class ExampleActor extends RaftActor { +public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort { private final Map state = new HashMap(); @@ -192,22 +194,28 @@ public class ExampleActor extends RaftActor { } @Override - protected void startLogRecoveryBatch(int maxBatchSize) { + @Nonnull + protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + return this; } @Override - protected void appendRecoveredLogEntry(Payload data) { + public void startLogRecoveryBatch(int maxBatchSize) { } @Override - protected void applyCurrentLogRecoveryBatch() { + public void appendRecoveredLogEntry(Payload data) { } @Override - protected void onRecoveryComplete() { + public void applyCurrentLogRecoveryBatch() { } @Override - protected void applyRecoverySnapshot(byte[] snapshot) { + public void onRecoveryComplete() { + } + + @Override + public void applyRecoverySnapshot(byte[] snapshot) { } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1c30fe2317..1f1521d797 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -11,15 +11,12 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.japi.Procedure; -import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; -import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.io.Serializable; @@ -27,6 +24,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; @@ -36,7 +34,6 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -119,9 +116,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private final Procedure createSnapshotProcedure = new CreateSnapshotProcedure(); - private Stopwatch recoveryTimer; - - private int currentRecoveryBatchCount; + private RaftActorRecoverySupport raftRecovery; private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); @@ -140,12 +135,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior)); } - private void initRecoveryTimer() { - if(recoveryTimer == null) { - recoveryTimer = Stopwatch.createStarted(); - } - } - @Override public void preStart() throws Exception { LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(), @@ -169,134 +158,28 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void handleRecover(Object message) { - if(persistence().isRecoveryApplicable()) { - if (message instanceof SnapshotOffer) { - onRecoveredSnapshot((SnapshotOffer) message); - } else if (message instanceof ReplicatedLogEntry) { - onRecoveredJournalLogEntry((ReplicatedLogEntry) message); - } else if (message instanceof ApplyLogEntries) { - // Handle this message for backwards compatibility with pre-Lithium versions. - onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex()); - } else if (message instanceof ApplyJournalEntries) { - onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex()); - } else if (message instanceof DeleteEntries) { - replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex()); - } else if (message instanceof UpdateElectionTerm) { - context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), - ((UpdateElectionTerm) message).getVotedFor()); - } else if (message instanceof RecoveryCompleted) { - onRecoveryCompletedMessage(); - } - } else { - if (message instanceof RecoveryCompleted) { + if(raftRecovery == null) { + raftRecovery = new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior, + getRaftActorRecoveryCohort()); + } + + boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message); + if(recoveryComplete) { + if(!persistence().isRecoveryApplicable()) { // Delete all the messages from the akka journal so that we do not end up with consistency issues // Note I am not using the dataPersistenceProvider and directly using the akka api here deleteMessages(lastSequenceNr()); // Delete all the akka snapshots as they will not be needed deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue())); - - onRecoveryComplete(); - - initializeBehavior(); } - } - } - - private void onRecoveredSnapshot(SnapshotOffer offer) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: SnapshotOffer called..", persistenceId()); - } - initRecoveryTimer(); + onRecoveryComplete(); - Snapshot snapshot = (Snapshot) offer.snapshot(); + initializeBehavior(); - // Create a replicated log with the snapshot information - // The replicated log can be used later on to retrieve this snapshot - // when we need to install it on a peer - - context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider, - currentBehavior)); - context.setLastApplied(snapshot.getLastAppliedIndex()); - context.setCommitIndex(snapshot.getLastAppliedIndex()); - - Stopwatch timer = Stopwatch.createStarted(); - - // Apply the snapshot to the actors state - applyRecoverySnapshot(snapshot.getState()); - - timer.stop(); - LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" + - replicatedLog().size(), persistenceId(), timer.toString(), - replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm()); - } - - private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex()); + raftRecovery = null; } - - replicatedLog().append(logEntry); - } - - private void onRecoveredApplyLogEntries(long toIndex) { - if(LOG.isDebugEnabled()) { - LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", - persistenceId(), context.getLastApplied() + 1, toIndex); - } - - for (long i = context.getLastApplied() + 1; i <= toIndex; i++) { - batchRecoveredLogEntry(replicatedLog().get(i)); - } - - context.setLastApplied(toIndex); - context.setCommitIndex(toIndex); - } - - private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) { - initRecoveryTimer(); - - int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize(); - if(currentRecoveryBatchCount == 0) { - startLogRecoveryBatch(batchSize); - } - - appendRecoveredLogEntry(logEntry.getData()); - - if(++currentRecoveryBatchCount >= batchSize) { - endCurrentLogRecoveryBatch(); - } - } - - private void endCurrentLogRecoveryBatch() { - applyCurrentLogRecoveryBatch(); - currentRecoveryBatchCount = 0; - } - - private void onRecoveryCompletedMessage() { - if(currentRecoveryBatchCount > 0) { - endCurrentLogRecoveryBatch(); - } - - onRecoveryComplete(); - - String recoveryTime = ""; - if(recoveryTimer != null) { - recoveryTimer.stop(); - recoveryTime = " in " + recoveryTimer.toString(); - recoveryTimer = null; - } - - LOG.info( - "Recovery completed" + recoveryTime + " - Switching actor to Follower - " + - "Persistence Id = " + persistenceId() + - " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " + - "journal-size={}", - replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(), - replicatedLog().getSnapshotTerm(), replicatedLog().size()); - - initializeBehavior(); } protected void initializeBehavior(){ @@ -670,31 +553,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { Object data); /** - * This method is called during recovery at the start of a batch of state entries. Derived - * classes should perform any initialization needed to start a batch. - */ - protected abstract void startLogRecoveryBatch(int maxBatchSize); - - /** - * This method is called during recovery to append state data to the current batch. This method - * is called 1 or more times after {@link #startLogRecoveryBatch}. - * - * @param data the state data - */ - protected abstract void appendRecoveredLogEntry(Payload data); - - /** - * This method is called during recovery to reconstruct the state of the actor. - * - * @param snapshotBytes A snapshot of the state of the actor - */ - protected abstract void applyRecoverySnapshot(byte[] snapshotBytes); - - /** - * This method is called during recovery at the end of a batch to apply the current batched - * log entries. This method is called after {@link #appendRecoveredLogEntry}. + * Returns the RaftActorRecoveryCohort to participate in persistence recovery. */ - protected abstract void applyCurrentLogRecoveryBatch(); + @Nonnull + protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort(); /** * This method is called when recovery is complete. diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java new file mode 100644 index 0000000000..a9f00aa80b --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; + +/** + * Interface for a class that participates in raft actor persistence recovery. + * + * @author Thomas Pantelis + */ +public interface RaftActorRecoveryCohort { + + /** + * This method is called during recovery at the start of a batch of state entries. Derived + * classes should perform any initialization needed to start a batch. + */ + void startLogRecoveryBatch(int maxBatchSize); + + /** + * This method is called during recovery to append state data to the current batch. This method + * is called 1 or more times after {@link #startLogRecoveryBatch}. + * + * @param data the state data + */ + void appendRecoveredLogEntry(Payload data); + + /** + * This method is called during recovery to reconstruct the state of the actor. + * + * @param snapshotBytes A snapshot of the state of the actor + */ + void applyRecoverySnapshot(byte[] snapshotBytes); + + /** + * This method is called during recovery at the end of a batch to apply the current batched + * log entries. This method is called after {@link #appendRecoveredLogEntry}. + */ + void applyCurrentLogRecoveryBatch(); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java new file mode 100644 index 0000000000..5f33c738e1 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import akka.persistence.RecoveryCompleted; +import akka.persistence.SnapshotOffer; +import com.google.common.base.Stopwatch; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries; +import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.slf4j.Logger; + +/** + * Support class that handles persistence recovery for a RaftActor. + * + * @author Thomas Pantelis + */ +class RaftActorRecoverySupport { + private final DataPersistenceProvider persistence; + private final RaftActorContext context; + private final RaftActorBehavior currentBehavior; + private final RaftActorRecoveryCohort cohort; + + private int currentRecoveryBatchCount; + + private Stopwatch recoveryTimer; + private final Logger log; + + RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context, + RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) { + this.persistence = persistence; + this.context = context; + this.currentBehavior = currentBehavior; + this.cohort = cohort; + this.log = context.getLogger(); + } + + boolean handleRecoveryMessage(Object message) { + boolean recoveryComplete = false; + if(persistence.isRecoveryApplicable()) { + if (message instanceof SnapshotOffer) { + onRecoveredSnapshot((SnapshotOffer) message); + } else if (message instanceof ReplicatedLogEntry) { + onRecoveredJournalLogEntry((ReplicatedLogEntry) message); + } else if (message instanceof ApplyLogEntries) { + // Handle this message for backwards compatibility with pre-Lithium versions. + onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex()); + } else if (message instanceof ApplyJournalEntries) { + onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex()); + } else if (message instanceof DeleteEntries) { + replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex()); + } else if (message instanceof UpdateElectionTerm) { + context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(), + ((UpdateElectionTerm) message).getVotedFor()); + } else if (message instanceof RecoveryCompleted) { + onRecoveryCompletedMessage(); + recoveryComplete = true; + } + } else if (message instanceof RecoveryCompleted) { + recoveryComplete = true; + } + + return recoveryComplete; + } + + private ReplicatedLog replicatedLog() { + return context.getReplicatedLog(); + } + + private void initRecoveryTimer() { + if(recoveryTimer == null) { + recoveryTimer = Stopwatch.createStarted(); + } + } + + private void onRecoveredSnapshot(SnapshotOffer offer) { + if(log.isDebugEnabled()) { + log.debug("{}: SnapshotOffer called..", context.getId()); + } + + initRecoveryTimer(); + + Snapshot snapshot = (Snapshot) offer.snapshot(); + + // Create a replicated log with the snapshot information + // The replicated log can be used later on to retrieve this snapshot + // when we need to install it on a peer + + context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior)); + context.setLastApplied(snapshot.getLastAppliedIndex()); + context.setCommitIndex(snapshot.getLastAppliedIndex()); + + Stopwatch timer = Stopwatch.createStarted(); + + // Apply the snapshot to the actors state + cohort.applyRecoverySnapshot(snapshot.getState()); + + timer.stop(); + log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" + + replicatedLog().size(), context.getId(), timer.toString(), + replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm()); + } + + private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) { + if(log.isDebugEnabled()) { + log.debug("{}: Received ReplicatedLogEntry for recovery: {}", context.getId(), logEntry.getIndex()); + } + + replicatedLog().append(logEntry); + } + + private void onRecoveredApplyLogEntries(long toIndex) { + if(log.isDebugEnabled()) { + log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}", + context.getId(), context.getLastApplied() + 1, toIndex); + } + + for (long i = context.getLastApplied() + 1; i <= toIndex; i++) { + batchRecoveredLogEntry(replicatedLog().get(i)); + } + + context.setLastApplied(toIndex); + context.setCommitIndex(toIndex); + } + + private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) { + initRecoveryTimer(); + + int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize(); + if(currentRecoveryBatchCount == 0) { + cohort.startLogRecoveryBatch(batchSize); + } + + cohort.appendRecoveredLogEntry(logEntry.getData()); + + if(++currentRecoveryBatchCount >= batchSize) { + endCurrentLogRecoveryBatch(); + } + } + + private void endCurrentLogRecoveryBatch() { + cohort.applyCurrentLogRecoveryBatch(); + currentRecoveryBatchCount = 0; + } + + private void onRecoveryCompletedMessage() { + if(currentRecoveryBatchCount > 0) { + endCurrentLogRecoveryBatch(); + } + + String recoveryTime = ""; + if(recoveryTimer != null) { + recoveryTimer.stop(); + recoveryTime = " in " + recoveryTimer.toString(); + recoveryTimer = null; + } + + log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " + + "Persistence Id = " + context.getId() + + " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " + + "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(), + replicatedLog().getSnapshotTerm(), replicatedLog().size()); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index b910313b09..45fd26c930 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -119,7 +119,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest } @Override - protected void applyRecoverySnapshot(byte[] bytes) { + public void applyRecoverySnapshot(byte[] bytes) { } void setSnapshot(byte[] snapshot) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 17a81ac3c3..f71cb984b3 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -49,6 +49,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -97,9 +98,10 @@ public class RaftActorTest extends AbstractActorTest { InMemorySnapshotStore.clear(); } - public static class MockRaftActor extends RaftActor { + public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort { - private final RaftActor delegate; + private final RaftActor actorDelegate; + private final RaftActorRecoveryCohort cohortDelegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; private ActorRef roleChangeNotifier; @@ -136,7 +138,8 @@ public class RaftActorTest extends AbstractActorTest { DataPersistenceProvider dataPersistenceProvider) { super(id, peerAddresses, config); state = new ArrayList<>(); - this.delegate = mock(RaftActor.class); + this.actorDelegate = mock(RaftActor.class); + this.cohortDelegate = mock(RaftActorRecoveryCohort.class); if(dataPersistenceProvider == null){ setPersistence(true); } else { @@ -197,26 +200,32 @@ public class RaftActorTest extends AbstractActorTest { @Override protected void applyState(ActorRef clientActor, String identifier, Object data) { - delegate.applyState(clientActor, identifier, data); + actorDelegate.applyState(clientActor, identifier, data); LOG.info("{}: applyState called", persistenceId()); } @Override - protected void startLogRecoveryBatch(int maxBatchSize) { + @Nonnull + protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + return this; } @Override - protected void appendRecoveredLogEntry(Payload data) { + public void startLogRecoveryBatch(int maxBatchSize) { + } + + @Override + public void appendRecoveredLogEntry(Payload data) { state.add(data); } @Override - protected void applyCurrentLogRecoveryBatch() { + public void applyCurrentLogRecoveryBatch() { } @Override protected void onRecoveryComplete() { - delegate.onRecoveryComplete(); + actorDelegate.onRecoveryComplete(); recoveryComplete.countDown(); } @@ -227,8 +236,8 @@ public class RaftActorTest extends AbstractActorTest { } @Override - protected void applyRecoverySnapshot(byte[] bytes) { - delegate.applyRecoverySnapshot(bytes); + public void applyRecoverySnapshot(byte[] bytes) { + cohortDelegate.applyRecoverySnapshot(bytes); try { Object data = toObject(bytes); if (data instanceof List) { @@ -241,16 +250,16 @@ public class RaftActorTest extends AbstractActorTest { @Override protected void createSnapshot() { LOG.info("{}: createSnapshot called", persistenceId()); - delegate.createSnapshot(); + actorDelegate.createSnapshot(); } @Override protected void applySnapshot(byte [] snapshot) { LOG.info("{}: applySnapshot called", persistenceId()); - delegate.applySnapshot(snapshot); + actorDelegate.applySnapshot(snapshot); } @Override protected void onStateChanged() { - delegate.onStateChanged(); + actorDelegate.onStateChanged(); } @Override @@ -516,7 +525,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); + verify(mockRaftActor.cohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -583,7 +592,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class)); + verify(mockRaftActor.cohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class)); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -810,7 +819,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex); - verify(mockRaftActor.delegate).createSnapshot(); + verify(mockRaftActor.actorDelegate).createSnapshot(); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -860,7 +869,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry)); - verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); + verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject()); } }; @@ -907,7 +916,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot)); - verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState())); + verify(mockRaftActor.actorDelegate).applySnapshot(eq(snapshot.getState())); assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog()); @@ -1131,7 +1140,7 @@ public class RaftActorTest extends AbstractActorTest { .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("x")), 4); - verify(leaderActor.delegate).createSnapshot(); + verify(leaderActor.actorDelegate).createSnapshot(); assertEquals(8, leaderActor.getReplicatedLog().size()); @@ -1230,7 +1239,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("D")), 4); - verify(followerActor.delegate).createSnapshot(); + verify(followerActor.actorDelegate).createSnapshot(); assertEquals(6, followerActor.getReplicatedLog().size()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 65b6ac4bd0..b2a8637694 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -63,11 +63,11 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; +import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -121,12 +121,6 @@ public class Shard extends RaftActor { private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply( Serialization.serializedActorPath(getSelf())); - - /** - * Coordinates persistence recovery on startup. - */ - private ShardRecoveryCoordinator recoveryCoordinator; - private final DOMTransactionFactory transactionFactory; private final String txnDispatcherPath; @@ -177,8 +171,6 @@ public class Shard extends RaftActor { appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); - - recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG); } private void setTransactionCommitTimeout() { @@ -654,30 +646,13 @@ public class Shard extends RaftActor { } @Override - protected - void startLogRecoveryBatch(final int maxBatchSize) { - recoveryCoordinator.startLogRecoveryBatch(maxBatchSize); - } - - @Override - protected void appendRecoveredLogEntry(final Payload data) { - recoveryCoordinator.appendRecoveredLogPayload(data); - } - - @Override - protected void applyRecoverySnapshot(final byte[] snapshotBytes) { - recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes); - } - - @Override - protected void applyCurrentLogRecoveryBatch() { - recoveryCoordinator.applyCurrentLogRecoveryBatch(); + @Nonnull + protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + return new ShardRecoveryCoordinator(store, persistenceId(), LOG); } @Override protected void onRecoveryComplete() { - recoveryCoordinator = null; - //notify shard manager getContext().parent().tell(new ActorInitialized(), getSelf()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index 7e547d7257..01a124b697 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -13,6 +13,7 @@ import java.util.List; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; @@ -32,7 +33,7 @@ import org.slf4j.Logger; * * @author Thomas Panetelis */ -class ShardRecoveryCoordinator { +class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { private final InMemoryDOMDataStore store; private List currentLogRecoveryBatch; @@ -45,13 +46,15 @@ class ShardRecoveryCoordinator { this.log = log; } - void startLogRecoveryBatch(int maxBatchSize) { + @Override + public void startLogRecoveryBatch(int maxBatchSize) { currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize); } - void appendRecoveredLogPayload(Payload payload) { + @Override + public void appendRecoveredLogEntry(Payload payload) { try { if(payload instanceof ModificationPayload) { currentLogRecoveryBatch.add((ModificationPayload) payload); @@ -83,7 +86,8 @@ class ShardRecoveryCoordinator { /** * Applies the current batched log entries to the data store. */ - void applyCurrentLogRecoveryBatch() { + @Override + public void applyCurrentLogRecoveryBatch() { log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size()); DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction(); @@ -105,7 +109,8 @@ class ShardRecoveryCoordinator { * * @param snapshotBytes the serialized snapshot */ - void applyRecoveredSnapshot(final byte[] snapshotBytes) { + @Override + public void applyRecoverySnapshot(final byte[] snapshotBytes) { log.debug("{}: Applyng recovered sbapshot", shardName); DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();