Refactor snapshot message processing to RaftActorSnapshotMessageSupport 66/17266/4
authorTom Pantelis <tpanteli@brocade.com>
Thu, 26 Mar 2015 18:07:48 +0000 (14:07 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 8 Apr 2015 05:12:54 +0000 (01:12 -0400)
Refactored the snapshot message handling to a new class
RaftActorSnapshotMessageSupport.  To handle the callbacks to the
RaftActor, the RaftAciorRecoverySupport takes a RaftActorSnapshotCohort interface.
The abstract on* methods in RaftActor are now defined in the
RaftActorSnapshotCohort interface. The derived RaftActor class
implements an abstract method to return a RaftActorSnapshotCohort
instance.

Shard returns an instance of a new class ShardSnapshotCohort. For
createSnapshot, it needs to create a Shard transaction actor so I
refactored out a ShardTransactionActorFactory from Shard which is
also used for transaction create messages.

The ShardTransaction constructor previously took a SchemaContext is no
longer used. Rather than storing the SchemaContext in
ShardTransactionActorFactory and keeoing it up to date, I removed
SchemaContext from ShardTransaction which cascaded changes down to the
derived actor classes and unit tests.

Change-Id: I45c2e7cd31b07fec10585b8e5e0495b96842d37c
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
17 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java

index eca29496662f724d8415c9a3196da2ade6fa7af1..77eaac2cfef076fd84959d0e9007fe6233413c9d 100644 (file)
@@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -36,7 +37,7 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
 /**
  * A sample actor showing how the RaftActor is to be extended
  */
-public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort {
+public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
 
     private final Map<String, String> state = new HashMap();
 
@@ -120,7 +121,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort {
         }
     }
 
-    @Override protected void createSnapshot() {
+    @Override
+    public void createSnapshot(ActorRef actorRef) {
         ByteString bs = null;
         try {
             bs = fromObject(state);
@@ -130,7 +132,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort {
         getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
     }
 
-    @Override protected void applySnapshot(byte [] snapshot) {
+    @Override
+    public void applySnapshot(byte [] snapshot) {
         state.clear();
         try {
             state.putAll((HashMap) toObject(snapshot));
@@ -218,4 +221,9 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort {
     @Override
     public void applyRecoverySnapshot(byte[] snapshot) {
     }
+
+    @Override
+    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+        return this;
+    }
 }
index 1f1521d797d45790dc35c747ca9f8af2d4c6dd8d..41a807aa355d394f659f488209f65e732646b120 100644 (file)
@@ -11,8 +11,6 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.japi.Procedure;
-import akka.persistence.SaveSnapshotFailure;
-import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -34,10 +32,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
@@ -96,8 +91,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
 
-    private static final String COMMIT_SNAPSHOT = "commit_snapshot";
-
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     /**
@@ -114,10 +107,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
 
-    private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
-
     private RaftActorRecoverySupport raftRecovery;
 
+    private RaftActorSnapshotMessageSupport snapshotSupport;
+
     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
@@ -145,7 +138,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void postStop() {
-        if(currentBehavior != null) {
+        if(currentBehavior.getDelegate() != null) {
             try {
                 currentBehavior.close();
             } catch (Exception e) {
@@ -192,7 +185,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
     }
 
-    @Override public void handleCommand(Object message) {
+    @Override
+    public void handleCommand(Object message) {
+        if(snapshotSupport == null) {
+            snapshotSupport = new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
+                    currentBehavior, getRaftActorSnapshotCohort(), self());
+        }
+
+        boolean handled = snapshotSupport.handleSnapshotMessage(message);
+        if(handled) {
+            return;
+        }
+
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
@@ -219,56 +223,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             persistence().persist(applyEntries, NoopProcedure.instance());
 
-        } else if(message instanceof ApplySnapshot ) {
-            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
-                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
-                    snapshot.getLastAppliedTerm()
-                );
-            }
-
-            applySnapshot(snapshot.getState());
-
-            //clears the followers log, sets the snapshot index to ensure adjusted-index works
-            context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
-                    currentBehavior));
-            context.setLastApplied(snapshot.getLastAppliedIndex());
-
         } else if (message instanceof FindLeader) {
             getSender().tell(
                 new FindLeaderReply(getLeaderAddress()),
                 getSelf()
             );
-
-        } else if (message instanceof SaveSnapshotSuccess) {
-            SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
-            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
-
-            long sequenceNumber = success.metadata().sequenceNr();
-
-            commitSnapshot(sequenceNumber);
-
-        } else if (message instanceof SaveSnapshotFailure) {
-            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
-
-            LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
-                    persistenceId(), saveSnapshotFailure.cause());
-
-            context.getSnapshotManager().rollback();
-
-        } else if (message instanceof CaptureSnapshot) {
-            LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
-
-            context.getSnapshotManager().create(createSnapshotProcedure);
-
-        } else if (message instanceof CaptureSnapshotReply) {
-            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
-        } else if (message.equals(COMMIT_SNAPSHOT)) {
-            commitSnapshot(-1);
         } else {
             reusableBehaviorStateHolder.init(getCurrentBehavior());
 
@@ -504,7 +465,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     // Make saving Snapshot successful
                     // Committing the snapshot here would end up calling commit in the creating state which would
                     // be a state violation. That's why now we send a message to commit the snapshot.
-                    self().tell(COMMIT_SNAPSHOT, self());
+                    self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
                 }
             });
         }
@@ -528,10 +489,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setPeerAddress(peerId, peerAddress);
     }
 
-    protected void commitSnapshot(long sequenceNumber) {
-        context.getSnapshotManager().commit(persistence(), sequenceNumber);
-    }
-
     /**
      * The applyState method will be called by the RaftActor when some data
      * needs to be applied to the actor's state
@@ -564,24 +521,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onRecoveryComplete();
 
     /**
-     * This method will be called by the RaftActor when a snapshot needs to be
-     * created. The derived actor should respond with its current state.
-     * <p/>
-     * During recovery the state that is returned by the derived actor will
-     * be passed back to it by calling the applySnapshot  method
-     *
-     * @return The current state of the actor
+     * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
      */
-    protected abstract void createSnapshot();
-
-    /**
-     * This method can be called at any other point during normal
-     * operations when the derived actor is out of sync with it's peers
-     * and the only way to bring it in sync is by applying a snapshot
-     *
-     * @param snapshotBytes A snapshot of the state of the actor
-     */
-    protected abstract void applySnapshot(byte[] snapshotBytes);
+    @Nonnull
+    protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -615,12 +558,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return peerAddress;
     }
 
-    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
-        LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
-        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
-    }
-
     protected boolean hasFollowers(){
         return getRaftActorContext().hasFollowers();
     }
@@ -657,14 +594,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private class CreateSnapshotProcedure implements Procedure<Void> {
-
-        @Override
-        public void apply(Void aVoid) throws Exception {
-            createSnapshot();
-        }
-    }
-
     private static class BehaviorStateHolder {
         private RaftActorBehavior behavior;
         private String leaderId;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java
new file mode 100644 (file)
index 0000000..ad68726
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+/**
+ * Interface for a class that participates in raft actor snapshotting.
+ *
+ * @author Thomas Pantelis
+ */
+public interface RaftActorSnapshotCohort {
+
+    /**
+     * This method is called by the RaftActor when a snapshot needs to be
+     * created. The implementation should send a CaptureSnapshotReply to the given actor.
+     *
+     * @param actorRef the actor to which to respond
+     */
+    void createSnapshot(ActorRef actorRef);
+
+    /**
+     * This method is called to apply a snapshot installed by the leader.
+     *
+     * @param snapshotBytes a snapshot of the state of the actor
+     */
+    void applySnapshot(byte[] snapshotBytes);
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
new file mode 100644 (file)
index 0000000..21c8ffa
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+/**
+ * Handles snapshot related messages for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorSnapshotMessageSupport {
+    static final String COMMIT_SNAPSHOT = "commit_snapshot";
+
+    private final DataPersistenceProvider persistence;
+    private final RaftActorContext context;
+    private final RaftActorBehavior currentBehavior;
+    private final RaftActorSnapshotCohort cohort;
+    private final ActorRef raftActorRef;
+    private final Logger log;
+
+    private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
+        @Override
+        public void apply(Void notUsed) throws Exception {
+            cohort.createSnapshot(raftActorRef);
+        }
+    };
+
+    RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
+            RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort, ActorRef raftActorRef) {
+        this.persistence = persistence;
+        this.context = context;
+        this.currentBehavior = currentBehavior;
+        this.cohort = cohort;
+        this.raftActorRef = raftActorRef;
+        this.log = context.getLogger();
+    }
+
+    boolean handleSnapshotMessage(Object message) {
+        if(message instanceof ApplySnapshot ) {
+            onApplySnapshot(((ApplySnapshot) message).getSnapshot());
+            return true;
+        } else if (message instanceof SaveSnapshotSuccess) {
+            onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+            return true;
+        } else if (message instanceof SaveSnapshotFailure) {
+            onSaveSnapshotFailure((SaveSnapshotFailure) message);
+            return true;
+        } else if (message instanceof CaptureSnapshot) {
+            onCaptureSnapshot(message);
+            return true;
+        } else if (message instanceof CaptureSnapshotReply) {
+            onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
+            return true;
+        } else if (message.equals(COMMIT_SNAPSHOT)) {
+            context.getSnapshotManager().commit(persistence, -1);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private void onCaptureSnapshotReply(byte[] snapshotBytes) {
+        log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
+
+        context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
+    }
+
+    private void onCaptureSnapshot(Object message) {
+        log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
+
+        context.getSnapshotManager().create(createSnapshotProcedure);
+    }
+
+    private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
+        log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+                context.getId(), saveSnapshotFailure.cause());
+
+        context.getSnapshotManager().rollback();
+    }
+
+    private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
+        log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
+
+        long sequenceNumber = success.metadata().sequenceNr();
+
+        context.getSnapshotManager().commit(persistence, sequenceNumber);
+    }
+
+    private void onApplySnapshot(Snapshot snapshot) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: ApplySnapshot called on Follower Actor " +
+                    "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
+                snapshot.getLastAppliedTerm());
+        }
+
+        cohort.applySnapshot(snapshot.getState());
+
+        //clears the followers log, sets the snapshot index to ensure adjusted-index works
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
+                currentBehavior));
+        context.setLastApplied(snapshot.getLastAppliedIndex());
+    }
+}
index 45fd26c930736dd0d17ff706154b6252246bdd0e..3c6c8281fb734b0d378963c831c2f462fbcc3182 100644 (file)
@@ -112,7 +112,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         }
 
         @Override
-        protected void createSnapshot() {
+        public void createSnapshot(ActorRef actorRef) {
             if(snapshot != null) {
                 getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
             }
index f71cb984b3e414bf879669cc64535e2d3d64c78c..9b8ca5c5cad988236d5cccc1745d2f94a3e816ef 100644 (file)
@@ -98,10 +98,11 @@ public class RaftActorTest extends AbstractActorTest {
         InMemorySnapshotStore.clear();
     }
 
-    public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort {
+    public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
 
         private final RaftActor actorDelegate;
-        private final RaftActorRecoveryCohort cohortDelegate;
+        private final RaftActorRecoveryCohort recoveryCohortDelegate;
+        private final RaftActorSnapshotCohort snapshotCohortDelegate;
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final List<Object> state;
         private ActorRef roleChangeNotifier;
@@ -139,7 +140,8 @@ public class RaftActorTest extends AbstractActorTest {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
             this.actorDelegate = mock(RaftActor.class);
-            this.cohortDelegate = mock(RaftActorRecoveryCohort.class);
+            this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
+            this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
             if(dataPersistenceProvider == null){
                 setPersistence(true);
             } else {
@@ -210,6 +212,11 @@ public class RaftActorTest extends AbstractActorTest {
             return this;
         }
 
+        @Override
+        protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+            return this;
+        }
+
         @Override
         public void startLogRecoveryBatch(int maxBatchSize) {
         }
@@ -237,7 +244,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         @Override
         public void applyRecoverySnapshot(byte[] bytes) {
-            cohortDelegate.applyRecoverySnapshot(bytes);
+            recoveryCohortDelegate.applyRecoverySnapshot(bytes);
             try {
                 Object data = toObject(bytes);
                 if (data instanceof List) {
@@ -248,17 +255,20 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
-        @Override protected void createSnapshot() {
+        @Override
+        public void createSnapshot(ActorRef actorRef) {
             LOG.info("{}: createSnapshot called", persistenceId());
-            actorDelegate.createSnapshot();
+            snapshotCohortDelegate.createSnapshot(actorRef);
         }
 
-        @Override protected void applySnapshot(byte [] snapshot) {
+        @Override
+        public void applySnapshot(byte [] snapshot) {
             LOG.info("{}: applySnapshot called", persistenceId());
-            actorDelegate.applySnapshot(snapshot);
+            snapshotCohortDelegate.applySnapshot(snapshot);
         }
 
-        @Override protected void onStateChanged() {
+        @Override
+        protected void onStateChanged() {
             actorDelegate.onStateChanged();
         }
 
@@ -293,7 +303,6 @@ public class RaftActorTest extends AbstractActorTest {
         public ReplicatedLog getReplicatedLog(){
             return this.getRaftActorContext().getReplicatedLog();
         }
-
     }
 
 
@@ -525,7 +534,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.cohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+                verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -592,7 +601,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.cohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
+                verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -819,7 +828,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
 
-                verify(mockRaftActor.actorDelegate).createSnapshot();
+                verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
@@ -916,7 +925,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
 
-                verify(mockRaftActor.actorDelegate).applySnapshot(eq(snapshot.getState()));
+                verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
 
                 assertTrue("The replicatedLog should have changed",
                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
@@ -1140,7 +1149,7 @@ public class RaftActorTest extends AbstractActorTest {
                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
                                 new MockRaftActorContext.MockPayload("x")), 4);
 
-                verify(leaderActor.actorDelegate).createSnapshot();
+                verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
@@ -1239,7 +1248,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
                                 new MockRaftActorContext.MockPayload("D")), 4);
 
-                verify(followerActor.actorDelegate).createSnapshot();
+                verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 assertEquals(6, followerActor.getReplicatedLog().size());
 
index 17f1abb92cc4ec7d4d0a7dffb818cec20b5cff0b..cc2905c98a2d817ce5dd9b7401cec11998f52231 100644 (file)
@@ -45,7 +45,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
@@ -59,22 +58,18 @@ import org.opendaylight.controller.cluster.datastore.modification.ModificationPa
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -87,8 +82,6 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
-
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
     @VisibleForTesting
@@ -104,10 +97,6 @@ public class Shard extends RaftActor {
 
     private DatastoreContext datastoreContext;
 
-    private SchemaContext schemaContext;
-
-    private int createSnapshotTransactionCounter;
-
     private final ShardCommitCoordinator commitCoordinator;
 
     private long transactionCommitTimeout;
@@ -121,9 +110,11 @@ public class Shard extends RaftActor {
     private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
             Serialization.serializedActorPath(getSelf()));
 
-    private final DOMTransactionFactory transactionFactory;
+    private final DOMTransactionFactory domTransactionFactory;
+
+    private final ShardTransactionActorFactory transactionActorFactory;
 
-    private final String txnDispatcherPath;
+    private final ShardSnapshotCohort snapshotCohort;
 
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
     private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
@@ -134,9 +125,6 @@ public class Shard extends RaftActor {
 
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
-        this.schemaContext = schemaContext;
-        this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
-                .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -158,9 +146,9 @@ public class Shard extends RaftActor {
             getContext().become(new MeteringBehavior(this));
         }
 
-        transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+        domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
 
-        commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+        commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
                 TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
 
@@ -171,6 +159,12 @@ public class Shard extends RaftActor {
 
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+        transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+                new Dispatchers(context().system().dispatchers()).getDispatcherPath(
+                        Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
+
+        snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
     }
 
     private void setTransactionCommitTimeout() {
@@ -550,29 +544,15 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+        domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
             ShardTransactionIdentifier transactionId, String transactionChainId,
             short clientVersion ) {
 
-        DOMStoreTransaction transaction = transactionFactory.newTransaction(
-                TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
-                transactionChainId);
-
-        return createShardTransaction(transaction, transactionId, clientVersion);
-    }
-
-    private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
-                                            short clientVersion){
-        return getContext().actorOf(
-                ShardTransaction.props(transaction, getSelf(),
-                        schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId(), clientVersion)
-                        .withDispatcher(txnDispatcherPath),
-                transactionId.toString());
-
+        return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+                transactionId, transactionChainId, clientVersion);
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
@@ -604,18 +584,11 @@ public class Shard extends RaftActor {
         return transactionActor;
     }
 
-    private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
-        throws ExecutionException, InterruptedException {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
-    }
-
     private void commitWithNewTransaction(final Modification modification) {
         DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
         modification.apply(tx);
         try {
-            syncCommitTransaction(tx);
+            snapshotCohort.syncCommitTransaction(tx);
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
         } catch (InterruptedException | ExecutionException e) {
@@ -625,9 +598,7 @@ public class Shard extends RaftActor {
     }
 
     private void updateSchemaContext(final UpdateSchemaContext message) {
-        this.schemaContext = message.getSchemaContext();
         updateSchemaContext(message.getSchemaContext());
-        store.onGlobalContextUpdated(message.getSchemaContext());
     }
 
     @VisibleForTesting
@@ -640,6 +611,11 @@ public class Shard extends RaftActor {
         return config.isMetricCaptureEnabled();
     }
 
+    @Override
+    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+        return snapshotCohort;
+    }
+
     @Override
     @Nonnull
     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
@@ -702,46 +678,6 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Override
-    protected void createSnapshot() {
-        // Create a transaction actor. We are really going to treat the transaction as a worker
-        // so that this actor does not get block building the snapshot. THe transaction actor will
-        // after processing the CreateSnapshot message.
-
-        ActorRef createSnapshotTransaction = createTransaction(
-                TransactionProxy.TransactionType.READ_ONLY.ordinal(),
-                "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                DataStoreVersions.CURRENT_VERSION);
-
-        createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
-    }
-
-    @VisibleForTesting
-    @Override
-    protected void applySnapshot(final byte[] snapshotBytes) {
-        // Since this will be done only on Recovery or when this actor is a Follower
-        // we can safely commit everything in here. We not need to worry about event notifications
-        // as they would have already been disabled on the follower
-
-        LOG.info("{}: Applying snapshot", persistenceId());
-        try {
-            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-
-            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
-            // delete everything first
-            transaction.delete(DATASTORE_ROOT);
-
-            // Add everything from the remote node back
-            transaction.write(DATASTORE_ROOT, node);
-            syncCommitTransaction(transaction);
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
-        } finally {
-            LOG.info("{}: Done applying snapshot", persistenceId());
-        }
-    }
-
     @Override
     protected void onStateChanged() {
         boolean isLeader = isLeader();
@@ -756,7 +692,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
-            transactionFactory.closeAllTransactionChains();
+            domTransactionFactory.closeAllTransactionChains();
         }
     }
 
index 2e66ef918e6e7541592449e9b51405610f4826a6..41ca486eb6cf9c15756b8e5748f75358fb61c409 100644 (file)
@@ -26,7 +26,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * @author: syedbahm
@@ -38,9 +37,8 @@ public class ShardReadTransaction extends ShardTransaction {
     private final DOMStoreReadTransaction transaction;
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            short clientTxVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
+        super(shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index b394da88e853f4387fb2cbc70ff3b93ab5436999..2042e955777f6668a831889e5ef4985f31505f12 100644 (file)
@@ -15,7 +15,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * @author: syedbahm
@@ -25,9 +24,8 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction {
     private final DOMStoreReadWriteTransaction transaction;
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            short clientTxVersion) {
-        super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
+        super(transaction, shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
new file mode 100644 (file)
index 0000000..c59085d
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+
+/**
+ * Participates in raft snapshotting on behalf of a Shard actor.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+
+    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
+
+    private int createSnapshotTransactionCounter;
+    private final ShardTransactionActorFactory transactionActorFactory;
+    private final InMemoryDOMDataStore store;
+    private final Logger log;
+    private final String logId;
+
+    ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store,
+            Logger log, String logId) {
+        this.transactionActorFactory = transactionActorFactory;
+        this.store = store;
+        this.log = log;
+        this.logId = logId;
+    }
+
+    @Override
+    public void createSnapshot(ActorRef actorRef) {
+        // Create a transaction actor. We are really going to treat the transaction as a worker
+        // so that this actor does not get block building the snapshot. THe transaction actor will
+        // after processing the CreateSnapshot message.
+
+        ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier(
+                "createSnapshot" + ++createSnapshotTransactionCounter);
+
+        ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
+                TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
+
+        createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
+    }
+
+    @Override
+    public void applySnapshot(byte[] snapshotBytes) {
+        // Since this will be done only on Recovery or when this actor is a Follower
+        // we can safely commit everything in here. We not need to worry about event notifications
+        // as they would have already been disabled on the follower
+
+        log.info("{}: Applying snapshot", logId);
+
+        try {
+            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+
+            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+
+            // delete everything first
+            transaction.delete(DATASTORE_ROOT);
+
+            // Add everything from the remote node back
+            transaction.write(DATASTORE_ROOT, node);
+            syncCommitTransaction(transaction);
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("{}: An exception occurred when applying snapshot", logId, e);
+        } finally {
+            log.info("{}: Done applying snapshot", logId);
+        }
+
+    }
+
+    void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+            throws ExecutionException, InterruptedException {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        commitCohort.preCommit().get();
+        commitCohort.commit().get();
+    }
+}
index 613b3749e086abc8cdea38fd322872f656235a91..066f01b092d701b08556c5fd7319db4d64a6859c 100644 (file)
@@ -31,7 +31,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * The ShardTransaction Actor represents a remote transaction
@@ -54,25 +53,22 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     protected static final boolean SERIALIZED_REPLY = true;
 
     private final ActorRef shardActor;
-    private final SchemaContext schemaContext;
     private final ShardStats shardStats;
     private final String transactionID;
     private final short clientTxVersion;
 
-    protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-            ShardStats shardStats, String transactionID, short clientTxVersion) {
+    protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
+            short clientTxVersion) {
         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
-        this.schemaContext = schemaContext;
         this.shardStats = shardStats;
         this.transactionID = transactionID;
         this.clientTxVersion = clientTxVersion;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
-            String transactionID, short txnClientVersion) {
-        return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
+            DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
+        return Props.create(new ShardTransactionCreator(transaction, shardActor,
            datastoreContext, shardStats, transactionID, txnClientVersion));
     }
 
@@ -86,10 +82,6 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return transactionID;
     }
 
-    protected SchemaContext getSchemaContext() {
-        return schemaContext;
-    }
-
     protected short getClientTxVersion() {
         return clientTxVersion;
     }
@@ -161,19 +153,16 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
 
         final DOMStoreTransaction transaction;
         final ActorRef shardActor;
-        final SchemaContext schemaContext;
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
         final String transactionID;
         final short txnClientVersion;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
-                SchemaContext schemaContext, DatastoreContext datastoreContext,
-                ShardStats shardStats, String transactionID, short txnClientVersion) {
+                DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
             this.transaction = transaction;
             this.shardActor = shardActor;
             this.shardStats = shardStats;
-            this.schemaContext = schemaContext;
             this.datastoreContext = datastoreContext;
             this.transactionID = transactionID;
             this.txnClientVersion = txnClientVersion;
@@ -184,13 +173,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
             ShardTransaction tx;
             if(transaction instanceof DOMStoreReadWriteTransaction) {
                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+                        shardActor, shardStats, transactionID, txnClientVersion);
             } else if(transaction instanceof DOMStoreReadTransaction) {
                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        schemaContext, shardStats, transactionID, txnClientVersion);
+                        shardStats, transactionID, txnClientVersion);
             } else {
                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+                        shardActor, shardStats, transactionID, txnClientVersion);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
index 8ba613958a9a5dfc1e0e2a9b45b921c3b6243b4e..a4c97e8ab9248cd471ad23f50913abf8032a01d3 100644 (file)
@@ -27,14 +27,12 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
     private final DOMStoreTransactionChain chain;
     private final DatastoreContext datastoreContext;
-    private final SchemaContext schemaContext;
     private final ShardStats shardStats;
 
-    public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext, ShardStats shardStats) {
+    public ShardTransactionChain(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+            ShardStats shardStats) {
         this.chain = chain;
         this.datastoreContext = datastoreContext;
-        this.schemaContext = schemaContext;
         this.shardStats = shardStats;
     }
 
@@ -61,22 +59,19 @@ public class ShardTransactionChain extends AbstractUntypedActor {
                 TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId(),
+                            datastoreContext, shardStats, createTransaction.getTransactionId(),
                             createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId(),
+                            datastoreContext, shardStats, createTransaction.getTransactionId(),
                             createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId(),
+                            datastoreContext, shardStats, createTransaction.getTransactionId(),
                             createTransaction.getVersion()), transactionName);
         } else {
             throw new IllegalArgumentException (
@@ -94,8 +89,7 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
     public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
         DatastoreContext datastoreContext, ShardStats shardStats) {
-        return Props.create(new ShardTransactionChainCreator(chain, schemaContext,
-                datastoreContext, shardStats));
+        return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats));
     }
 
     private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
@@ -103,21 +97,19 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
         final DOMStoreTransactionChain chain;
         final DatastoreContext datastoreContext;
-        final SchemaContext schemaContext;
         final ShardStats shardStats;
 
 
-        ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext, ShardStats shardStats) {
+        ShardTransactionChainCreator(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+                ShardStats shardStats) {
             this.chain = chain;
             this.datastoreContext = datastoreContext;
-            this.schemaContext = schemaContext;
             this.shardStats = shardStats;
         }
 
         @Override
         public ShardTransactionChain create() throws Exception {
-            return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats);
+            return new ShardTransactionChain(chain, datastoreContext, shardStats);
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java
new file mode 100644 (file)
index 0000000..9637646
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActorContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+
+/**
+ * A factory for creating ShardTransaction actors.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardTransactionActorFactory {
+
+    private final DOMTransactionFactory domTransactionFactory;
+    private final DatastoreContext datastoreContext;
+    private final String txnDispatcherPath;
+    private final ShardStats shardMBean;
+    private final UntypedActorContext actorContext;
+    private final ActorRef shardActor;
+
+    ShardTransactionActorFactory(DOMTransactionFactory domTransactionFactory, DatastoreContext datastoreContext,
+            String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean) {
+        this.domTransactionFactory = domTransactionFactory;
+        this.datastoreContext = datastoreContext;
+        this.txnDispatcherPath = txnDispatcherPath;
+        this.shardMBean = shardMBean;
+        this.actorContext = actorContext;
+        this.shardActor = shardActor;
+    }
+
+    ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID,
+            String transactionChainID, short clientVersion) {
+
+        DOMStoreTransaction transaction = domTransactionFactory.newTransaction(type, transactionID.toString(),
+                transactionChainID);
+
+        return actorContext.actorOf(ShardTransaction.props(transaction, shardActor, datastoreContext, shardMBean,
+                transactionID.getRemoteTransactionId(), clientVersion).withDispatcher(txnDispatcherPath),
+                transactionID.toString());
+    }
+}
index d5dcfde803a16bfcc6ea285473167d9b78e8c5cb..87da2b04425a28ae4be300e5ec9a2595b7987185 100644 (file)
@@ -32,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * @author: syedbahm
@@ -44,9 +43,8 @@ public class ShardWriteTransaction extends ShardTransaction {
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            short clientTxVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
+        super(shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index e04c1a5d185ffbb5bc81b4d035a07545c02ef3a7..103d18bd1cc81defaa8d0a3c5ac406dc11a56ce8 100644 (file)
@@ -17,6 +17,7 @@ import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
+import akka.persistence.SaveSnapshotSuccess;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
@@ -1389,6 +1390,8 @@ public class ShardTest extends AbstractShardTest {
     @SuppressWarnings("serial")
     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
 
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
+
         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
             TestPersistentDataProvider(DataPersistenceProvider delegate) {
@@ -1405,8 +1408,6 @@ public class ShardTest extends AbstractShardTest {
         dataStoreContextBuilder.persistent(persistent);
 
         new ShardTestKit(getSystem()) {{
-            final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-
             class TestShard extends Shard {
 
                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
@@ -1416,9 +1417,12 @@ public class ShardTest extends AbstractShardTest {
                 }
 
                 @Override
-                protected void commitSnapshot(final long sequenceNumber) {
-                    super.commitSnapshot(sequenceNumber);
-                    latch.get().countDown();
+                public void handleCommand(Object message) {
+                    super.handleCommand(message);
+
+                    if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+                        latch.get().countDown();
+                    }
                 }
 
                 @Override
index c3fef611e348a2a446ef2bfff87024425063f3ba..21fb55fcf138bd8e10cd54b2488079770e055e63 100644 (file)
@@ -70,8 +70,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -100,8 +99,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -130,8 +128,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -160,8 +157,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -193,8 +189,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -231,8 +226,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -264,8 +258,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
index e63ace3e2cc5abaff32acc4012f583cfbbdde6cc..41ea7aac7a0456ddd5e709824f637677b8e11ed9 100644 (file)
@@ -97,7 +97,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
             short version) {
         Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
-                testSchemaContext, datastoreContext, shardStats, "txn", version);
+                datastoreContext, shardStats, "txn", version);
         return getSystem().actorOf(props, name);
     }
 
@@ -517,8 +517,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).