From 7df9909614131c0870267732277d7aa78501afdc Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Thu, 26 Mar 2015 14:07:48 -0400 Subject: [PATCH] Refactor snapshot message processing to RaftActorSnapshotMessageSupport Refactored the snapshot message handling to a new class RaftActorSnapshotMessageSupport. To handle the callbacks to the RaftActor, the RaftAciorRecoverySupport takes a RaftActorSnapshotCohort interface. The abstract on* methods in RaftActor are now defined in the RaftActorSnapshotCohort interface. The derived RaftActor class implements an abstract method to return a RaftActorSnapshotCohort instance. Shard returns an instance of a new class ShardSnapshotCohort. For createSnapshot, it needs to create a Shard transaction actor so I refactored out a ShardTransactionActorFactory from Shard which is also used for transaction create messages. The ShardTransaction constructor previously took a SchemaContext is no longer used. Rather than storing the SchemaContext in ShardTransactionActorFactory and keeoing it up to date, I removed SchemaContext from ShardTransaction which cascaded changes down to the derived actor classes and unit tests. Change-Id: I45c2e7cd31b07fec10585b8e5e0495b96842d37c Signed-off-by: Tom Pantelis --- .../cluster/example/ExampleActor.java | 14 ++- .../controller/cluster/raft/RaftActor.java | 109 +++------------- .../cluster/raft/RaftActorSnapshotCohort.java | 33 +++++ .../raft/RaftActorSnapshotMessageSupport.java | 118 ++++++++++++++++++ .../AbstractRaftActorIntegrationTest.java | 2 +- .../cluster/raft/RaftActorTest.java | 41 +++--- .../controller/cluster/datastore/Shard.java | 110 ++++------------ .../datastore/ShardReadTransaction.java | 6 +- .../datastore/ShardReadWriteTransaction.java | 6 +- .../datastore/ShardSnapshotCohort.java | 94 ++++++++++++++ .../cluster/datastore/ShardTransaction.java | 27 ++-- .../datastore/ShardTransactionChain.java | 26 ++-- .../datastore/ShardTransactionFactory.java | 50 ++++++++ .../datastore/ShardWriteTransaction.java | 6 +- .../cluster/datastore/ShardTest.java | 14 ++- .../ShardTransactionFailureTest.java | 21 ++-- .../datastore/ShardTransactionTest.java | 5 +- 17 files changed, 415 insertions(+), 267 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index eca2949666..77eaac2cfe 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; +import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -36,7 +37,7 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa /** * A sample actor showing how the RaftActor is to be extended */ -public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort { +public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { private final Map state = new HashMap(); @@ -120,7 +121,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort { } } - @Override protected void createSnapshot() { + @Override + public void createSnapshot(ActorRef actorRef) { ByteString bs = null; try { bs = fromObject(state); @@ -130,7 +132,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort { getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); } - @Override protected void applySnapshot(byte [] snapshot) { + @Override + public void applySnapshot(byte [] snapshot) { state.clear(); try { state.putAll((HashMap) toObject(snapshot)); @@ -218,4 +221,9 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort { @Override public void applyRecoverySnapshot(byte[] snapshot) { } + + @Override + protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + return this; + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1f1521d797..41a807aa35 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -11,8 +11,6 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.japi.Procedure; -import akka.persistence.SaveSnapshotFailure; -import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; @@ -34,10 +32,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior; @@ -96,8 +91,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis - private static final String COMMIT_SNAPSHOT = "commit_snapshot"; - protected final Logger LOG = LoggerFactory.getLogger(getClass()); /** @@ -114,10 +107,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null); - private final Procedure createSnapshotProcedure = new CreateSnapshotProcedure(); - private RaftActorRecoverySupport raftRecovery; + private RaftActorSnapshotMessageSupport snapshotSupport; + private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder(); public RaftActor(String id, Map peerAddresses) { @@ -145,7 +138,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { @Override public void postStop() { - if(currentBehavior != null) { + if(currentBehavior.getDelegate() != null) { try { currentBehavior.close(); } catch (Exception e) { @@ -192,7 +185,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior()); } - @Override public void handleCommand(Object message) { + @Override + public void handleCommand(Object message) { + if(snapshotSupport == null) { + snapshotSupport = new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context, + currentBehavior, getRaftActorSnapshotCohort(), self()); + } + + boolean handled = snapshotSupport.handleSnapshotMessage(message); + if(handled) { + return; + } + if (message instanceof ApplyState){ ApplyState applyState = (ApplyState) message; @@ -219,56 +223,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { persistence().persist(applyEntries, NoopProcedure.instance()); - } else if(message instanceof ApplySnapshot ) { - Snapshot snapshot = ((ApplySnapshot) message).getSnapshot(); - - if(LOG.isDebugEnabled()) { - LOG.debug("{}: ApplySnapshot called on Follower Actor " + - "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(), - snapshot.getLastAppliedTerm() - ); - } - - applySnapshot(snapshot.getState()); - - //clears the followers log, sets the snapshot index to ensure adjusted-index works - context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider, - currentBehavior)); - context.setLastApplied(snapshot.getLastAppliedIndex()); - } else if (message instanceof FindLeader) { getSender().tell( new FindLeaderReply(getLeaderAddress()), getSelf() ); - - } else if (message instanceof SaveSnapshotSuccess) { - SaveSnapshotSuccess success = (SaveSnapshotSuccess) message; - LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId()); - - long sequenceNumber = success.metadata().sequenceNr(); - - commitSnapshot(sequenceNumber); - - } else if (message instanceof SaveSnapshotFailure) { - SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message; - - LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:", - persistenceId(), saveSnapshotFailure.cause()); - - context.getSnapshotManager().rollback(); - - } else if (message instanceof CaptureSnapshot) { - LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message); - - context.getSnapshotManager().create(createSnapshotProcedure); - - } else if (message instanceof CaptureSnapshotReply) { - handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); } else if(message instanceof GetOnDemandRaftState) { onGetOnDemandRaftStats(); - } else if (message.equals(COMMIT_SNAPSHOT)) { - commitSnapshot(-1); } else { reusableBehaviorStateHolder.init(getCurrentBehavior()); @@ -504,7 +465,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { // Make saving Snapshot successful // Committing the snapshot here would end up calling commit in the creating state which would // be a state violation. That's why now we send a message to commit the snapshot. - self().tell(COMMIT_SNAPSHOT, self()); + self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self()); } }); } @@ -528,10 +489,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.setPeerAddress(peerId, peerAddress); } - protected void commitSnapshot(long sequenceNumber) { - context.getSnapshotManager().commit(persistence(), sequenceNumber); - } - /** * The applyState method will be called by the RaftActor when some data * needs to be applied to the actor's state @@ -564,24 +521,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract void onRecoveryComplete(); /** - * This method will be called by the RaftActor when a snapshot needs to be - * created. The derived actor should respond with its current state. - *

- * During recovery the state that is returned by the derived actor will - * be passed back to it by calling the applySnapshot method - * - * @return The current state of the actor + * Returns the RaftActorSnapshotCohort to participate in persistence recovery. */ - protected abstract void createSnapshot(); - - /** - * This method can be called at any other point during normal - * operations when the derived actor is out of sync with it's peers - * and the only way to bring it in sync is by applying a snapshot - * - * @param snapshotBytes A snapshot of the state of the actor - */ - protected abstract void applySnapshot(byte[] snapshotBytes); + @Nonnull + protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort(); /** * This method will be called by the RaftActor when the state of the @@ -615,12 +558,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { return peerAddress; } - private void handleCaptureSnapshotReply(byte[] snapshotBytes) { - LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length); - - context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory()); - } - protected boolean hasFollowers(){ return getRaftActorContext().hasFollowers(); } @@ -657,14 +594,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private class CreateSnapshotProcedure implements Procedure { - - @Override - public void apply(Void aVoid) throws Exception { - createSnapshot(); - } - } - private static class BehaviorStateHolder { private RaftActorBehavior behavior; private String leaderId; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java new file mode 100644 index 0000000000..ad68726371 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; + +/** + * Interface for a class that participates in raft actor snapshotting. + * + * @author Thomas Pantelis + */ +public interface RaftActorSnapshotCohort { + + /** + * This method is called by the RaftActor when a snapshot needs to be + * created. The implementation should send a CaptureSnapshotReply to the given actor. + * + * @param actorRef the actor to which to respond + */ + void createSnapshot(ActorRef actorRef); + + /** + * This method is called to apply a snapshot installed by the leader. + * + * @param snapshotBytes a snapshot of the state of the actor + */ + void applySnapshot(byte[] snapshotBytes); +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java new file mode 100644 index 0000000000..21c8ffa68e --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft; + +import akka.actor.ActorRef; +import akka.japi.Procedure; +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; +import org.slf4j.Logger; + +/** + * Handles snapshot related messages for a RaftActor. + * + * @author Thomas Pantelis + */ +class RaftActorSnapshotMessageSupport { + static final String COMMIT_SNAPSHOT = "commit_snapshot"; + + private final DataPersistenceProvider persistence; + private final RaftActorContext context; + private final RaftActorBehavior currentBehavior; + private final RaftActorSnapshotCohort cohort; + private final ActorRef raftActorRef; + private final Logger log; + + private final Procedure createSnapshotProcedure = new Procedure() { + @Override + public void apply(Void notUsed) throws Exception { + cohort.createSnapshot(raftActorRef); + } + }; + + RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context, + RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort, ActorRef raftActorRef) { + this.persistence = persistence; + this.context = context; + this.currentBehavior = currentBehavior; + this.cohort = cohort; + this.raftActorRef = raftActorRef; + this.log = context.getLogger(); + } + + boolean handleSnapshotMessage(Object message) { + if(message instanceof ApplySnapshot ) { + onApplySnapshot(((ApplySnapshot) message).getSnapshot()); + return true; + } else if (message instanceof SaveSnapshotSuccess) { + onSaveSnapshotSuccess((SaveSnapshotSuccess) message); + return true; + } else if (message instanceof SaveSnapshotFailure) { + onSaveSnapshotFailure((SaveSnapshotFailure) message); + return true; + } else if (message instanceof CaptureSnapshot) { + onCaptureSnapshot(message); + return true; + } else if (message instanceof CaptureSnapshotReply) { + onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); + return true; + } else if (message.equals(COMMIT_SNAPSHOT)) { + context.getSnapshotManager().commit(persistence, -1); + return true; + } else { + return false; + } + } + + private void onCaptureSnapshotReply(byte[] snapshotBytes) { + log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length); + + context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory()); + } + + private void onCaptureSnapshot(Object message) { + log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message); + + context.getSnapshotManager().create(createSnapshotProcedure); + } + + private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) { + log.error("{}: SaveSnapshotFailure received for snapshot Cause:", + context.getId(), saveSnapshotFailure.cause()); + + context.getSnapshotManager().rollback(); + } + + private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) { + log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId()); + + long sequenceNumber = success.metadata().sequenceNr(); + + context.getSnapshotManager().commit(persistence, sequenceNumber); + } + + private void onApplySnapshot(Snapshot snapshot) { + if(log.isDebugEnabled()) { + log.debug("{}: ApplySnapshot called on Follower Actor " + + "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(), + snapshot.getLastAppliedTerm()); + } + + cohort.applySnapshot(snapshot.getState()); + + //clears the followers log, sets the snapshot index to ensure adjusted-index works + context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, + currentBehavior)); + context.setLastApplied(snapshot.getLastAppliedIndex()); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java index 45fd26c930..3c6c8281fb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java @@ -112,7 +112,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest } @Override - protected void createSnapshot() { + public void createSnapshot(ActorRef actorRef) { if(snapshot != null) { getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender()); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index f71cb984b3..9b8ca5c5ca 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -98,10 +98,11 @@ public class RaftActorTest extends AbstractActorTest { InMemorySnapshotStore.clear(); } - public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort { + public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { private final RaftActor actorDelegate; - private final RaftActorRecoveryCohort cohortDelegate; + private final RaftActorRecoveryCohort recoveryCohortDelegate; + private final RaftActorSnapshotCohort snapshotCohortDelegate; private final CountDownLatch recoveryComplete = new CountDownLatch(1); private final List state; private ActorRef roleChangeNotifier; @@ -139,7 +140,8 @@ public class RaftActorTest extends AbstractActorTest { super(id, peerAddresses, config); state = new ArrayList<>(); this.actorDelegate = mock(RaftActor.class); - this.cohortDelegate = mock(RaftActorRecoveryCohort.class); + this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class); + this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class); if(dataPersistenceProvider == null){ setPersistence(true); } else { @@ -210,6 +212,11 @@ public class RaftActorTest extends AbstractActorTest { return this; } + @Override + protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + return this; + } + @Override public void startLogRecoveryBatch(int maxBatchSize) { } @@ -237,7 +244,7 @@ public class RaftActorTest extends AbstractActorTest { @Override public void applyRecoverySnapshot(byte[] bytes) { - cohortDelegate.applyRecoverySnapshot(bytes); + recoveryCohortDelegate.applyRecoverySnapshot(bytes); try { Object data = toObject(bytes); if (data instanceof List) { @@ -248,17 +255,20 @@ public class RaftActorTest extends AbstractActorTest { } } - @Override protected void createSnapshot() { + @Override + public void createSnapshot(ActorRef actorRef) { LOG.info("{}: createSnapshot called", persistenceId()); - actorDelegate.createSnapshot(); + snapshotCohortDelegate.createSnapshot(actorRef); } - @Override protected void applySnapshot(byte [] snapshot) { + @Override + public void applySnapshot(byte [] snapshot) { LOG.info("{}: applySnapshot called", persistenceId()); - actorDelegate.applySnapshot(snapshot); + snapshotCohortDelegate.applySnapshot(snapshot); } - @Override protected void onStateChanged() { + @Override + protected void onStateChanged() { actorDelegate.onStateChanged(); } @@ -293,7 +303,6 @@ public class RaftActorTest extends AbstractActorTest { public ReplicatedLog getReplicatedLog(){ return this.getRaftActorContext().getReplicatedLog(); } - } @@ -525,7 +534,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.cohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); + verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray())); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -592,7 +601,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); - verify(mockRaftActor.cohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class)); + verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class)); mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A"))); @@ -819,7 +828,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex); - verify(mockRaftActor.actorDelegate).createSnapshot(); + verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray())); @@ -916,7 +925,7 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot)); - verify(mockRaftActor.actorDelegate).applySnapshot(eq(snapshot.getState())); + verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState())); assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog()); @@ -1140,7 +1149,7 @@ public class RaftActorTest extends AbstractActorTest { .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6, new MockRaftActorContext.MockPayload("x")), 4); - verify(leaderActor.actorDelegate).createSnapshot(); + verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); assertEquals(8, leaderActor.getReplicatedLog().size()); @@ -1239,7 +1248,7 @@ public class RaftActorTest extends AbstractActorTest { new MockRaftActorContext.MockReplicatedLogEntry(1, 5, new MockRaftActorContext.MockPayload("D")), 4); - verify(followerActor.actorDelegate).createSnapshot(); + verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class)); assertEquals(6, followerActor.getReplicatedLog().size()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 17f1abb92c..cc2905c98a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -45,7 +45,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; @@ -59,22 +58,18 @@ import org.opendaylight.controller.cluster.datastore.modification.ModificationPa import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.MessageTracker; -import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; +import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -87,8 +82,6 @@ import scala.concurrent.duration.FiniteDuration; */ public class Shard extends RaftActor { - private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); - private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; @VisibleForTesting @@ -104,10 +97,6 @@ public class Shard extends RaftActor { private DatastoreContext datastoreContext; - private SchemaContext schemaContext; - - private int createSnapshotTransactionCounter; - private final ShardCommitCoordinator commitCoordinator; private long transactionCommitTimeout; @@ -121,9 +110,11 @@ public class Shard extends RaftActor { private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply( Serialization.serializedActorPath(getSelf())); - private final DOMTransactionFactory transactionFactory; + private final DOMTransactionFactory domTransactionFactory; + + private final ShardTransactionActorFactory transactionActorFactory; - private final String txnDispatcherPath; + private final ShardSnapshotCohort snapshotCohort; private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this); private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this); @@ -134,9 +125,6 @@ public class Shard extends RaftActor { this.name = name.toString(); this.datastoreContext = datastoreContext; - this.schemaContext = schemaContext; - this.txnDispatcherPath = new Dispatchers(context().system().dispatchers()) - .getDispatcherPath(Dispatchers.DispatcherType.Transaction); setPersistence(datastoreContext.isPersistent()); @@ -158,9 +146,9 @@ public class Shard extends RaftActor { getContext().become(new MeteringBehavior(this)); } - transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name); + domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name); - commitCoordinator = new ShardCommitCoordinator(transactionFactory, + commitCoordinator = new ShardCommitCoordinator(domTransactionFactory, TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES), datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name); @@ -171,6 +159,12 @@ public class Shard extends RaftActor { appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class, getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis()); + + transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext, + new Dispatchers(context().system().dispatchers()).getDispatcherPath( + Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean); + + snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name); } private void setTransactionCommitTimeout() { @@ -550,29 +544,15 @@ public class Shard extends RaftActor { } private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) { - transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId()); + domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId()); } private ActorRef createTypedTransactionActor(int transactionType, ShardTransactionIdentifier transactionId, String transactionChainId, short clientVersion ) { - DOMStoreTransaction transaction = transactionFactory.newTransaction( - TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(), - transactionChainId); - - return createShardTransaction(transaction, transactionId, clientVersion); - } - - private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId, - short clientVersion){ - return getContext().actorOf( - ShardTransaction.props(transaction, getSelf(), - schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId(), clientVersion) - .withDispatcher(txnDispatcherPath), - transactionId.toString()); - + return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType), + transactionId, transactionChainId, clientVersion); } private void createTransaction(CreateTransaction createTransaction) { @@ -604,18 +584,11 @@ public class Shard extends RaftActor { return transactionActor; } - private void syncCommitTransaction(final DOMStoreWriteTransaction transaction) - throws ExecutionException, InterruptedException { - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); - commitCohort.preCommit().get(); - commitCohort.commit().get(); - } - private void commitWithNewTransaction(final Modification modification) { DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction(); modification.apply(tx); try { - syncCommitTransaction(tx); + snapshotCohort.syncCommitTransaction(tx); shardMBean.incrementCommittedTransactionCount(); shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); } catch (InterruptedException | ExecutionException e) { @@ -625,9 +598,7 @@ public class Shard extends RaftActor { } private void updateSchemaContext(final UpdateSchemaContext message) { - this.schemaContext = message.getSchemaContext(); updateSchemaContext(message.getSchemaContext()); - store.onGlobalContextUpdated(message.getSchemaContext()); } @VisibleForTesting @@ -640,6 +611,11 @@ public class Shard extends RaftActor { return config.isMetricCaptureEnabled(); } + @Override + protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + return snapshotCohort; + } + @Override @Nonnull protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { @@ -702,46 +678,6 @@ public class Shard extends RaftActor { } } - @Override - protected void createSnapshot() { - // Create a transaction actor. We are really going to treat the transaction as a worker - // so that this actor does not get block building the snapshot. THe transaction actor will - // after processing the CreateSnapshot message. - - ActorRef createSnapshotTransaction = createTransaction( - TransactionProxy.TransactionType.READ_ONLY.ordinal(), - "createSnapshot" + ++createSnapshotTransactionCounter, "", - DataStoreVersions.CURRENT_VERSION); - - createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self()); - } - - @VisibleForTesting - @Override - protected void applySnapshot(final byte[] snapshotBytes) { - // Since this will be done only on Recovery or when this actor is a Follower - // we can safely commit everything in here. We not need to worry about event notifications - // as they would have already been disabled on the follower - - LOG.info("{}: Applying snapshot", persistenceId()); - try { - DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); - - NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); - - // delete everything first - transaction.delete(DATASTORE_ROOT); - - // Add everything from the remote node back - transaction.write(DATASTORE_ROOT, node); - syncCommitTransaction(transaction); - } catch (InterruptedException | ExecutionException e) { - LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e); - } finally { - LOG.info("{}: Done applying snapshot", persistenceId()); - } - } - @Override protected void onStateChanged() { boolean isLeader = isLeader(); @@ -756,7 +692,7 @@ public class Shard extends RaftActor { persistenceId(), getId()); } - transactionFactory.closeAllTransactionChains(); + domTransactionFactory.closeAllTransactionChains(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java index 2e66ef918e..41ca486eb6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java @@ -26,7 +26,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * @author: syedbahm @@ -38,9 +37,8 @@ public class ShardReadTransaction extends ShardTransaction { private final DOMStoreReadTransaction transaction; public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, ShardStats shardStats, String transactionID, - short clientTxVersion) { - super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion); + ShardStats shardStats, String transactionID, short clientTxVersion) { + super(shardActor, shardStats, transactionID, clientTxVersion); this.transaction = transaction; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java index b394da88e8..2042e95577 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java @@ -15,7 +15,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * @author: syedbahm @@ -25,9 +24,8 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction { private final DOMStoreReadWriteTransaction transaction; public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, ShardStats shardStats, String transactionID, - short clientTxVersion) { - super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion); + ShardStats shardStats, String transactionID, short clientTxVersion) { + super(transaction, shardActor, shardStats, transactionID, clientTxVersion); this.transaction = transaction; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java new file mode 100644 index 0000000000..c59085d61c --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import java.util.concurrent.ExecutionException; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; + +/** + * Participates in raft snapshotting on behalf of a Shard actor. + * + * @author Thomas Pantelis + */ +class ShardSnapshotCohort implements RaftActorSnapshotCohort { + + private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build(); + + private int createSnapshotTransactionCounter; + private final ShardTransactionActorFactory transactionActorFactory; + private final InMemoryDOMDataStore store; + private final Logger log; + private final String logId; + + ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store, + Logger log, String logId) { + this.transactionActorFactory = transactionActorFactory; + this.store = store; + this.log = log; + this.logId = logId; + } + + @Override + public void createSnapshot(ActorRef actorRef) { + // Create a transaction actor. We are really going to treat the transaction as a worker + // so that this actor does not get block building the snapshot. THe transaction actor will + // after processing the CreateSnapshot message. + + ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier( + "createSnapshot" + ++createSnapshotTransactionCounter); + + ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction( + TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION); + + createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef); + } + + @Override + public void applySnapshot(byte[] snapshotBytes) { + // Since this will be done only on Recovery or when this actor is a Follower + // we can safely commit everything in here. We not need to worry about event notifications + // as they would have already been disabled on the follower + + log.info("{}: Applying snapshot", logId); + + try { + DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); + + NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + + // delete everything first + transaction.delete(DATASTORE_ROOT); + + // Add everything from the remote node back + transaction.write(DATASTORE_ROOT, node); + syncCommitTransaction(transaction); + } catch (InterruptedException | ExecutionException e) { + log.error("{}: An exception occurred when applying snapshot", logId, e); + } finally { + log.info("{}: Done applying snapshot", logId); + } + + } + + void syncCommitTransaction(final DOMStoreWriteTransaction transaction) + throws ExecutionException, InterruptedException { + DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); + commitCohort.preCommit().get(); + commitCohort.commit().get(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 613b3749e0..066f01b092 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -31,7 +31,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * The ShardTransaction Actor represents a remote transaction @@ -54,25 +53,22 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering protected static final boolean SERIALIZED_REPLY = true; private final ActorRef shardActor; - private final SchemaContext schemaContext; private final ShardStats shardStats; private final String transactionID; private final short clientTxVersion; - protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext, - ShardStats shardStats, String transactionID, short clientTxVersion) { + protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID, + short clientTxVersion) { super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name this.shardActor = shardActor; - this.schemaContext = schemaContext; this.shardStats = shardStats; this.transactionID = transactionID; this.clientTxVersion = clientTxVersion; } public static Props props(DOMStoreTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats, - String transactionID, short txnClientVersion) { - return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext, + DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) { + return Props.create(new ShardTransactionCreator(transaction, shardActor, datastoreContext, shardStats, transactionID, txnClientVersion)); } @@ -86,10 +82,6 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering return transactionID; } - protected SchemaContext getSchemaContext() { - return schemaContext; - } - protected short getClientTxVersion() { return clientTxVersion; } @@ -161,19 +153,16 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering final DOMStoreTransaction transaction; final ActorRef shardActor; - final SchemaContext schemaContext; final DatastoreContext datastoreContext; final ShardStats shardStats; final String transactionID; final short txnClientVersion; ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, DatastoreContext datastoreContext, - ShardStats shardStats, String transactionID, short txnClientVersion) { + DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) { this.transaction = transaction; this.shardActor = shardActor; this.shardStats = shardStats; - this.schemaContext = schemaContext; this.datastoreContext = datastoreContext; this.transactionID = transactionID; this.txnClientVersion = txnClientVersion; @@ -184,13 +173,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering ShardTransaction tx; if(transaction instanceof DOMStoreReadWriteTransaction) { tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction, - shardActor, schemaContext, shardStats, transactionID, txnClientVersion); + shardActor, shardStats, transactionID, txnClientVersion); } else if(transaction instanceof DOMStoreReadTransaction) { tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor, - schemaContext, shardStats, transactionID, txnClientVersion); + shardStats, transactionID, txnClientVersion); } else { tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction, - shardActor, schemaContext, shardStats, transactionID, txnClientVersion); + shardActor, shardStats, transactionID, txnClientVersion); } tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java index 8ba613958a..a4c97e8ab9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java @@ -27,14 +27,12 @@ public class ShardTransactionChain extends AbstractUntypedActor { private final DOMStoreTransactionChain chain; private final DatastoreContext datastoreContext; - private final SchemaContext schemaContext; private final ShardStats shardStats; - public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext, - DatastoreContext datastoreContext, ShardStats shardStats) { + public ShardTransactionChain(DOMStoreTransactionChain chain, DatastoreContext datastoreContext, + ShardStats shardStats) { this.chain = chain; this.datastoreContext = datastoreContext; - this.schemaContext = schemaContext; this.shardStats = shardStats; } @@ -61,22 +59,19 @@ public class ShardTransactionChain extends AbstractUntypedActor { TransactionProxy.TransactionType.READ_ONLY.ordinal()) { return getContext().actorOf( ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(), - schemaContext, datastoreContext, shardStats, - createTransaction.getTransactionId(), + datastoreContext, shardStats, createTransaction.getTransactionId(), createTransaction.getVersion()), transactionName); } else if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { return getContext().actorOf( ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(), - schemaContext, datastoreContext, shardStats, - createTransaction.getTransactionId(), + datastoreContext, shardStats, createTransaction.getTransactionId(), createTransaction.getVersion()), transactionName); } else if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { return getContext().actorOf( ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(), - schemaContext, datastoreContext, shardStats, - createTransaction.getTransactionId(), + datastoreContext, shardStats, createTransaction.getTransactionId(), createTransaction.getVersion()), transactionName); } else { throw new IllegalArgumentException ( @@ -94,8 +89,7 @@ public class ShardTransactionChain extends AbstractUntypedActor { public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext, DatastoreContext datastoreContext, ShardStats shardStats) { - return Props.create(new ShardTransactionChainCreator(chain, schemaContext, - datastoreContext, shardStats)); + return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats)); } private static class ShardTransactionChainCreator implements Creator { @@ -103,21 +97,19 @@ public class ShardTransactionChain extends AbstractUntypedActor { final DOMStoreTransactionChain chain; final DatastoreContext datastoreContext; - final SchemaContext schemaContext; final ShardStats shardStats; - ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext, - DatastoreContext datastoreContext, ShardStats shardStats) { + ShardTransactionChainCreator(DOMStoreTransactionChain chain, DatastoreContext datastoreContext, + ShardStats shardStats) { this.chain = chain; this.datastoreContext = datastoreContext; - this.schemaContext = schemaContext; this.shardStats = shardStats; } @Override public ShardTransactionChain create() throws Exception { - return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats); + return new ShardTransactionChain(chain, datastoreContext, shardStats); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java new file mode 100644 index 0000000000..9637646fc5 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; + +/** + * A factory for creating ShardTransaction actors. + * + * @author Thomas Pantelis + */ +class ShardTransactionActorFactory { + + private final DOMTransactionFactory domTransactionFactory; + private final DatastoreContext datastoreContext; + private final String txnDispatcherPath; + private final ShardStats shardMBean; + private final UntypedActorContext actorContext; + private final ActorRef shardActor; + + ShardTransactionActorFactory(DOMTransactionFactory domTransactionFactory, DatastoreContext datastoreContext, + String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean) { + this.domTransactionFactory = domTransactionFactory; + this.datastoreContext = datastoreContext; + this.txnDispatcherPath = txnDispatcherPath; + this.shardMBean = shardMBean; + this.actorContext = actorContext; + this.shardActor = shardActor; + } + + ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID, + String transactionChainID, short clientVersion) { + + DOMStoreTransaction transaction = domTransactionFactory.newTransaction(type, transactionID.toString(), + transactionChainID); + + return actorContext.actorOf(ShardTransaction.props(transaction, shardActor, datastoreContext, shardMBean, + transactionID.getRemoteTransactionId(), clientVersion).withDispatcher(txnDispatcherPath), + transactionID.toString()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index d5dcfde803..87da2b0442 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -32,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * @author: syedbahm @@ -44,9 +43,8 @@ public class ShardWriteTransaction extends ShardTransaction { private final DOMStoreWriteTransaction transaction; public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor, - SchemaContext schemaContext, ShardStats shardStats, String transactionID, - short clientTxVersion) { - super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion); + ShardStats shardStats, String transactionID, short clientTxVersion) { + super(shardActor, shardStats, transactionID, clientTxVersion); this.transaction = transaction; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index e04c1a5d18..103d18bd1c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -17,6 +17,7 @@ import akka.dispatch.Dispatchers; import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.pattern.Patterns; +import akka.persistence.SaveSnapshotSuccess; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Function; @@ -1389,6 +1390,8 @@ public class ShardTest extends AbstractShardTest { @SuppressWarnings("serial") public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{ + final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); + final AtomicReference savedSnapshot = new AtomicReference<>(); class TestPersistentDataProvider extends DelegatingPersistentDataProvider { TestPersistentDataProvider(DataPersistenceProvider delegate) { @@ -1405,8 +1408,6 @@ public class ShardTest extends AbstractShardTest { dataStoreContextBuilder.persistent(persistent); new ShardTestKit(getSystem()) {{ - final AtomicReference latch = new AtomicReference<>(new CountDownLatch(1)); - class TestShard extends Shard { protected TestShard(ShardIdentifier name, Map peerAddresses, @@ -1416,9 +1417,12 @@ public class ShardTest extends AbstractShardTest { } @Override - protected void commitSnapshot(final long sequenceNumber) { - super.commitSnapshot(sequenceNumber); - latch.get().countDown(); + public void handleCommand(Object message) { + super.handleCommand(message); + + if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) { + latch.get().countDown(); + } } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index c3fef611e3..21fb55fcf1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -70,8 +70,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -100,8 +99,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -130,8 +128,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -160,8 +157,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -193,8 +189,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -231,8 +226,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, "testNegativeMergeTransactionReady"); @@ -264,8 +258,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index e63ace3e2c..41ea7aac7a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -97,7 +97,7 @@ public class ShardTransactionTest extends AbstractActorTest { private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name, short version) { Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(), - testSchemaContext, datastoreContext, shardStats, "txn", version); + datastoreContext, shardStats, "txn", version); return getSystem().actorOf(props, name); } @@ -517,8 +517,7 @@ public class ShardTransactionTest extends AbstractActorTest { public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn", - DataStoreVersions.CURRENT_VERSION); + datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION); final TestActorRef transaction = TestActorRef.apply(props,getSystem()); transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION). -- 2.36.6