X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=157a53ed2d1797b662f6fadccd3acf4027ae442f;hp=1f1521d797d45790dc35c747ca9f8af2d4c6dd8d;hb=f9a9cd1ea40d2477ccb16b03c71a87595226595a;hpb=5de57714fa057ac80f930c2fcce2758ca0a5f514 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1f1521d797..157a53ed2d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -1,5 +1,6 @@ /* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -11,8 +12,6 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.japi.Procedure; -import akka.persistence.SaveSnapshotFailure; -import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; @@ -34,10 +33,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior; @@ -96,8 +92,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis - private static final String COMMIT_SNAPSHOT = "commit_snapshot"; - protected final Logger LOG = LoggerFactory.getLogger(getClass()); /** @@ -114,10 +108,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null); - private final Procedure createSnapshotProcedure = new CreateSnapshotProcedure(); - private RaftActorRecoverySupport raftRecovery; + private RaftActorSnapshotMessageSupport snapshotSupport; + private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); public RaftActor(String id, Map peerAddresses) { @@ -130,7 +124,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context = new RaftActorContextImpl(this.getSelf(), this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG), -1, -1, peerAddresses, - (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), LOG); + (configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()), + delegatingPersistenceProvider, LOG); context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior)); } @@ -145,7 +140,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void postStop() { - if(currentBehavior != null) { + if(currentBehavior.getDelegate() != null) { try { currentBehavior.close(); } catch (Exception e) { @@ -159,8 +154,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void handleRecover(Object message) { if(raftRecovery == null) { - raftRecovery = new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior, - getRaftActorRecoveryCohort()); + raftRecovery = newRaftActorRecoverySupport(); } boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message); @@ -182,6 +176,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } + protected RaftActorRecoverySupport newRaftActorRecoverySupport() { + return new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior, + getRaftActorRecoveryCohort()); + } + protected void initializeBehavior(){ changeCurrentBehavior(new Follower(context)); } @@ -192,7 +191,17 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); } - @Override public void handleCommand(Object message) { + @Override + public void handleCommand(Object message) { + if(snapshotSupport == null) { + snapshotSupport = newRaftActorSnapshotMessageSupport(); + } + + boolean handled = snapshotSupport.handleSnapshotMessage(message); + if(handled) { + return; + } + if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; @@ -219,56 +228,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { persistence().persist(applyEntries, NoopProcedure.instance()); - } else if(message instanceof ApplySnapshot ) { - Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: ApplySnapshot called on Follower Actor " + - "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(), - snapshot.getLastAppliedTerm() - ); - } - - applySnapshot(snapshot.getState()); - - //clears the followers log, sets the snapshot index to ensure adjusted-index works - context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider, - currentBehavior)); - context.setLastApplied(snapshot.getLastAppliedIndex()); - } else if (message instanceof FindLeader) { getSender().tell( new FindLeaderReply(getLeaderAddress()), getSelf() ); - - } else if (message instanceof SaveSnapshotSuccess) { - SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; - LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId()); - - long sequenceNumber = success.metadata().sequenceNr(); - - commitSnapshot(sequenceNumber); - - } else if (message instanceof SaveSnapshotFailure) { - SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; - - LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:", - persistenceId(), saveSnapshotFailure.cause()); - - context.getSnapshotManager().rollback(); - - } else if (message instanceof CaptureSnapshot) { - LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message); - - context.getSnapshotManager().create(createSnapshotProcedure); - - } else if (message instanceof CaptureSnapshotReply) { - handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); } else if(message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); - } else if (message.equals(COMMIT_SNAPSHOT)) { - commitSnapshot(-1); } else { reusableBehaviorStateHolder.init(getCurrentBehavior()); @@ -278,6 +244,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } + protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() { + return new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context, + currentBehavior, getRaftActorSnapshotCohort()); + } + private void onGetOnDemandRaftStats() { // Debugging message to retrieve raft stats. @@ -504,7 +475,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // Make saving Snapshot successful // Committing the snapshot here would end up calling commit in the creating state which would // be a state violation. That's why now we send a message to commit the snapshot. - self().tell(COMMIT_SNAPSHOT, self()); + self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self()); } }); } @@ -528,10 +499,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.setPeerAddress(peerId, peerAddress); } - protected void commitSnapshot(long sequenceNumber) { - context.getSnapshotManager().commit(persistence(), sequenceNumber); - } - /** * The applyState method will be called by the RaftActor when some data * needs to be applied to the actor's state @@ -564,24 +531,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract void onRecoveryComplete(); /** - * This method will be called by the RaftActor when a snapshot needs to be - * created. The derived actor should respond with its current state. - *

- * During recovery the state that is returned by the derived actor will - * be passed back to it by calling the applySnapshot method - * - * @return The current state of the actor - */ - protected abstract void createSnapshot(); - - /** - * This method can be called at any other point during normal - * operations when the derived actor is out of sync with it's peers - * and the only way to bring it in sync is by applying a snapshot - * - * @param snapshotBytes A snapshot of the state of the actor + * Returns the RaftActorSnapshotCohort to participate in persistence recovery. */ - protected abstract void applySnapshot(byte[] snapshotBytes); + @Nonnull + protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort(); /** * This method will be called by the RaftActor when the state of the @@ -615,16 +568,16 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return peerAddress; } - private void handleCaptureSnapshotReply(byte[] snapshotBytes) { - LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length); - - context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory()); - } - protected boolean hasFollowers(){ return getRaftActorContext().hasFollowers(); } + /** + * @deprecated Deprecated in favor of {@link org.opendaylight.controller.cluster.raft.base.messages.DeleteEntriesTest} + * whose type for fromIndex is long instead of int. This class was kept for backwards + * compatibility with Helium. + */ + @Deprecated static class DeleteEntries implements Serializable { private static final long serialVersionUID = 1L; private final int fromIndex; @@ -657,14 +610,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private class CreateSnapshotProcedure implements Procedure { - - @Override - public void apply(Void aVoid) throws Exception { - createSnapshot(); - } - } - private static class BehaviorStateHolder { private RaftActorBehavior behavior; private String leaderId;