Merge changes I114cbac1,I45c2e7cd
authorMoiz Raja <moraja@cisco.com>
Fri, 10 Apr 2015 15:21:28 +0000 (15:21 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 10 Apr 2015 15:21:29 +0000 (15:21 +0000)
* changes:
  Calculate replicated log data size on recovery
  Refactor snapshot message processing to RaftActorSnapshotMessageSupport

22 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java

index eca2949..77eaac2 100644 (file)
@@ -28,6 +28,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -36,7 +37,7 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
 /**
  * A sample actor showing how the RaftActor is to be extended
  */
-public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort {
+public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
 
     private final Map<String, String> state = new HashMap();
 
@@ -120,7 +121,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort {
         }
     }
 
-    @Override protected void createSnapshot() {
+    @Override
+    public void createSnapshot(ActorRef actorRef) {
         ByteString bs = null;
         try {
             bs = fromObject(state);
@@ -130,7 +132,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort {
         getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
     }
 
-    @Override protected void applySnapshot(byte [] snapshot) {
+    @Override
+    public void applySnapshot(byte [] snapshot) {
         state.clear();
         try {
             state.putAll((HashMap) toObject(snapshot));
@@ -218,4 +221,9 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort {
     @Override
     public void applyRecoverySnapshot(byte[] snapshot) {
     }
+
+    @Override
+    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+        return this;
+    }
 }
index 1aecc89..c27ff37 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -19,7 +20,7 @@ import java.util.List;
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     // We define this as ArrayList so we can use ensureCapacity.
-    protected ArrayList<ReplicatedLogEntry> journal;
+    private ArrayList<ReplicatedLogEntry> journal;
 
     private long snapshotIndex = -1;
     private long snapshotTerm = -1;
@@ -28,13 +29,17 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     private ArrayList<ReplicatedLogEntry> snapshottedJournal;
     private long previousSnapshotIndex = -1;
     private long previousSnapshotTerm = -1;
-    protected int dataSize = 0;
+    private int dataSize = 0;
 
     public AbstractReplicatedLogImpl(long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
         this.snapshotIndex = snapshotIndex;
         this.snapshotTerm = snapshotTerm;
         this.journal = new ArrayList<>(unAppliedEntries);
+
+        for(ReplicatedLogEntry entry: journal) {
+            dataSize += entry.size();
+        }
     }
 
     public AbstractReplicatedLogImpl() {
@@ -90,18 +95,26 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     @Override
-    public void removeFrom(long logEntryIndex) {
+    public long removeFrom(long logEntryIndex) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
             // physical index should be less than list size and >= 0
-            return;
+            return -1;
+        }
+
+        for(int i = adjustedIndex; i < journal.size(); i++) {
+            dataSize -= journal.get(i).size();
         }
+
         journal.subList(adjustedIndex , journal.size()).clear();
+
+        return adjustedIndex;
     }
 
     @Override
     public void append(ReplicatedLogEntry replicatedLogEntry) {
         journal.add(replicatedLogEntry);
+        dataSize += replicatedLogEntry.size();
     }
 
     @Override
@@ -230,4 +243,9 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         snapshotTerm = previousSnapshotTerm;
         previousSnapshotTerm = -1;
     }
+
+    @VisibleForTesting
+    ReplicatedLogEntry getAtPhysicalIndex(int index) {
+        return journal.get(index);
+    }
 }
index 1f1521d..41a807a 100644 (file)
@@ -11,8 +11,6 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.japi.Procedure;
-import akka.persistence.SaveSnapshotFailure;
-import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
@@ -34,10 +32,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
@@ -96,8 +91,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
 
-    private static final String COMMIT_SNAPSHOT = "commit_snapshot";
-
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     /**
@@ -114,10 +107,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
 
-    private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
-
     private RaftActorRecoverySupport raftRecovery;
 
+    private RaftActorSnapshotMessageSupport snapshotSupport;
+
     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
 
     public RaftActor(String id, Map<String, String> peerAddresses) {
@@ -145,7 +138,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void postStop() {
-        if(currentBehavior != null) {
+        if(currentBehavior.getDelegate() != null) {
             try {
                 currentBehavior.close();
             } catch (Exception e) {
@@ -192,7 +185,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
     }
 
-    @Override public void handleCommand(Object message) {
+    @Override
+    public void handleCommand(Object message) {
+        if(snapshotSupport == null) {
+            snapshotSupport = new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
+                    currentBehavior, getRaftActorSnapshotCohort(), self());
+        }
+
+        boolean handled = snapshotSupport.handleSnapshotMessage(message);
+        if(handled) {
+            return;
+        }
+
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
@@ -219,56 +223,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             persistence().persist(applyEntries, NoopProcedure.instance());
 
-        } else if(message instanceof ApplySnapshot ) {
-            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
-                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
-                    snapshot.getLastAppliedTerm()
-                );
-            }
-
-            applySnapshot(snapshot.getState());
-
-            //clears the followers log, sets the snapshot index to ensure adjusted-index works
-            context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
-                    currentBehavior));
-            context.setLastApplied(snapshot.getLastAppliedIndex());
-
         } else if (message instanceof FindLeader) {
             getSender().tell(
                 new FindLeaderReply(getLeaderAddress()),
                 getSelf()
             );
-
-        } else if (message instanceof SaveSnapshotSuccess) {
-            SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
-            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
-
-            long sequenceNumber = success.metadata().sequenceNr();
-
-            commitSnapshot(sequenceNumber);
-
-        } else if (message instanceof SaveSnapshotFailure) {
-            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
-
-            LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
-                    persistenceId(), saveSnapshotFailure.cause());
-
-            context.getSnapshotManager().rollback();
-
-        } else if (message instanceof CaptureSnapshot) {
-            LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
-
-            context.getSnapshotManager().create(createSnapshotProcedure);
-
-        } else if (message instanceof CaptureSnapshotReply) {
-            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
-        } else if (message.equals(COMMIT_SNAPSHOT)) {
-            commitSnapshot(-1);
         } else {
             reusableBehaviorStateHolder.init(getCurrentBehavior());
 
@@ -504,7 +465,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     // Make saving Snapshot successful
                     // Committing the snapshot here would end up calling commit in the creating state which would
                     // be a state violation. That's why now we send a message to commit the snapshot.
-                    self().tell(COMMIT_SNAPSHOT, self());
+                    self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
                 }
             });
         }
@@ -528,10 +489,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setPeerAddress(peerId, peerAddress);
     }
 
-    protected void commitSnapshot(long sequenceNumber) {
-        context.getSnapshotManager().commit(persistence(), sequenceNumber);
-    }
-
     /**
      * The applyState method will be called by the RaftActor when some data
      * needs to be applied to the actor's state
@@ -564,24 +521,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onRecoveryComplete();
 
     /**
-     * This method will be called by the RaftActor when a snapshot needs to be
-     * created. The derived actor should respond with its current state.
-     * <p/>
-     * During recovery the state that is returned by the derived actor will
-     * be passed back to it by calling the applySnapshot  method
-     *
-     * @return The current state of the actor
+     * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
      */
-    protected abstract void createSnapshot();
-
-    /**
-     * This method can be called at any other point during normal
-     * operations when the derived actor is out of sync with it's peers
-     * and the only way to bring it in sync is by applying a snapshot
-     *
-     * @param snapshotBytes A snapshot of the state of the actor
-     */
-    protected abstract void applySnapshot(byte[] snapshotBytes);
+    @Nonnull
+    protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -615,12 +558,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return peerAddress;
     }
 
-    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
-        LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
-        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
-    }
-
     protected boolean hasFollowers(){
         return getRaftActorContext().hasFollowers();
     }
@@ -657,14 +594,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private class CreateSnapshotProcedure implements Procedure<Void> {
-
-        @Override
-        public void apply(Void aVoid) throws Exception {
-            createSnapshot();
-        }
-    }
-
     private static class BehaviorStateHolder {
         private RaftActorBehavior behavior;
         private String leaderId;
index 5f33c73..d2c14de 100644 (file)
@@ -104,14 +104,15 @@ class RaftActorRecoverySupport {
         cohort.applyRecoverySnapshot(snapshot.getState());
 
         timer.stop();
-        log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
-                replicatedLog().size(), context.getId(), timer.toString(),
-                replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
+        log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
+                context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
+                replicatedLog().getSnapshotTerm(), replicatedLog().size());
     }
 
     private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
         if(log.isDebugEnabled()) {
-            log.debug("{}: Received ReplicatedLogEntry for recovery: {}", context.getId(), logEntry.getIndex());
+            log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
+                    logEntry.getIndex(), logEntry.size());
         }
 
         replicatedLog().append(logEntry);
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java
new file mode 100644 (file)
index 0000000..ad68726
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+/**
+ * Interface for a class that participates in raft actor snapshotting.
+ *
+ * @author Thomas Pantelis
+ */
+public interface RaftActorSnapshotCohort {
+
+    /**
+     * This method is called by the RaftActor when a snapshot needs to be
+     * created. The implementation should send a CaptureSnapshotReply to the given actor.
+     *
+     * @param actorRef the actor to which to respond
+     */
+    void createSnapshot(ActorRef actorRef);
+
+    /**
+     * This method is called to apply a snapshot installed by the leader.
+     *
+     * @param snapshotBytes a snapshot of the state of the actor
+     */
+    void applySnapshot(byte[] snapshotBytes);
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
new file mode 100644 (file)
index 0000000..21c8ffa
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+/**
+ * Handles snapshot related messages for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorSnapshotMessageSupport {
+    static final String COMMIT_SNAPSHOT = "commit_snapshot";
+
+    private final DataPersistenceProvider persistence;
+    private final RaftActorContext context;
+    private final RaftActorBehavior currentBehavior;
+    private final RaftActorSnapshotCohort cohort;
+    private final ActorRef raftActorRef;
+    private final Logger log;
+
+    private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
+        @Override
+        public void apply(Void notUsed) throws Exception {
+            cohort.createSnapshot(raftActorRef);
+        }
+    };
+
+    RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
+            RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort, ActorRef raftActorRef) {
+        this.persistence = persistence;
+        this.context = context;
+        this.currentBehavior = currentBehavior;
+        this.cohort = cohort;
+        this.raftActorRef = raftActorRef;
+        this.log = context.getLogger();
+    }
+
+    boolean handleSnapshotMessage(Object message) {
+        if(message instanceof ApplySnapshot ) {
+            onApplySnapshot(((ApplySnapshot) message).getSnapshot());
+            return true;
+        } else if (message instanceof SaveSnapshotSuccess) {
+            onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+            return true;
+        } else if (message instanceof SaveSnapshotFailure) {
+            onSaveSnapshotFailure((SaveSnapshotFailure) message);
+            return true;
+        } else if (message instanceof CaptureSnapshot) {
+            onCaptureSnapshot(message);
+            return true;
+        } else if (message instanceof CaptureSnapshotReply) {
+            onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
+            return true;
+        } else if (message.equals(COMMIT_SNAPSHOT)) {
+            context.getSnapshotManager().commit(persistence, -1);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private void onCaptureSnapshotReply(byte[] snapshotBytes) {
+        log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
+
+        context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
+    }
+
+    private void onCaptureSnapshot(Object message) {
+        log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
+
+        context.getSnapshotManager().create(createSnapshotProcedure);
+    }
+
+    private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
+        log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+                context.getId(), saveSnapshotFailure.cause());
+
+        context.getSnapshotManager().rollback();
+    }
+
+    private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
+        log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
+
+        long sequenceNumber = success.metadata().sequenceNr();
+
+        context.getSnapshotManager().commit(persistence, sequenceNumber);
+    }
+
+    private void onApplySnapshot(Snapshot snapshot) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: ApplySnapshot called on Follower Actor " +
+                    "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
+                snapshot.getLastAppliedTerm());
+        }
+
+        cohort.applySnapshot(snapshot.getState());
+
+        //clears the followers log, sets the snapshot index to ensure adjusted-index works
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
+                currentBehavior));
+        context.setLastApplied(snapshot.getLastAppliedIndex());
+    }
+}
index 3e4d727..8388eaf 100644 (file)
@@ -51,8 +51,9 @@ public interface ReplicatedLog {
      * information
      *
      * @param index the index of the log entry
+     * @return the adjusted index of the first log entry removed or -1 if log entry not found.
      */
-    void removeFrom(long index);
+    long removeFrom(long index);
 
 
     /**
index fdb6305..5a77b9a 100644 (file)
@@ -28,10 +28,6 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
         @Override
         public void apply(DeleteEntries param) {
-            dataSize = 0;
-            for (ReplicatedLogEntry entry : journal) {
-                dataSize += entry.size();
-            }
         }
     };
 
@@ -57,16 +53,11 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
     @Override
     public void removeFromAndPersist(long logEntryIndex) {
-        int adjustedIndex = adjustedIndex(logEntryIndex);
-
-        if (adjustedIndex < 0) {
-            return;
-        }
-
         // FIXME: Maybe this should be done after the command is saved
-        journal.subList(adjustedIndex , journal.size()).clear();
-
-        persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+        long adjustedIndex = removeFrom(logEntryIndex);
+        if(adjustedIndex >= 0) {
+            persistence.persist(new DeleteEntries((int)adjustedIndex), deleteProcedure);
+        }
     }
 
     @Override
@@ -83,7 +74,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         }
 
         // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
-        journal.add(replicatedLogEntry);
+        append(replicatedLogEntry);
 
         // When persisting events with persist it is guaranteed that the
         // persistent actor will not receive further commands between the
@@ -96,8 +87,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
                 public void apply(ReplicatedLogEntry evt) throws Exception {
                     int logEntrySize = replicatedLogEntry.size();
 
-                    dataSize += logEntrySize;
-                    long dataSizeForCheck = dataSize;
+                    long dataSizeForCheck = dataSize();
 
                     dataSizeSinceLastSnapshot += logEntrySize;
 
index 45fd26c..3c6c828 100644 (file)
@@ -112,7 +112,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         }
 
         @Override
-        protected void createSnapshot() {
+        public void createSnapshot(ActorRef actorRef) {
             if(snapshot != null) {
                 getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
             }
index 8fdb7ea..c99f253 100644 (file)
@@ -15,7 +15,6 @@ import akka.japi.Procedure;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,14 +39,6 @@ public class AbstractReplicatedLogImplTest {
 
     }
 
-    @After
-    public void tearDown() {
-        replicatedLogImpl.journal.clear();
-        replicatedLogImpl.setSnapshotIndex(-1);
-        replicatedLogImpl.setSnapshotTerm(-1);
-        replicatedLogImpl = null;
-    }
-
     @Test
     public void testIndexOperations() {
 
@@ -65,7 +56,7 @@ public class AbstractReplicatedLogImplTest {
         // now create a snapshot of 3 entries, with 1 unapplied entry left in the log
         // It removes the entries which have made it to snapshot
         // and updates the snapshot index and term
-        Map<Long, String> state = takeSnapshot(3);
+        takeSnapshot(3);
 
         // check the values after the snapshot.
         // each index value passed in the test is the logical index (log entry index)
@@ -101,7 +92,7 @@ public class AbstractReplicatedLogImplTest {
         assertEquals(2, replicatedLogImpl.getFrom(6).size());
 
         // take a second snapshot with 5 entries with 0 unapplied entries left in the log
-        state = takeSnapshot(5);
+        takeSnapshot(5);
 
         assertEquals(0, replicatedLogImpl.size());
         assertNull(replicatedLogImpl.last());
@@ -187,19 +178,45 @@ public class AbstractReplicatedLogImplTest {
         assertTrue(replicatedLogImpl.isPresent(5));
     }
 
+    @Test
+    public void testRemoveFrom() {
+
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E", 2)));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F", 3)));
+
+        assertEquals("dataSize", 9, replicatedLogImpl.dataSize());
+
+        long adjusted = replicatedLogImpl.removeFrom(4);
+        assertEquals("removeFrom - adjusted", 4, adjusted);
+        assertEquals("size", 4, replicatedLogImpl.size());
+        assertEquals("dataSize", 4, replicatedLogImpl.dataSize());
+
+        takeSnapshot(1);
+
+        adjusted = replicatedLogImpl.removeFrom(2);
+        assertEquals("removeFrom - adjusted", 1, adjusted);
+        assertEquals("size", 1, replicatedLogImpl.size());
+        assertEquals("dataSize", 1, replicatedLogImpl.dataSize());
+
+        assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(0));
+        assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(100));
+    }
+
     // create a snapshot for test
     public Map<Long, String> takeSnapshot(final int numEntries) {
         Map<Long, String> map = new HashMap<>(numEntries);
-        List<ReplicatedLogEntry> entries = replicatedLogImpl.getEntriesTill(numEntries);
-        for (ReplicatedLogEntry entry : entries) {
+
+        long lastIndex = 0;
+        long lastTerm = 0;
+        for(int i = 0; i < numEntries; i++) {
+            ReplicatedLogEntry entry = replicatedLogImpl.getAtPhysicalIndex(i);
             map.put(entry.getIndex(), entry.getData().toString());
+            lastIndex = entry.getIndex();
+            lastTerm = entry.getTerm();
         }
 
-        int term = (int) replicatedLogImpl.lastTerm();
-        int lastIndex = (int) entries.get(entries.size() - 1).getIndex();
-        entries.clear();
-        replicatedLogImpl.setSnapshotTerm(term);
-        replicatedLogImpl.setSnapshotIndex(lastIndex);
+        replicatedLogImpl.snapshotPreCommit(lastIndex, lastTerm);
+        replicatedLogImpl.snapshotCommit();
 
         return map;
 
@@ -213,15 +230,6 @@ public class AbstractReplicatedLogImplTest {
         public void removeFromAndPersist(final long index) {
         }
 
-        @Override
-        public int dataSize() {
-            return -1;
-        }
-
-        public List<ReplicatedLogEntry> getEntriesTill(final int index) {
-            return journal.subList(0, index);
-        }
-
         @Override
         public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
         }
index f71cb98..14bfd1d 100644 (file)
@@ -98,10 +98,11 @@ public class RaftActorTest extends AbstractActorTest {
         InMemorySnapshotStore.clear();
     }
 
-    public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort {
+    public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
 
         private final RaftActor actorDelegate;
-        private final RaftActorRecoveryCohort cohortDelegate;
+        private final RaftActorRecoveryCohort recoveryCohortDelegate;
+        private final RaftActorSnapshotCohort snapshotCohortDelegate;
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final List<Object> state;
         private ActorRef roleChangeNotifier;
@@ -139,7 +140,8 @@ public class RaftActorTest extends AbstractActorTest {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
             this.actorDelegate = mock(RaftActor.class);
-            this.cohortDelegate = mock(RaftActorRecoveryCohort.class);
+            this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
+            this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
             if(dataPersistenceProvider == null){
                 setPersistence(true);
             } else {
@@ -210,6 +212,11 @@ public class RaftActorTest extends AbstractActorTest {
             return this;
         }
 
+        @Override
+        protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+            return this;
+        }
+
         @Override
         public void startLogRecoveryBatch(int maxBatchSize) {
         }
@@ -237,7 +244,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         @Override
         public void applyRecoverySnapshot(byte[] bytes) {
-            cohortDelegate.applyRecoverySnapshot(bytes);
+            recoveryCohortDelegate.applyRecoverySnapshot(bytes);
             try {
                 Object data = toObject(bytes);
                 if (data instanceof List) {
@@ -248,17 +255,20 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
-        @Override protected void createSnapshot() {
+        @Override
+        public void createSnapshot(ActorRef actorRef) {
             LOG.info("{}: createSnapshot called", persistenceId());
-            actorDelegate.createSnapshot();
+            snapshotCohortDelegate.createSnapshot(actorRef);
         }
 
-        @Override protected void applySnapshot(byte [] snapshot) {
+        @Override
+        public void applySnapshot(byte [] snapshot) {
             LOG.info("{}: applySnapshot called", persistenceId());
-            actorDelegate.applySnapshot(snapshot);
+            snapshotCohortDelegate.applySnapshot(snapshot);
         }
 
-        @Override protected void onStateChanged() {
+        @Override
+        protected void onStateChanged() {
             actorDelegate.onStateChanged();
         }
 
@@ -293,7 +303,6 @@ public class RaftActorTest extends AbstractActorTest {
         public ReplicatedLog getReplicatedLog(){
             return this.getRaftActorContext().getReplicatedLog();
         }
-
     }
 
 
@@ -408,11 +417,11 @@ public class RaftActorTest extends AbstractActorTest {
             // add more entries after snapshot is taken
             List<ReplicatedLogEntry> entries = new ArrayList<>();
             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
-                    new MockRaftActorContext.MockPayload("F"));
+                    new MockRaftActorContext.MockPayload("F", 2));
             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
-                    new MockRaftActorContext.MockPayload("G"));
+                    new MockRaftActorContext.MockPayload("G", 3));
             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
-                    new MockRaftActorContext.MockPayload("H"));
+                    new MockRaftActorContext.MockPayload("H", 4));
             entries.add(entry2);
             entries.add(entry3);
             entries.add(entry4);
@@ -442,6 +451,7 @@ public class RaftActorTest extends AbstractActorTest {
             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
                     context.getReplicatedLog().size());
+            assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
@@ -525,7 +535,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.cohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+                verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -592,7 +602,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.cohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
+                verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -819,7 +829,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
 
-                verify(mockRaftActor.actorDelegate).createSnapshot();
+                verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
@@ -916,7 +926,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
 
-                verify(mockRaftActor.actorDelegate).applySnapshot(eq(snapshot.getState()));
+                verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
 
                 assertTrue("The replicatedLog should have changed",
                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
@@ -1140,7 +1150,7 @@ public class RaftActorTest extends AbstractActorTest {
                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
                                 new MockRaftActorContext.MockPayload("x")), 4);
 
-                verify(leaderActor.actorDelegate).createSnapshot();
+                verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
@@ -1239,7 +1249,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
                                 new MockRaftActorContext.MockPayload("D")), 4);
 
-                verify(followerActor.actorDelegate).createSnapshot();
+                verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 assertEquals(6, followerActor.getReplicatedLog().size());
 
index b2a8637..81449c5 100644 (file)
@@ -45,7 +45,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
@@ -59,22 +58,18 @@ import org.opendaylight.controller.cluster.datastore.modification.ModificationPa
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -87,8 +82,6 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
-
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
     @VisibleForTesting
@@ -104,10 +97,6 @@ public class Shard extends RaftActor {
 
     private DatastoreContext datastoreContext;
 
-    private SchemaContext schemaContext;
-
-    private int createSnapshotTransactionCounter;
-
     private final ShardCommitCoordinator commitCoordinator;
 
     private long transactionCommitTimeout;
@@ -121,9 +110,11 @@ public class Shard extends RaftActor {
     private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
             Serialization.serializedActorPath(getSelf()));
 
-    private final DOMTransactionFactory transactionFactory;
+    private final DOMTransactionFactory domTransactionFactory;
+
+    private final ShardTransactionActorFactory transactionActorFactory;
 
-    private final String txnDispatcherPath;
+    private final ShardSnapshotCohort snapshotCohort;
 
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
     private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
@@ -134,9 +125,6 @@ public class Shard extends RaftActor {
 
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
-        this.schemaContext = schemaContext;
-        this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
-                .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -158,9 +146,9 @@ public class Shard extends RaftActor {
             getContext().become(new MeteringBehavior(this));
         }
 
-        transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+        domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
 
-        commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+        commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
                 TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
 
@@ -171,6 +159,12 @@ public class Shard extends RaftActor {
 
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+
+        transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+                new Dispatchers(context().system().dispatchers()).getDispatcherPath(
+                        Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
+
+        snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
     }
 
     private void setTransactionCommitTimeout() {
@@ -555,29 +549,15 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+        domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
             ShardTransactionIdentifier transactionId, String transactionChainId,
             short clientVersion ) {
 
-        DOMStoreTransaction transaction = transactionFactory.newTransaction(
-                TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
-                transactionChainId);
-
-        return createShardTransaction(transaction, transactionId, clientVersion);
-    }
-
-    private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
-                                            short clientVersion){
-        return getContext().actorOf(
-                ShardTransaction.props(transaction, getSelf(),
-                        schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId(), clientVersion)
-                        .withDispatcher(txnDispatcherPath),
-                transactionId.toString());
-
+        return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+                transactionId, transactionChainId, clientVersion);
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
@@ -609,18 +589,11 @@ public class Shard extends RaftActor {
         return transactionActor;
     }
 
-    private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
-        throws ExecutionException, InterruptedException {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
-    }
-
     private void commitWithNewTransaction(final Modification modification) {
         DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
         modification.apply(tx);
         try {
-            syncCommitTransaction(tx);
+            snapshotCohort.syncCommitTransaction(tx);
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
         } catch (InterruptedException | ExecutionException e) {
@@ -630,9 +603,7 @@ public class Shard extends RaftActor {
     }
 
     private void updateSchemaContext(final UpdateSchemaContext message) {
-        this.schemaContext = message.getSchemaContext();
         updateSchemaContext(message.getSchemaContext());
-        store.onGlobalContextUpdated(message.getSchemaContext());
     }
 
     @VisibleForTesting
@@ -645,6 +616,11 @@ public class Shard extends RaftActor {
         return config.isMetricCaptureEnabled();
     }
 
+    @Override
+    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+        return snapshotCohort;
+    }
+
     @Override
     @Nonnull
     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
@@ -707,46 +683,6 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Override
-    protected void createSnapshot() {
-        // Create a transaction actor. We are really going to treat the transaction as a worker
-        // so that this actor does not get block building the snapshot. THe transaction actor will
-        // after processing the CreateSnapshot message.
-
-        ActorRef createSnapshotTransaction = createTransaction(
-                TransactionProxy.TransactionType.READ_ONLY.ordinal(),
-                "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                DataStoreVersions.CURRENT_VERSION);
-
-        createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
-    }
-
-    @VisibleForTesting
-    @Override
-    protected void applySnapshot(final byte[] snapshotBytes) {
-        // Since this will be done only on Recovery or when this actor is a Follower
-        // we can safely commit everything in here. We not need to worry about event notifications
-        // as they would have already been disabled on the follower
-
-        LOG.info("{}: Applying snapshot", persistenceId());
-        try {
-            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-
-            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
-            // delete everything first
-            transaction.delete(DATASTORE_ROOT);
-
-            // Add everything from the remote node back
-            transaction.write(DATASTORE_ROOT, node);
-            syncCommitTransaction(transaction);
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
-        } finally {
-            LOG.info("{}: Done applying snapshot", persistenceId());
-        }
-    }
-
     @Override
     protected void onStateChanged() {
         boolean isLeader = isLeader();
@@ -761,7 +697,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
-            transactionFactory.closeAllTransactionChains();
+            domTransactionFactory.closeAllTransactionChains();
         }
     }
 
index 2e66ef9..41ca486 100644 (file)
@@ -26,7 +26,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * @author: syedbahm
@@ -38,9 +37,8 @@ public class ShardReadTransaction extends ShardTransaction {
     private final DOMStoreReadTransaction transaction;
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            short clientTxVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
+        super(shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index b394da8..2042e95 100644 (file)
@@ -15,7 +15,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * @author: syedbahm
@@ -25,9 +24,8 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction {
     private final DOMStoreReadWriteTransaction transaction;
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            short clientTxVersion) {
-        super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
+        super(transaction, shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
new file mode 100644 (file)
index 0000000..c59085d
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+
+/**
+ * Participates in raft snapshotting on behalf of a Shard actor.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+
+    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
+
+    private int createSnapshotTransactionCounter;
+    private final ShardTransactionActorFactory transactionActorFactory;
+    private final InMemoryDOMDataStore store;
+    private final Logger log;
+    private final String logId;
+
+    ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store,
+            Logger log, String logId) {
+        this.transactionActorFactory = transactionActorFactory;
+        this.store = store;
+        this.log = log;
+        this.logId = logId;
+    }
+
+    @Override
+    public void createSnapshot(ActorRef actorRef) {
+        // Create a transaction actor. We are really going to treat the transaction as a worker
+        // so that this actor does not get block building the snapshot. THe transaction actor will
+        // after processing the CreateSnapshot message.
+
+        ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier(
+                "createSnapshot" + ++createSnapshotTransactionCounter);
+
+        ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
+                TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
+
+        createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
+    }
+
+    @Override
+    public void applySnapshot(byte[] snapshotBytes) {
+        // Since this will be done only on Recovery or when this actor is a Follower
+        // we can safely commit everything in here. We not need to worry about event notifications
+        // as they would have already been disabled on the follower
+
+        log.info("{}: Applying snapshot", logId);
+
+        try {
+            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+
+            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+
+            // delete everything first
+            transaction.delete(DATASTORE_ROOT);
+
+            // Add everything from the remote node back
+            transaction.write(DATASTORE_ROOT, node);
+            syncCommitTransaction(transaction);
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("{}: An exception occurred when applying snapshot", logId, e);
+        } finally {
+            log.info("{}: Done applying snapshot", logId);
+        }
+
+    }
+
+    void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+            throws ExecutionException, InterruptedException {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        commitCohort.preCommit().get();
+        commitCohort.commit().get();
+    }
+}
index 613b374..066f01b 100644 (file)
@@ -31,7 +31,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * The ShardTransaction Actor represents a remote transaction
@@ -54,25 +53,22 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     protected static final boolean SERIALIZED_REPLY = true;
 
     private final ActorRef shardActor;
-    private final SchemaContext schemaContext;
     private final ShardStats shardStats;
     private final String transactionID;
     private final short clientTxVersion;
 
-    protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-            ShardStats shardStats, String transactionID, short clientTxVersion) {
+    protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
+            short clientTxVersion) {
         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
-        this.schemaContext = schemaContext;
         this.shardStats = shardStats;
         this.transactionID = transactionID;
         this.clientTxVersion = clientTxVersion;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
-            String transactionID, short txnClientVersion) {
-        return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
+            DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
+        return Props.create(new ShardTransactionCreator(transaction, shardActor,
            datastoreContext, shardStats, transactionID, txnClientVersion));
     }
 
@@ -86,10 +82,6 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return transactionID;
     }
 
-    protected SchemaContext getSchemaContext() {
-        return schemaContext;
-    }
-
     protected short getClientTxVersion() {
         return clientTxVersion;
     }
@@ -161,19 +153,16 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
 
         final DOMStoreTransaction transaction;
         final ActorRef shardActor;
-        final SchemaContext schemaContext;
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
         final String transactionID;
         final short txnClientVersion;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
-                SchemaContext schemaContext, DatastoreContext datastoreContext,
-                ShardStats shardStats, String transactionID, short txnClientVersion) {
+                DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
             this.transaction = transaction;
             this.shardActor = shardActor;
             this.shardStats = shardStats;
-            this.schemaContext = schemaContext;
             this.datastoreContext = datastoreContext;
             this.transactionID = transactionID;
             this.txnClientVersion = txnClientVersion;
@@ -184,13 +173,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
             ShardTransaction tx;
             if(transaction instanceof DOMStoreReadWriteTransaction) {
                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+                        shardActor, shardStats, transactionID, txnClientVersion);
             } else if(transaction instanceof DOMStoreReadTransaction) {
                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        schemaContext, shardStats, transactionID, txnClientVersion);
+                        shardStats, transactionID, txnClientVersion);
             } else {
                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+                        shardActor, shardStats, transactionID, txnClientVersion);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
index 8ba6139..a4c97e8 100644 (file)
@@ -27,14 +27,12 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
     private final DOMStoreTransactionChain chain;
     private final DatastoreContext datastoreContext;
-    private final SchemaContext schemaContext;
     private final ShardStats shardStats;
 
-    public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext, ShardStats shardStats) {
+    public ShardTransactionChain(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+            ShardStats shardStats) {
         this.chain = chain;
         this.datastoreContext = datastoreContext;
-        this.schemaContext = schemaContext;
         this.shardStats = shardStats;
     }
 
@@ -61,22 +59,19 @@ public class ShardTransactionChain extends AbstractUntypedActor {
                 TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId(),
+                            datastoreContext, shardStats, createTransaction.getTransactionId(),
                             createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId(),
+                            datastoreContext, shardStats, createTransaction.getTransactionId(),
                             createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId(),
+                            datastoreContext, shardStats, createTransaction.getTransactionId(),
                             createTransaction.getVersion()), transactionName);
         } else {
             throw new IllegalArgumentException (
@@ -94,8 +89,7 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
     public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
         DatastoreContext datastoreContext, ShardStats shardStats) {
-        return Props.create(new ShardTransactionChainCreator(chain, schemaContext,
-                datastoreContext, shardStats));
+        return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats));
     }
 
     private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
@@ -103,21 +97,19 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
         final DOMStoreTransactionChain chain;
         final DatastoreContext datastoreContext;
-        final SchemaContext schemaContext;
         final ShardStats shardStats;
 
 
-        ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext, ShardStats shardStats) {
+        ShardTransactionChainCreator(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+                ShardStats shardStats) {
             this.chain = chain;
             this.datastoreContext = datastoreContext;
-            this.schemaContext = schemaContext;
             this.shardStats = shardStats;
         }
 
         @Override
         public ShardTransactionChain create() throws Exception {
-            return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats);
+            return new ShardTransactionChain(chain, datastoreContext, shardStats);
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java
new file mode 100644 (file)
index 0000000..9637646
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActorContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+
+/**
+ * A factory for creating ShardTransaction actors.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardTransactionActorFactory {
+
+    private final DOMTransactionFactory domTransactionFactory;
+    private final DatastoreContext datastoreContext;
+    private final String txnDispatcherPath;
+    private final ShardStats shardMBean;
+    private final UntypedActorContext actorContext;
+    private final ActorRef shardActor;
+
+    ShardTransactionActorFactory(DOMTransactionFactory domTransactionFactory, DatastoreContext datastoreContext,
+            String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean) {
+        this.domTransactionFactory = domTransactionFactory;
+        this.datastoreContext = datastoreContext;
+        this.txnDispatcherPath = txnDispatcherPath;
+        this.shardMBean = shardMBean;
+        this.actorContext = actorContext;
+        this.shardActor = shardActor;
+    }
+
+    ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID,
+            String transactionChainID, short clientVersion) {
+
+        DOMStoreTransaction transaction = domTransactionFactory.newTransaction(type, transactionID.toString(),
+                transactionChainID);
+
+        return actorContext.actorOf(ShardTransaction.props(transaction, shardActor, datastoreContext, shardMBean,
+                transactionID.getRemoteTransactionId(), clientVersion).withDispatcher(txnDispatcherPath),
+                transactionID.toString());
+    }
+}
index e53e9cb..1d5b1d8 100644 (file)
@@ -32,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * @author: syedbahm
@@ -46,9 +45,8 @@ public class ShardWriteTransaction extends ShardTransaction {
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            short clientTxVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
+        super(shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index b3a0430..e3b82df 100644 (file)
@@ -17,6 +17,7 @@ import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
+import akka.persistence.SaveSnapshotSuccess;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
@@ -1410,6 +1411,8 @@ public class ShardTest extends AbstractShardTest {
     @SuppressWarnings("serial")
     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
 
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
+
         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
             TestPersistentDataProvider(DataPersistenceProvider delegate) {
@@ -1426,8 +1429,6 @@ public class ShardTest extends AbstractShardTest {
         dataStoreContextBuilder.persistent(persistent);
 
         new ShardTestKit(getSystem()) {{
-            final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-
             class TestShard extends Shard {
 
                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
@@ -1437,9 +1438,12 @@ public class ShardTest extends AbstractShardTest {
                 }
 
                 @Override
-                protected void commitSnapshot(final long sequenceNumber) {
-                    super.commitSnapshot(sequenceNumber);
-                    latch.get().countDown();
+                public void handleCommand(Object message) {
+                    super.handleCommand(message);
+
+                    if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+                        latch.get().countDown();
+                    }
                 }
 
                 @Override
index c3fef61..21fb55f 100644 (file)
@@ -70,8 +70,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -100,8 +99,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -130,8 +128,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -160,8 +157,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -193,8 +189,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -231,8 +226,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -264,8 +258,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
index 7894ab1..c9335f3 100644 (file)
@@ -100,7 +100,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
             short version) {
         Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
-                testSchemaContext, datastoreContext, shardStats, "txn", version);
+                datastoreContext, shardStats, "txn", version);
         return getSystem().actorOf(props, name);
     }
 
@@ -611,8 +611,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.