Merge "CDS: Split out TransactionFutureCallback"
authorMoiz Raja <moraja@cisco.com>
Fri, 10 Apr 2015 16:13:01 +0000 (16:13 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 10 Apr 2015 16:13:02 +0000 (16:13 +0000)
49 files changed:
karaf/opendaylight-karaf-resources/src/main/resources/etc/custom.properties
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/AbstractDataServiceTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/md/sal/binding/data/ConcurrentImplicitCreateTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/md/sal/binding/data/WildcardedDataChangeListenerTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/DeleteNestedAugmentationListenParentTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/WriteParentListenAugmentTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/WriteParentReadChildTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/BrokerIntegrationTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/CrossBrokerMountPointTest.java
opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/connect/dom/DOMRpcServiceTestBugfix560.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/AbstractConfig.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/common/actor/CommonConfig.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/serialization/NormalizedNodeSerializer.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongFuture.java

index 4a8f5ae795f7541d38793498f1570227febfd979..e726b800d25778bae39aca3de1ef43f067411de7 100644 (file)
@@ -132,8 +132,14 @@ java.util.logging.config.file=configuration/tomcat-logging.properties
 hosttracker.keyscheme=IP
 
 # LISP Flow Mapping configuration
-# Map-Register messages overwrite existing RLOC sets in EID-to-RLOC mappings
+# Map-Register messages overwrite existing RLOC sets in EID-to-RLOC mappings (default: true)
 lisp.mappingOverwrite = true
-# Enable the Solicit-Map-Request (SMR) mechanism
-lisp.smr = false
+# Enable the Solicit-Map-Request (SMR) mechanism (default: true)
+lisp.smr = true
+# Choose policy for Explicit Locator Path (ELP) handling
+# There are three options:
+#   default: don't add or remove locator records, return mapping as-is
+#   both: keep the ELP, but add the next hop as a standalone non-LCAF locator with a lower priority
+#   replace: remove the ELP, add the next hop as a standalone non-LCAF locator
+lisp.elpPolicy = default
 
index ed19f21dedb8d5900a0df35aa31fc20ce6b8aee4..5ab3f69bea994d60644cca47b9694205ae2ab039 100644 (file)
@@ -19,6 +19,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
@@ -26,6 +27,8 @@ import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
@@ -34,9 +37,9 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
 /**
  * A sample actor showing how the RaftActor is to be extended
  */
-public class ExampleActor extends RaftActor {
+public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
 
-    private final Map<String, String> state = new HashMap();
+    private final Map<String, String> state = new HashMap<>();
 
     private long persistIdentifier = 1;
     private final Optional<ActorRef> roleChangeNotifier;
@@ -118,7 +121,8 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    @Override protected void createSnapshot() {
+    @Override
+    public void createSnapshot(ActorRef actorRef) {
         ByteString bs = null;
         try {
             bs = fromObject(state);
@@ -128,15 +132,16 @@ public class ExampleActor extends RaftActor {
         getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
     }
 
-    @Override protected void applySnapshot(byte [] snapshot) {
+    @Override
+    public void applySnapshot(byte [] snapshot) {
         state.clear();
         try {
-            state.putAll((HashMap) toObject(snapshot));
+            state.putAll((HashMap<String, String>) toObject(snapshot));
         } catch (Exception e) {
            LOG.error("Exception in applying snapshot", e);
         }
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
+            LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
         }
     }
 
@@ -192,22 +197,33 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void startLogRecoveryBatch(int maxBatchSize) {
+    @Nonnull
+    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+        return this;
+    }
+
+    @Override
+    public void startLogRecoveryBatch(int maxBatchSize) {
+    }
+
+    @Override
+    public void appendRecoveredLogEntry(Payload data) {
     }
 
     @Override
-    protected void appendRecoveredLogEntry(Payload data) {
+    public void applyCurrentLogRecoveryBatch() {
     }
 
     @Override
-    protected void applyCurrentLogRecoveryBatch() {
+    public void onRecoveryComplete() {
     }
 
     @Override
-    protected void onRecoveryComplete() {
+    public void applyRecoverySnapshot(byte[] snapshot) {
     }
 
     @Override
-    protected void applyRecoverySnapshot(byte[] snapshot) {
+    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+        return this;
     }
 }
index d2862c2baf219acedc921cf0710f2978f7fe85f7..2eb4189eac911d7ecb773137bfcfc652b523564b 100644 (file)
@@ -53,8 +53,8 @@ public class KeyValue extends Payload implements Serializable {
     }
 
     // override this method to return  the protobuff related extension fields and their values
-    @Override public Map<GeneratedMessage.GeneratedExtension, String> encode() {
-        Map<GeneratedMessage.GeneratedExtension, String> map = new HashMap<>();
+    @Override public Map<GeneratedMessage.GeneratedExtension<?, ?>, String> encode() {
+        Map<GeneratedMessage.GeneratedExtension<?, ?>, String> map = new HashMap<>();
         map.put(KeyValueMessages.key, getKey());
         map.put(KeyValueMessages.value, getValue());
         return map;
index 1aecc89eeafa2b746f4e751f9869ed68d1229bee..c27ff373865069df16b08eae829e1722ff74a5f8 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -19,7 +20,7 @@ import java.util.List;
 public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
 
     // We define this as ArrayList so we can use ensureCapacity.
-    protected ArrayList<ReplicatedLogEntry> journal;
+    private ArrayList<ReplicatedLogEntry> journal;
 
     private long snapshotIndex = -1;
     private long snapshotTerm = -1;
@@ -28,13 +29,17 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     private ArrayList<ReplicatedLogEntry> snapshottedJournal;
     private long previousSnapshotIndex = -1;
     private long previousSnapshotTerm = -1;
-    protected int dataSize = 0;
+    private int dataSize = 0;
 
     public AbstractReplicatedLogImpl(long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
         this.snapshotIndex = snapshotIndex;
         this.snapshotTerm = snapshotTerm;
         this.journal = new ArrayList<>(unAppliedEntries);
+
+        for(ReplicatedLogEntry entry: journal) {
+            dataSize += entry.size();
+        }
     }
 
     public AbstractReplicatedLogImpl() {
@@ -90,18 +95,26 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     }
 
     @Override
-    public void removeFrom(long logEntryIndex) {
+    public long removeFrom(long logEntryIndex) {
         int adjustedIndex = adjustedIndex(logEntryIndex);
         if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
             // physical index should be less than list size and >= 0
-            return;
+            return -1;
+        }
+
+        for(int i = adjustedIndex; i < journal.size(); i++) {
+            dataSize -= journal.get(i).size();
         }
+
         journal.subList(adjustedIndex , journal.size()).clear();
+
+        return adjustedIndex;
     }
 
     @Override
     public void append(ReplicatedLogEntry replicatedLogEntry) {
         journal.add(replicatedLogEntry);
+        dataSize += replicatedLogEntry.size();
     }
 
     @Override
@@ -230,4 +243,9 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         snapshotTerm = previousSnapshotTerm;
         previousSnapshotTerm = -1;
     }
+
+    @VisibleForTesting
+    ReplicatedLogEntry getAtPhysicalIndex(int index) {
+        return journal.get(index);
+    }
 }
index 1c30fe23175b5af62ffe808d9428c3361688a4b0..41a807aa355d394f659f488209f65e732646b120 100644 (file)
@@ -11,15 +11,10 @@ package org.opendaylight.controller.cluster.raft;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.japi.Procedure;
-import akka.persistence.RecoveryCompleted;
-import akka.persistence.SaveSnapshotFailure;
-import akka.persistence.SaveSnapshotSuccess;
-import akka.persistence.SnapshotOffer;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.Serializable;
@@ -27,6 +22,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider;
@@ -36,11 +32,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
 import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
@@ -99,8 +91,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
 
-    private static final String COMMIT_SNAPSHOT = "commit_snapshot";
-
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
     /**
@@ -117,11 +107,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     private final DelegatingPersistentDataProvider delegatingPersistenceProvider = new DelegatingPersistentDataProvider(null);
 
-    private final Procedure<Void> createSnapshotProcedure = new CreateSnapshotProcedure();
-
-    private Stopwatch recoveryTimer;
+    private RaftActorRecoverySupport raftRecovery;
 
-    private int currentRecoveryBatchCount;
+    private RaftActorSnapshotMessageSupport snapshotSupport;
 
     private final BehaviorStateHolder reusableBehaviorStateHolder = new BehaviorStateHolder();
 
@@ -140,12 +128,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setReplicatedLog(ReplicatedLogImpl.newInstance(context, delegatingPersistenceProvider, currentBehavior));
     }
 
-    private void initRecoveryTimer() {
-        if(recoveryTimer == null) {
-            recoveryTimer = Stopwatch.createStarted();
-        }
-    }
-
     @Override
     public void preStart() throws Exception {
         LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
@@ -156,7 +138,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void postStop() {
-        if(currentBehavior != null) {
+        if(currentBehavior.getDelegate() != null) {
             try {
                 currentBehavior.close();
             } catch (Exception e) {
@@ -169,134 +151,28 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
     @Override
     public void handleRecover(Object message) {
-        if(persistence().isRecoveryApplicable()) {
-            if (message instanceof SnapshotOffer) {
-                onRecoveredSnapshot((SnapshotOffer) message);
-            } else if (message instanceof ReplicatedLogEntry) {
-                onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
-            } else if (message instanceof ApplyLogEntries) {
-                // Handle this message for backwards compatibility with pre-Lithium versions.
-                onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
-            } else if (message instanceof ApplyJournalEntries) {
-                onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
-            } else if (message instanceof DeleteEntries) {
-                replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
-            } else if (message instanceof UpdateElectionTerm) {
-                context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
-                        ((UpdateElectionTerm) message).getVotedFor());
-            } else if (message instanceof RecoveryCompleted) {
-                onRecoveryCompletedMessage();
-            }
-        } else {
-            if (message instanceof RecoveryCompleted) {
+        if(raftRecovery == null) {
+            raftRecovery = new RaftActorRecoverySupport(delegatingPersistenceProvider, context, currentBehavior,
+                    getRaftActorRecoveryCohort());
+        }
+
+        boolean recoveryComplete = raftRecovery.handleRecoveryMessage(message);
+        if(recoveryComplete) {
+            if(!persistence().isRecoveryApplicable()) {
                 // Delete all the messages from the akka journal so that we do not end up with consistency issues
                 // Note I am not using the dataPersistenceProvider and directly using the akka api here
                 deleteMessages(lastSequenceNr());
 
                 // Delete all the akka snapshots as they will not be needed
                 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
-
-                onRecoveryComplete();
-
-                initializeBehavior();
             }
-        }
-    }
-
-    private void onRecoveredSnapshot(SnapshotOffer offer) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: SnapshotOffer called..", persistenceId());
-        }
-
-        initRecoveryTimer();
-
-        Snapshot snapshot = (Snapshot) offer.snapshot();
 
-        // Create a replicated log with the snapshot information
-        // The replicated log can be used later on to retrieve this snapshot
-        // when we need to install it on a peer
+            onRecoveryComplete();
 
-        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
-                currentBehavior));
-        context.setLastApplied(snapshot.getLastAppliedIndex());
-        context.setCommitIndex(snapshot.getLastAppliedIndex());
+            initializeBehavior();
 
-        Stopwatch timer = Stopwatch.createStarted();
-
-        // Apply the snapshot to the actors state
-        applyRecoverySnapshot(snapshot.getState());
-
-        timer.stop();
-        LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
-                replicatedLog().size(), persistenceId(), timer.toString(),
-                replicatedLog().getSnapshotIndex(), replicatedLog().getSnapshotTerm());
-    }
-
-    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Received ReplicatedLogEntry for recovery: {}", persistenceId(), logEntry.getIndex());
+            raftRecovery = null;
         }
-
-        replicatedLog().append(logEntry);
-    }
-
-    private void onRecoveredApplyLogEntries(long toIndex) {
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
-                    persistenceId(), context.getLastApplied() + 1, toIndex);
-        }
-
-        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
-            batchRecoveredLogEntry(replicatedLog().get(i));
-        }
-
-        context.setLastApplied(toIndex);
-        context.setCommitIndex(toIndex);
-    }
-
-    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
-        initRecoveryTimer();
-
-        int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
-        if(currentRecoveryBatchCount == 0) {
-            startLogRecoveryBatch(batchSize);
-        }
-
-        appendRecoveredLogEntry(logEntry.getData());
-
-        if(++currentRecoveryBatchCount >= batchSize) {
-            endCurrentLogRecoveryBatch();
-        }
-    }
-
-    private void endCurrentLogRecoveryBatch() {
-        applyCurrentLogRecoveryBatch();
-        currentRecoveryBatchCount = 0;
-    }
-
-    private void onRecoveryCompletedMessage() {
-        if(currentRecoveryBatchCount > 0) {
-            endCurrentLogRecoveryBatch();
-        }
-
-        onRecoveryComplete();
-
-        String recoveryTime = "";
-        if(recoveryTimer != null) {
-            recoveryTimer.stop();
-            recoveryTime = " in " + recoveryTimer.toString();
-            recoveryTimer = null;
-        }
-
-        LOG.info(
-            "Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
-                "Persistence Id =  " + persistenceId() +
-                " Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
-                "journal-size={}",
-            replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
-            replicatedLog().getSnapshotTerm(), replicatedLog().size());
-
-        initializeBehavior();
     }
 
     protected void initializeBehavior(){
@@ -309,7 +185,18 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
     }
 
-    @Override public void handleCommand(Object message) {
+    @Override
+    public void handleCommand(Object message) {
+        if(snapshotSupport == null) {
+            snapshotSupport = new RaftActorSnapshotMessageSupport(delegatingPersistenceProvider, context,
+                    currentBehavior, getRaftActorSnapshotCohort(), self());
+        }
+
+        boolean handled = snapshotSupport.handleSnapshotMessage(message);
+        if(handled) {
+            return;
+        }
+
         if (message instanceof ApplyState){
             ApplyState applyState = (ApplyState) message;
 
@@ -336,56 +223,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             persistence().persist(applyEntries, NoopProcedure.instance());
 
-        } else if(message instanceof ApplySnapshot ) {
-            Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
-
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("{}: ApplySnapshot called on Follower Actor " +
-                        "snapshotIndex:{}, snapshotTerm:{}", persistenceId(), snapshot.getLastAppliedIndex(),
-                    snapshot.getLastAppliedTerm()
-                );
-            }
-
-            applySnapshot(snapshot.getState());
-
-            //clears the followers log, sets the snapshot index to ensure adjusted-index works
-            context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, delegatingPersistenceProvider,
-                    currentBehavior));
-            context.setLastApplied(snapshot.getLastAppliedIndex());
-
         } else if (message instanceof FindLeader) {
             getSender().tell(
                 new FindLeaderReply(getLeaderAddress()),
                 getSelf()
             );
-
-        } else if (message instanceof SaveSnapshotSuccess) {
-            SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
-            LOG.info("{}: SaveSnapshotSuccess received for snapshot", persistenceId());
-
-            long sequenceNumber = success.metadata().sequenceNr();
-
-            commitSnapshot(sequenceNumber);
-
-        } else if (message instanceof SaveSnapshotFailure) {
-            SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
-
-            LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
-                    persistenceId(), saveSnapshotFailure.cause());
-
-            context.getSnapshotManager().rollback();
-
-        } else if (message instanceof CaptureSnapshot) {
-            LOG.debug("{}: CaptureSnapshot received by actor: {}", persistenceId(), message);
-
-            context.getSnapshotManager().create(createSnapshotProcedure);
-
-        } else if (message instanceof CaptureSnapshotReply) {
-            handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
         } else if(message instanceof GetOnDemandRaftState) {
             onGetOnDemandRaftStats();
-        } else if (message.equals(COMMIT_SNAPSHOT)) {
-            commitSnapshot(-1);
         } else {
             reusableBehaviorStateHolder.init(getCurrentBehavior());
 
@@ -621,7 +465,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     // Make saving Snapshot successful
                     // Committing the snapshot here would end up calling commit in the creating state which would
                     // be a state violation. That's why now we send a message to commit the snapshot.
-                    self().tell(COMMIT_SNAPSHOT, self());
+                    self().tell(RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT, self());
                 }
             });
         }
@@ -645,10 +489,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         context.setPeerAddress(peerId, peerAddress);
     }
 
-    protected void commitSnapshot(long sequenceNumber) {
-        context.getSnapshotManager().commit(persistence(), sequenceNumber);
-    }
-
     /**
      * The applyState method will be called by the RaftActor when some data
      * needs to be applied to the actor's state
@@ -670,31 +510,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         Object data);
 
     /**
-     * This method is called during recovery at the start of a batch of state entries. Derived
-     * classes should perform any initialization needed to start a batch.
+     * Returns the RaftActorRecoveryCohort to participate in persistence recovery.
      */
-    protected abstract void startLogRecoveryBatch(int maxBatchSize);
-
-    /**
-     * This method is called during recovery to append state data to the current batch. This method
-     * is called 1 or more times after {@link #startLogRecoveryBatch}.
-     *
-     * @param data the state data
-     */
-    protected abstract void appendRecoveredLogEntry(Payload data);
-
-    /**
-     * This method is called during recovery to reconstruct the state of the actor.
-     *
-     * @param snapshotBytes A snapshot of the state of the actor
-     */
-    protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
-
-    /**
-     * This method is called during recovery at the end of a batch to apply the current batched
-     * log entries. This method is called after {@link #appendRecoveredLogEntry}.
-     */
-    protected abstract void applyCurrentLogRecoveryBatch();
+    @Nonnull
+    protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort();
 
     /**
      * This method is called when recovery is complete.
@@ -702,24 +521,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
     protected abstract void onRecoveryComplete();
 
     /**
-     * This method will be called by the RaftActor when a snapshot needs to be
-     * created. The derived actor should respond with its current state.
-     * <p/>
-     * During recovery the state that is returned by the derived actor will
-     * be passed back to it by calling the applySnapshot  method
-     *
-     * @return The current state of the actor
+     * Returns the RaftActorSnapshotCohort to participate in persistence recovery.
      */
-    protected abstract void createSnapshot();
-
-    /**
-     * This method can be called at any other point during normal
-     * operations when the derived actor is out of sync with it's peers
-     * and the only way to bring it in sync is by applying a snapshot
-     *
-     * @param snapshotBytes A snapshot of the state of the actor
-     */
-    protected abstract void applySnapshot(byte[] snapshotBytes);
+    @Nonnull
+    protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();
 
     /**
      * This method will be called by the RaftActor when the state of the
@@ -753,12 +558,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         return peerAddress;
     }
 
-    private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
-        LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
-        context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, context.getTotalMemory());
-    }
-
     protected boolean hasFollowers(){
         return getRaftActorContext().hasFollowers();
     }
@@ -795,14 +594,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         }
     }
 
-    private class CreateSnapshotProcedure implements Procedure<Void> {
-
-        @Override
-        public void apply(Void aVoid) throws Exception {
-            createSnapshot();
-        }
-    }
-
     private static class BehaviorStateHolder {
         private RaftActorBehavior behavior;
         private String leaderId;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoveryCohort.java
new file mode 100644 (file)
index 0000000..a9f00aa
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+/**
+ * Interface for a class that participates in raft actor persistence recovery.
+ *
+ * @author Thomas Pantelis
+ */
+public interface RaftActorRecoveryCohort {
+
+    /**
+     * This method is called during recovery at the start of a batch of state entries. Derived
+     * classes should perform any initialization needed to start a batch.
+     */
+    void startLogRecoveryBatch(int maxBatchSize);
+
+    /**
+     * This method is called during recovery to append state data to the current batch. This method
+     * is called 1 or more times after {@link #startLogRecoveryBatch}.
+     *
+     * @param data the state data
+     */
+    void appendRecoveredLogEntry(Payload data);
+
+    /**
+     * This method is called during recovery to reconstruct the state of the actor.
+     *
+     * @param snapshotBytes A snapshot of the state of the actor
+     */
+    void applyRecoverySnapshot(byte[] snapshotBytes);
+
+    /**
+     * This method is called during recovery at the end of a batch to apply the current batched
+     * log entries. This method is called after {@link #appendRecoveredLogEntry}.
+     */
+    void applyCurrentLogRecoveryBatch();
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
new file mode 100644 (file)
index 0000000..d2c14de
--- /dev/null
@@ -0,0 +1,173 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.SnapshotOffer;
+import com.google.common.base.Stopwatch;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+/**
+ * Support class that handles persistence recovery for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorRecoverySupport {
+    private final DataPersistenceProvider persistence;
+    private final RaftActorContext context;
+    private final RaftActorBehavior currentBehavior;
+    private final RaftActorRecoveryCohort cohort;
+
+    private int currentRecoveryBatchCount;
+
+    private Stopwatch recoveryTimer;
+    private final Logger log;
+
+    RaftActorRecoverySupport(DataPersistenceProvider persistence, RaftActorContext context,
+            RaftActorBehavior currentBehavior, RaftActorRecoveryCohort cohort) {
+        this.persistence = persistence;
+        this.context = context;
+        this.currentBehavior = currentBehavior;
+        this.cohort = cohort;
+        this.log = context.getLogger();
+    }
+
+    boolean handleRecoveryMessage(Object message) {
+        boolean recoveryComplete = false;
+        if(persistence.isRecoveryApplicable()) {
+            if (message instanceof SnapshotOffer) {
+                onRecoveredSnapshot((SnapshotOffer) message);
+            } else if (message instanceof ReplicatedLogEntry) {
+                onRecoveredJournalLogEntry((ReplicatedLogEntry) message);
+            } else if (message instanceof ApplyLogEntries) {
+                // Handle this message for backwards compatibility with pre-Lithium versions.
+                onRecoveredApplyLogEntries(((ApplyLogEntries) message).getToIndex());
+            } else if (message instanceof ApplyJournalEntries) {
+                onRecoveredApplyLogEntries(((ApplyJournalEntries) message).getToIndex());
+            } else if (message instanceof DeleteEntries) {
+                replicatedLog().removeFrom(((DeleteEntries) message).getFromIndex());
+            } else if (message instanceof UpdateElectionTerm) {
+                context.getTermInformation().update(((UpdateElectionTerm) message).getCurrentTerm(),
+                        ((UpdateElectionTerm) message).getVotedFor());
+            } else if (message instanceof RecoveryCompleted) {
+                onRecoveryCompletedMessage();
+                recoveryComplete = true;
+            }
+        } else if (message instanceof RecoveryCompleted) {
+            recoveryComplete = true;
+        }
+
+        return recoveryComplete;
+    }
+
+    private ReplicatedLog replicatedLog() {
+        return context.getReplicatedLog();
+    }
+
+    private void initRecoveryTimer() {
+        if(recoveryTimer == null) {
+            recoveryTimer = Stopwatch.createStarted();
+        }
+    }
+
+    private void onRecoveredSnapshot(SnapshotOffer offer) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: SnapshotOffer called..", context.getId());
+        }
+
+        initRecoveryTimer();
+
+        Snapshot snapshot = (Snapshot) offer.snapshot();
+
+        // Create a replicated log with the snapshot information
+        // The replicated log can be used later on to retrieve this snapshot
+        // when we need to install it on a peer
+
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence, currentBehavior));
+        context.setLastApplied(snapshot.getLastAppliedIndex());
+        context.setCommitIndex(snapshot.getLastAppliedIndex());
+
+        Stopwatch timer = Stopwatch.createStarted();
+
+        // Apply the snapshot to the actors state
+        cohort.applyRecoverySnapshot(snapshot.getState());
+
+        timer.stop();
+        log.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size={}",
+                context.getId(), timer.toString(), replicatedLog().getSnapshotIndex(),
+                replicatedLog().getSnapshotTerm(), replicatedLog().size());
+    }
+
+    private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: Received ReplicatedLogEntry for recovery: index: {}, size: {}", context.getId(),
+                    logEntry.getIndex(), logEntry.size());
+        }
+
+        replicatedLog().append(logEntry);
+    }
+
+    private void onRecoveredApplyLogEntries(long toIndex) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: Received ApplyLogEntries for recovery, applying to state: {} to {}",
+                    context.getId(), context.getLastApplied() + 1, toIndex);
+        }
+
+        for (long i = context.getLastApplied() + 1; i <= toIndex; i++) {
+            batchRecoveredLogEntry(replicatedLog().get(i));
+        }
+
+        context.setLastApplied(toIndex);
+        context.setCommitIndex(toIndex);
+    }
+
+    private void batchRecoveredLogEntry(ReplicatedLogEntry logEntry) {
+        initRecoveryTimer();
+
+        int batchSize = context.getConfigParams().getJournalRecoveryLogBatchSize();
+        if(currentRecoveryBatchCount == 0) {
+            cohort.startLogRecoveryBatch(batchSize);
+        }
+
+        cohort.appendRecoveredLogEntry(logEntry.getData());
+
+        if(++currentRecoveryBatchCount >= batchSize) {
+            endCurrentLogRecoveryBatch();
+        }
+    }
+
+    private void endCurrentLogRecoveryBatch() {
+        cohort.applyCurrentLogRecoveryBatch();
+        currentRecoveryBatchCount = 0;
+    }
+
+    private void onRecoveryCompletedMessage() {
+        if(currentRecoveryBatchCount > 0) {
+            endCurrentLogRecoveryBatch();
+        }
+
+        String recoveryTime = "";
+        if(recoveryTimer != null) {
+            recoveryTimer.stop();
+            recoveryTime = " in " + recoveryTimer.toString();
+            recoveryTimer = null;
+        }
+
+        log.info("Recovery completed" + recoveryTime + " - Switching actor to Follower - " +
+                 "Persistence Id =  " + context.getId() +
+                 " Last index in log = {}, snapshotIndex = {}, snapshotTerm = {}, " +
+                 "journal-size = {}", replicatedLog().lastIndex(), replicatedLog().getSnapshotIndex(),
+                 replicatedLog().getSnapshotTerm(), replicatedLog().size());
+    }
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotCohort.java
new file mode 100644 (file)
index 0000000..ad68726
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+
+/**
+ * Interface for a class that participates in raft actor snapshotting.
+ *
+ * @author Thomas Pantelis
+ */
+public interface RaftActorSnapshotCohort {
+
+    /**
+     * This method is called by the RaftActor when a snapshot needs to be
+     * created. The implementation should send a CaptureSnapshotReply to the given actor.
+     *
+     * @param actorRef the actor to which to respond
+     */
+    void createSnapshot(ActorRef actorRef);
+
+    /**
+     * This method is called to apply a snapshot installed by the leader.
+     *
+     * @param snapshotBytes a snapshot of the state of the actor
+     */
+    void applySnapshot(byte[] snapshotBytes);
+}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorSnapshotMessageSupport.java
new file mode 100644 (file)
index 0000000..21c8ffa
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import akka.actor.ActorRef;
+import akka.japi.Procedure;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.slf4j.Logger;
+
+/**
+ * Handles snapshot related messages for a RaftActor.
+ *
+ * @author Thomas Pantelis
+ */
+class RaftActorSnapshotMessageSupport {
+    static final String COMMIT_SNAPSHOT = "commit_snapshot";
+
+    private final DataPersistenceProvider persistence;
+    private final RaftActorContext context;
+    private final RaftActorBehavior currentBehavior;
+    private final RaftActorSnapshotCohort cohort;
+    private final ActorRef raftActorRef;
+    private final Logger log;
+
+    private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
+        @Override
+        public void apply(Void notUsed) throws Exception {
+            cohort.createSnapshot(raftActorRef);
+        }
+    };
+
+    RaftActorSnapshotMessageSupport(DataPersistenceProvider persistence, RaftActorContext context,
+            RaftActorBehavior currentBehavior, RaftActorSnapshotCohort cohort, ActorRef raftActorRef) {
+        this.persistence = persistence;
+        this.context = context;
+        this.currentBehavior = currentBehavior;
+        this.cohort = cohort;
+        this.raftActorRef = raftActorRef;
+        this.log = context.getLogger();
+    }
+
+    boolean handleSnapshotMessage(Object message) {
+        if(message instanceof ApplySnapshot ) {
+            onApplySnapshot(((ApplySnapshot) message).getSnapshot());
+            return true;
+        } else if (message instanceof SaveSnapshotSuccess) {
+            onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+            return true;
+        } else if (message instanceof SaveSnapshotFailure) {
+            onSaveSnapshotFailure((SaveSnapshotFailure) message);
+            return true;
+        } else if (message instanceof CaptureSnapshot) {
+            onCaptureSnapshot(message);
+            return true;
+        } else if (message instanceof CaptureSnapshotReply) {
+            onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
+            return true;
+        } else if (message.equals(COMMIT_SNAPSHOT)) {
+            context.getSnapshotManager().commit(persistence, -1);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private void onCaptureSnapshotReply(byte[] snapshotBytes) {
+        log.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", context.getId(), snapshotBytes.length);
+
+        context.getSnapshotManager().persist(persistence, snapshotBytes, currentBehavior, context.getTotalMemory());
+    }
+
+    private void onCaptureSnapshot(Object message) {
+        log.debug("{}: CaptureSnapshot received by actor: {}", context.getId(), message);
+
+        context.getSnapshotManager().create(createSnapshotProcedure);
+    }
+
+    private void onSaveSnapshotFailure(SaveSnapshotFailure saveSnapshotFailure) {
+        log.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+                context.getId(), saveSnapshotFailure.cause());
+
+        context.getSnapshotManager().rollback();
+    }
+
+    private void onSaveSnapshotSuccess(SaveSnapshotSuccess success) {
+        log.info("{}: SaveSnapshotSuccess received for snapshot", context.getId());
+
+        long sequenceNumber = success.metadata().sequenceNr();
+
+        context.getSnapshotManager().commit(persistence, sequenceNumber);
+    }
+
+    private void onApplySnapshot(Snapshot snapshot) {
+        if(log.isDebugEnabled()) {
+            log.debug("{}: ApplySnapshot called on Follower Actor " +
+                    "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
+                snapshot.getLastAppliedTerm());
+        }
+
+        cohort.applySnapshot(snapshot.getState());
+
+        //clears the followers log, sets the snapshot index to ensure adjusted-index works
+        context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, persistence,
+                currentBehavior));
+        context.setLastApplied(snapshot.getLastAppliedIndex());
+    }
+}
index 3e4d727c7162a45fc32c942561d2599b61a45075..8388eaf7436f251c63aa277024bf886f803153cb 100644 (file)
@@ -51,8 +51,9 @@ public interface ReplicatedLog {
      * information
      *
      * @param index the index of the log entry
+     * @return the adjusted index of the first log entry removed or -1 if log entry not found.
      */
-    void removeFrom(long index);
+    long removeFrom(long index);
 
 
     /**
index fdb630538130aa2ac1bfcdac43b6f5ad3808a1ba..5a77b9aea3ac4b008af17e760ac5a0f53349220e 100644 (file)
@@ -28,10 +28,6 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     private final Procedure<DeleteEntries> deleteProcedure = new Procedure<DeleteEntries>() {
         @Override
         public void apply(DeleteEntries param) {
-            dataSize = 0;
-            for (ReplicatedLogEntry entry : journal) {
-                dataSize += entry.size();
-            }
         }
     };
 
@@ -57,16 +53,11 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
     @Override
     public void removeFromAndPersist(long logEntryIndex) {
-        int adjustedIndex = adjustedIndex(logEntryIndex);
-
-        if (adjustedIndex < 0) {
-            return;
-        }
-
         // FIXME: Maybe this should be done after the command is saved
-        journal.subList(adjustedIndex , journal.size()).clear();
-
-        persistence.persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+        long adjustedIndex = removeFrom(logEntryIndex);
+        if(adjustedIndex >= 0) {
+            persistence.persist(new DeleteEntries((int)adjustedIndex), deleteProcedure);
+        }
     }
 
     @Override
@@ -83,7 +74,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
         }
 
         // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
-        journal.add(replicatedLogEntry);
+        append(replicatedLogEntry);
 
         // When persisting events with persist it is guaranteed that the
         // persistent actor will not receive further commands between the
@@ -96,8 +87,7 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
                 public void apply(ReplicatedLogEntry evt) throws Exception {
                     int logEntrySize = replicatedLogEntry.size();
 
-                    dataSize += logEntrySize;
-                    long dataSizeForCheck = dataSize;
+                    long dataSizeForCheck = dataSize();
 
                     dataSizeSinceLastSnapshot += logEntrySize;
 
index b910313b096015ad166c8be1178ac8f72ad12167..3c6c8281fb734b0d378963c831c2f462fbcc3182 100644 (file)
@@ -112,14 +112,14 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         }
 
         @Override
-        protected void createSnapshot() {
+        public void createSnapshot(ActorRef actorRef) {
             if(snapshot != null) {
                 getSelf().tell(new CaptureSnapshotReply(snapshot), ActorRef.noSender());
             }
         }
 
         @Override
-        protected void applyRecoverySnapshot(byte[] bytes) {
+        public void applyRecoverySnapshot(byte[] bytes) {
         }
 
         void setSnapshot(byte[] snapshot) {
index 8fdb7ea226e835186df84d7b71cb4125f2b3a759..c99f253657734cee8112f42e85b93e7feeebf215 100644 (file)
@@ -15,7 +15,6 @@ import akka.japi.Procedure;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,14 +39,6 @@ public class AbstractReplicatedLogImplTest {
 
     }
 
-    @After
-    public void tearDown() {
-        replicatedLogImpl.journal.clear();
-        replicatedLogImpl.setSnapshotIndex(-1);
-        replicatedLogImpl.setSnapshotTerm(-1);
-        replicatedLogImpl = null;
-    }
-
     @Test
     public void testIndexOperations() {
 
@@ -65,7 +56,7 @@ public class AbstractReplicatedLogImplTest {
         // now create a snapshot of 3 entries, with 1 unapplied entry left in the log
         // It removes the entries which have made it to snapshot
         // and updates the snapshot index and term
-        Map<Long, String> state = takeSnapshot(3);
+        takeSnapshot(3);
 
         // check the values after the snapshot.
         // each index value passed in the test is the logical index (log entry index)
@@ -101,7 +92,7 @@ public class AbstractReplicatedLogImplTest {
         assertEquals(2, replicatedLogImpl.getFrom(6).size());
 
         // take a second snapshot with 5 entries with 0 unapplied entries left in the log
-        state = takeSnapshot(5);
+        takeSnapshot(5);
 
         assertEquals(0, replicatedLogImpl.size());
         assertNull(replicatedLogImpl.last());
@@ -187,19 +178,45 @@ public class AbstractReplicatedLogImplTest {
         assertTrue(replicatedLogImpl.isPresent(5));
     }
 
+    @Test
+    public void testRemoveFrom() {
+
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 4, new MockPayload("E", 2)));
+        replicatedLogImpl.append(new MockReplicatedLogEntry(2, 5, new MockPayload("F", 3)));
+
+        assertEquals("dataSize", 9, replicatedLogImpl.dataSize());
+
+        long adjusted = replicatedLogImpl.removeFrom(4);
+        assertEquals("removeFrom - adjusted", 4, adjusted);
+        assertEquals("size", 4, replicatedLogImpl.size());
+        assertEquals("dataSize", 4, replicatedLogImpl.dataSize());
+
+        takeSnapshot(1);
+
+        adjusted = replicatedLogImpl.removeFrom(2);
+        assertEquals("removeFrom - adjusted", 1, adjusted);
+        assertEquals("size", 1, replicatedLogImpl.size());
+        assertEquals("dataSize", 1, replicatedLogImpl.dataSize());
+
+        assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(0));
+        assertEquals("removeFrom - adjusted", -1, replicatedLogImpl.removeFrom(100));
+    }
+
     // create a snapshot for test
     public Map<Long, String> takeSnapshot(final int numEntries) {
         Map<Long, String> map = new HashMap<>(numEntries);
-        List<ReplicatedLogEntry> entries = replicatedLogImpl.getEntriesTill(numEntries);
-        for (ReplicatedLogEntry entry : entries) {
+
+        long lastIndex = 0;
+        long lastTerm = 0;
+        for(int i = 0; i < numEntries; i++) {
+            ReplicatedLogEntry entry = replicatedLogImpl.getAtPhysicalIndex(i);
             map.put(entry.getIndex(), entry.getData().toString());
+            lastIndex = entry.getIndex();
+            lastTerm = entry.getTerm();
         }
 
-        int term = (int) replicatedLogImpl.lastTerm();
-        int lastIndex = (int) entries.get(entries.size() - 1).getIndex();
-        entries.clear();
-        replicatedLogImpl.setSnapshotTerm(term);
-        replicatedLogImpl.setSnapshotIndex(lastIndex);
+        replicatedLogImpl.snapshotPreCommit(lastIndex, lastTerm);
+        replicatedLogImpl.snapshotCommit();
 
         return map;
 
@@ -213,15 +230,6 @@ public class AbstractReplicatedLogImplTest {
         public void removeFromAndPersist(final long index) {
         }
 
-        @Override
-        public int dataSize() {
-            return -1;
-        }
-
-        public List<ReplicatedLogEntry> getEntriesTill(final int index) {
-            return journal.subList(0, index);
-        }
-
         @Override
         public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
         }
index 63f0df2f8c74c42e6a4d45bcf95a0f4aa707a48a..bab4a336002e4526136b97399d5828eed0bacdb3 100644 (file)
@@ -266,8 +266,8 @@ public class MockRaftActorContext implements RaftActorContext {
             this.size = size;
         }
 
-        @Override public  Map<GeneratedMessage.GeneratedExtension, String> encode() {
-            Map<GeneratedMessage.GeneratedExtension, String> map = new HashMap<GeneratedMessage.GeneratedExtension, String>();
+        @Override public Map<GeneratedMessage.GeneratedExtension<?, ?>, String> encode() {
+            Map<GeneratedMessage.GeneratedExtension<?, ?>, String> map = new HashMap<>();
             map.put(MockPayloadMessages.value, value);
             return map;
         }
index 17a81ac3c39aa9c0a27743e1739892629a157ba8..14bfd1d348b69dc76332fc35ec8f8f94dd80e8db 100644 (file)
@@ -49,6 +49,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -97,9 +98,11 @@ public class RaftActorTest extends AbstractActorTest {
         InMemorySnapshotStore.clear();
     }
 
-    public static class MockRaftActor extends RaftActor {
+    public static class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
 
-        private final RaftActor delegate;
+        private final RaftActor actorDelegate;
+        private final RaftActorRecoveryCohort recoveryCohortDelegate;
+        private final RaftActorSnapshotCohort snapshotCohortDelegate;
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final List<Object> state;
         private ActorRef roleChangeNotifier;
@@ -136,7 +139,9 @@ public class RaftActorTest extends AbstractActorTest {
                              DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
-            this.delegate = mock(RaftActor.class);
+            this.actorDelegate = mock(RaftActor.class);
+            this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
+            this.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
             if(dataPersistenceProvider == null){
                 setPersistence(true);
             } else {
@@ -197,26 +202,37 @@ public class RaftActorTest extends AbstractActorTest {
 
 
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
-            delegate.applyState(clientActor, identifier, data);
+            actorDelegate.applyState(clientActor, identifier, data);
             LOG.info("{}: applyState called", persistenceId());
         }
 
         @Override
-        protected void startLogRecoveryBatch(int maxBatchSize) {
+        @Nonnull
+        protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+            return this;
         }
 
         @Override
-        protected void appendRecoveredLogEntry(Payload data) {
+        protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+            return this;
+        }
+
+        @Override
+        public void startLogRecoveryBatch(int maxBatchSize) {
+        }
+
+        @Override
+        public void appendRecoveredLogEntry(Payload data) {
             state.add(data);
         }
 
         @Override
-        protected void applyCurrentLogRecoveryBatch() {
+        public void applyCurrentLogRecoveryBatch() {
         }
 
         @Override
         protected void onRecoveryComplete() {
-            delegate.onRecoveryComplete();
+            actorDelegate.onRecoveryComplete();
             recoveryComplete.countDown();
         }
 
@@ -227,8 +243,8 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override
-        protected void applyRecoverySnapshot(byte[] bytes) {
-            delegate.applyRecoverySnapshot(bytes);
+        public void applyRecoverySnapshot(byte[] bytes) {
+            recoveryCohortDelegate.applyRecoverySnapshot(bytes);
             try {
                 Object data = toObject(bytes);
                 if (data instanceof List) {
@@ -239,18 +255,21 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
-        @Override protected void createSnapshot() {
+        @Override
+        public void createSnapshot(ActorRef actorRef) {
             LOG.info("{}: createSnapshot called", persistenceId());
-            delegate.createSnapshot();
+            snapshotCohortDelegate.createSnapshot(actorRef);
         }
 
-        @Override protected void applySnapshot(byte [] snapshot) {
+        @Override
+        public void applySnapshot(byte [] snapshot) {
             LOG.info("{}: applySnapshot called", persistenceId());
-            delegate.applySnapshot(snapshot);
+            snapshotCohortDelegate.applySnapshot(snapshot);
         }
 
-        @Override protected void onStateChanged() {
-            delegate.onStateChanged();
+        @Override
+        protected void onStateChanged() {
+            actorDelegate.onStateChanged();
         }
 
         @Override
@@ -284,7 +303,6 @@ public class RaftActorTest extends AbstractActorTest {
         public ReplicatedLog getReplicatedLog(){
             return this.getRaftActorContext().getReplicatedLog();
         }
-
     }
 
 
@@ -399,11 +417,11 @@ public class RaftActorTest extends AbstractActorTest {
             // add more entries after snapshot is taken
             List<ReplicatedLogEntry> entries = new ArrayList<>();
             ReplicatedLogEntry entry2 = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
-                    new MockRaftActorContext.MockPayload("F"));
+                    new MockRaftActorContext.MockPayload("F", 2));
             ReplicatedLogEntry entry3 = new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
-                    new MockRaftActorContext.MockPayload("G"));
+                    new MockRaftActorContext.MockPayload("G", 3));
             ReplicatedLogEntry entry4 = new MockRaftActorContext.MockReplicatedLogEntry(1, 7,
-                    new MockRaftActorContext.MockPayload("H"));
+                    new MockRaftActorContext.MockPayload("H", 4));
             entries.add(entry2);
             entries.add(entry3);
             entries.add(entry4);
@@ -433,6 +451,7 @@ public class RaftActorTest extends AbstractActorTest {
             RaftActorContext context = ref.underlyingActor().getRaftActorContext();
             assertEquals("Journal log size", snapshotUnappliedEntries.size() + entries.size(),
                     context.getReplicatedLog().size());
+            assertEquals("Journal data size", 10, context.getReplicatedLog().dataSize());
             assertEquals("Last index", lastIndex, context.getReplicatedLog().lastIndex());
             assertEquals("Last applied", lastAppliedToState, context.getLastApplied());
             assertEquals("Commit index", lastAppliedToState, context.getCommitIndex());
@@ -516,7 +535,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
+                verify(mockRaftActor.recoveryCohortDelegate).applyRecoverySnapshot(eq(snapshotBytes.toByteArray()));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -583,7 +602,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(byte[].class));
+                verify(mockRaftActor.recoveryCohortDelegate, times(0)).applyRecoverySnapshot(any(byte[].class));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -810,7 +829,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.getRaftActorContext().getSnapshotManager().capture(lastEntry, replicatedToAllIndex);
 
-                verify(mockRaftActor.delegate).createSnapshot();
+                verify(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes.toByteArray()));
 
@@ -860,7 +879,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
 
-                verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+                verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
 
             }
         };
@@ -907,7 +926,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
 
-                verify(mockRaftActor.delegate).applySnapshot(eq(snapshot.getState()));
+                verify(mockRaftActor.snapshotCohortDelegate).applySnapshot(eq(snapshot.getState()));
 
                 assertTrue("The replicatedLog should have changed",
                         oldReplicatedLog != mockRaftActor.getReplicatedLog());
@@ -1131,7 +1150,7 @@ public class RaftActorTest extends AbstractActorTest {
                         .capture(new MockRaftActorContext.MockReplicatedLogEntry(1, 6,
                                 new MockRaftActorContext.MockPayload("x")), 4);
 
-                verify(leaderActor.delegate).createSnapshot();
+                verify(leaderActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 assertEquals(8, leaderActor.getReplicatedLog().size());
 
@@ -1230,7 +1249,7 @@ public class RaftActorTest extends AbstractActorTest {
                         new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
                                 new MockRaftActorContext.MockPayload("D")), 4);
 
-                verify(followerActor.delegate).createSnapshot();
+                verify(followerActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
 
                 assertEquals(6, followerActor.getReplicatedLog().size());
 
index 6de2666de1a32af442a0367a011c5d8f758288d5..7acdafef245f85871e251ad073f0c281bddafbef 100644 (file)
@@ -14,7 +14,6 @@ import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.controller.sal.binding.test.util.BindingBrokerTestFactory;
 import org.opendaylight.controller.sal.binding.test.util.BindingTestContext;
 
-@SuppressWarnings("deprecation")
 public abstract class AbstractDataServiceTest {
 
     protected DataProviderService baDataService;
index 3d25018e247daf848f517746f73ff6e79bf9b89d..cdf7fbc7f996498f62d736e5f9b0667b67ca29d7 100644 (file)
@@ -9,10 +9,8 @@ package org.opendaylight.controller.md.sal.binding.data;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
@@ -28,14 +26,13 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
  * FIXME: THis test should be moved to sal-binding-broker and rewriten
  * to use new DataBroker API
  */
-@SuppressWarnings("deprecation")
 public class ConcurrentImplicitCreateTest extends AbstractDataServiceTest {
 
     private static final TopLevelListKey FOO_KEY = new TopLevelListKey("foo");
     private static final TopLevelListKey BAR_KEY = new TopLevelListKey("bar");
-    private static InstanceIdentifier<Top> TOP_PATH = InstanceIdentifier.builder(Top.class).build();
-    private static InstanceIdentifier<TopLevelList> FOO_PATH = TOP_PATH.child(TopLevelList.class, FOO_KEY);
-    private static InstanceIdentifier<TopLevelList> BAR_PATH = TOP_PATH.child(TopLevelList.class, BAR_KEY);
+    private static final InstanceIdentifier<Top> TOP_PATH = InstanceIdentifier.builder(Top.class).build();
+    private static final InstanceIdentifier<TopLevelList> FOO_PATH = TOP_PATH.child(TopLevelList.class, FOO_KEY);
+    private static final InstanceIdentifier<TopLevelList> BAR_PATH = TOP_PATH.child(TopLevelList.class, BAR_KEY);
 
     @Test
     public void testConcurrentCreate() throws InterruptedException, ExecutionException {
index 0a611a75d0ed2bd57fe27a6fd9117068ca1cf230..e0a151adc0d729d4ab65f22aa6072d279a1cd9ac 100644 (file)
@@ -10,12 +10,11 @@ package org.opendaylight.controller.md.sal.binding.data;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
@@ -35,12 +34,9 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
-import com.google.common.util.concurrent.SettableFuture;
-
 /**
  * FIXME: THis test should be moved to compat test-suite
  */
-@SuppressWarnings("deprecation")
 public class WildcardedDataChangeListenerTest extends AbstractDataServiceTest {
 
     private static final TopLevelListKey TOP_LEVEL_LIST_0_KEY = new TopLevelListKey("test:0");
index 40d4591001212c05e716f9fd6b2898d37cc47267..73712813d404b310afd6e8049e547b8d45e0d525 100644 (file)
@@ -1,9 +1,8 @@
 package org.opendaylight.controller.sal.binding.test.bugfix;
 
 import static org.junit.Assert.assertFalse;
-
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.ExecutionException;
-
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
@@ -23,9 +22,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
-import com.google.common.util.concurrent.SettableFuture;
-
-@SuppressWarnings("deprecation")
 public class DeleteNestedAugmentationListenParentTest extends AbstractDataServiceTest {
 
     private static final TopLevelListKey FOO_KEY = new TopLevelListKey("foo");
index 0f9051d41c650ebddd0d2b02b71d39f3ae5e5b8c..591effbe03cdca28175938fd3611dbdf5e82f740 100644 (file)
@@ -10,9 +10,8 @@ package org.opendaylight.controller.sal.binding.test.bugfix;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.TimeUnit;
-
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
@@ -29,9 +28,6 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
-import com.google.common.util.concurrent.SettableFuture;
-
-@SuppressWarnings("deprecation")
 public class WriteParentListenAugmentTest extends AbstractDataServiceTest {
 
     private static final String TLL_NAME = "foo";
index 7941f4d4aee1884e462fbc7e690ce66b4752e419..de7445ee70299b8da7d6dc40d3b7a8bacde2f014 100644 (file)
@@ -11,7 +11,7 @@ package org.opendaylight.controller.sal.binding.test.bugfix;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-
+import com.google.common.collect.ImmutableList;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
@@ -30,9 +30,6 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
-import com.google.common.collect.ImmutableList;
-
-@SuppressWarnings("deprecation")
 public class WriteParentReadChildTest extends AbstractDataServiceTest {
 
     private static final int LIST11_ID = 1234;
index 48027114d75997b4e99a85d7fc655b668393e3d8..425a62be2d5662d30040cc32edcf7eb7a9ce7345 100644 (file)
@@ -10,9 +10,7 @@ package org.opendaylight.controller.sal.binding.test.connect.dom;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-
 import java.util.concurrent.Future;
-
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
@@ -25,10 +23,6 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 
-/**
- * FIXME: Migrate to use new Data Broker APIs
- */
-@SuppressWarnings("deprecation")
 public class BrokerIntegrationTest extends AbstractDataServiceTest {
 
     private static final TopLevelListKey TLL_FOO_KEY = new TopLevelListKey("foo");
index 1be550f82fc6b9672202c8741fce33ec870e5a1a..23e6053a1fb73952b130b23ef401cbe5074c6d94 100644 (file)
@@ -56,7 +56,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
-@SuppressWarnings("deprecation")
 public class CrossBrokerMountPointTest {
 
     private static final QName TLL_NAME_QNAME = QName.create(TopLevelList.QNAME, "name");
index a0f4e99a6bb8a8ff6f1cde163d83b945d60fcddd..1e93c59434c64ac291774fa34bd6fce7b798e4ac 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.sal.binding.test.connect.dom;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -97,12 +96,8 @@ public class DOMRpcServiceTestBugfix560 {
 
         assertNotNull(moduleStream);
         final List<InputStream> rpcModels = Collections.singletonList(moduleStream);
-        @SuppressWarnings("deprecation")
-        final
-        Set<Module> modules = parser.parseYangModelsFromStreams(rpcModels);
-        @SuppressWarnings("deprecation")
-        final
-        SchemaContext mountSchemaContext = parser.resolveSchemaContext(modules);
+        final Set<Module> modules = parser.parseYangModelsFromStreams(rpcModels);
+        final SchemaContext mountSchemaContext = parser.resolveSchemaContext(modules);
         schemaContext = mountSchemaContext;
     }
 
@@ -121,7 +116,6 @@ public class DOMRpcServiceTestBugfix560 {
                 .child(TopLevelList.class, new TopLevelListKey(mount)).toInstance();
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void test() throws ExecutionException, InterruptedException {
         // FIXME: This is made to only make sure instance identifier codec
index 3a66aa1181a509feb48305af1c3b079712ff0676..b009fbbdb159daa22616bb748e362313553d6b89 100644 (file)
@@ -20,7 +20,7 @@ public abstract class AbstractConfig implements UnifiedConfig {
         return config;
     }
 
-    public static abstract class Builder<T extends Builder>{
+    public static abstract class Builder<T extends Builder<T>> {
         protected Map<String, Object> configHolder;
         protected Config fallback;
 
index 48afe40607907786ebddd2260746bd18f9a2f9c3..746ef4ebb1385387b2c2167e82d148e8cab82a23 100644 (file)
@@ -89,7 +89,7 @@ public class CommonConfig extends AbstractConfig {
         return cachedMailBoxPushTimeout;
     }
 
-    public static class Builder<T extends Builder> extends AbstractConfig.Builder<T>{
+    public static class Builder<T extends Builder<T>> extends AbstractConfig.Builder<T>{
 
         public Builder(String actorSystemName) {
             super(actorSystemName);
index fc1bd4225da2a1459cf9d19967c864262a834b6c..eac4fc496f1d44bde8dd6f045209b37284c07295 100644 (file)
@@ -467,7 +467,7 @@ public class NormalizedNodeSerializer {
             return builder.build();
         }
 
-        private NormalizedNode<?, ?> buildDataContainer(DataContainerNodeBuilder builder, NormalizedNodeMessages.Node node){
+        private NormalizedNode<?, ?> buildDataContainer(DataContainerNodeBuilder<?, ?> builder, NormalizedNodeMessages.Node node){
 
             for(NormalizedNodeMessages.Node child : node.getChildList()){
                 builder.withChild((DataContainerChild<?, ?>) deSerialize(child));
index 83e10cf6afdf882a9df33e8de10f503536f0bba1..b61b276d5e58a3c040211881ee4dce99f29062fe 100644 (file)
@@ -47,9 +47,9 @@ public class CompositeModificationByteStringPayload extends Payload implements
 
 
     @Override
-    public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
+    public Map<GeneratedMessage.GeneratedExtension<?, ?>, PersistentMessages.CompositeModification> encode() {
         Preconditions.checkState(byteString!=null);
-        Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> map = new HashMap<>();
+        Map<GeneratedMessage.GeneratedExtension<?, ?>, PersistentMessages.CompositeModification> map = new HashMap<>();
         map.put(org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification,
                 getModificationInternal());
         return map;
index fe5043e73d3593ac4b2966c8cfac6148d909d0d0..cef20af650e295eeb7c23f7d66ec6d0b8b85ea3d 100644 (file)
@@ -31,9 +31,9 @@ public class CompositeModificationPayload extends Payload implements
         this.modification = (PersistentMessages.CompositeModification) Preconditions.checkNotNull(modification, "modification should not be null");
     }
 
-    @Override public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
+    @Override public Map<GeneratedMessage.GeneratedExtension<?, ?>, PersistentMessages.CompositeModification> encode() {
         Preconditions.checkState(modification!=null);
-        Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> map = new HashMap<>();
+        Map<GeneratedMessage.GeneratedExtension<?, ?>, PersistentMessages.CompositeModification> map = new HashMap<>();
         map.put(
             org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification, this.modification);
         return map;
index 65b6ac4bd008c95c666ce83a1f0047553104a64a..81449c574780705196e28d3dbd738151c2d948cd 100644 (file)
@@ -45,7 +45,6 @@ import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransacti
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
@@ -59,22 +58,18 @@ import org.opendaylight.controller.cluster.datastore.modification.ModificationPa
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -87,8 +82,6 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class Shard extends RaftActor {
 
-    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
-
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
     @VisibleForTesting
@@ -104,10 +97,6 @@ public class Shard extends RaftActor {
 
     private DatastoreContext datastoreContext;
 
-    private SchemaContext schemaContext;
-
-    private int createSnapshotTransactionCounter;
-
     private final ShardCommitCoordinator commitCoordinator;
 
     private long transactionCommitTimeout;
@@ -121,15 +110,11 @@ public class Shard extends RaftActor {
     private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
             Serialization.serializedActorPath(getSelf()));
 
+    private final DOMTransactionFactory domTransactionFactory;
 
-    /**
-     * Coordinates persistence recovery on startup.
-     */
-    private ShardRecoveryCoordinator recoveryCoordinator;
+    private final ShardTransactionActorFactory transactionActorFactory;
 
-    private final DOMTransactionFactory transactionFactory;
-
-    private final String txnDispatcherPath;
+    private final ShardSnapshotCohort snapshotCohort;
 
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
     private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
@@ -140,9 +125,6 @@ public class Shard extends RaftActor {
 
         this.name = name.toString();
         this.datastoreContext = datastoreContext;
-        this.schemaContext = schemaContext;
-        this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
-                .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
 
         setPersistence(datastoreContext.isPersistent());
 
@@ -164,9 +146,9 @@ public class Shard extends RaftActor {
             getContext().become(new MeteringBehavior(this));
         }
 
-        transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+        domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
 
-        commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+        commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
                 TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
 
@@ -178,7 +160,11 @@ public class Shard extends RaftActor {
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
 
-        recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
+        transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+                new Dispatchers(context().system().dispatchers()).getDispatcherPath(
+                        Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
+
+        snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
     }
 
     private void setTransactionCommitTimeout() {
@@ -563,29 +549,15 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+        domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
             ShardTransactionIdentifier transactionId, String transactionChainId,
             short clientVersion ) {
 
-        DOMStoreTransaction transaction = transactionFactory.newTransaction(
-                TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
-                transactionChainId);
-
-        return createShardTransaction(transaction, transactionId, clientVersion);
-    }
-
-    private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
-                                            short clientVersion){
-        return getContext().actorOf(
-                ShardTransaction.props(transaction, getSelf(),
-                        schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId(), clientVersion)
-                        .withDispatcher(txnDispatcherPath),
-                transactionId.toString());
-
+        return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+                transactionId, transactionChainId, clientVersion);
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
@@ -617,18 +589,11 @@ public class Shard extends RaftActor {
         return transactionActor;
     }
 
-    private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
-        throws ExecutionException, InterruptedException {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
-    }
-
     private void commitWithNewTransaction(final Modification modification) {
         DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
         modification.apply(tx);
         try {
-            syncCommitTransaction(tx);
+            snapshotCohort.syncCommitTransaction(tx);
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
         } catch (InterruptedException | ExecutionException e) {
@@ -638,9 +603,7 @@ public class Shard extends RaftActor {
     }
 
     private void updateSchemaContext(final UpdateSchemaContext message) {
-        this.schemaContext = message.getSchemaContext();
         updateSchemaContext(message.getSchemaContext());
-        store.onGlobalContextUpdated(message.getSchemaContext());
     }
 
     @VisibleForTesting
@@ -654,30 +617,18 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected
-    void startLogRecoveryBatch(final int maxBatchSize) {
-        recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
-    }
-
-    @Override
-    protected void appendRecoveredLogEntry(final Payload data) {
-        recoveryCoordinator.appendRecoveredLogPayload(data);
+    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+        return snapshotCohort;
     }
 
     @Override
-    protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
-    }
-
-    @Override
-    protected void applyCurrentLogRecoveryBatch() {
-        recoveryCoordinator.applyCurrentLogRecoveryBatch();
+    @Nonnull
+    protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+        return new ShardRecoveryCoordinator(store, persistenceId(), LOG);
     }
 
     @Override
     protected void onRecoveryComplete() {
-        recoveryCoordinator = null;
-
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());
 
@@ -732,46 +683,6 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Override
-    protected void createSnapshot() {
-        // Create a transaction actor. We are really going to treat the transaction as a worker
-        // so that this actor does not get block building the snapshot. THe transaction actor will
-        // after processing the CreateSnapshot message.
-
-        ActorRef createSnapshotTransaction = createTransaction(
-                TransactionProxy.TransactionType.READ_ONLY.ordinal(),
-                "createSnapshot" + ++createSnapshotTransactionCounter, "",
-                DataStoreVersions.CURRENT_VERSION);
-
-        createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
-    }
-
-    @VisibleForTesting
-    @Override
-    protected void applySnapshot(final byte[] snapshotBytes) {
-        // Since this will be done only on Recovery or when this actor is a Follower
-        // we can safely commit everything in here. We not need to worry about event notifications
-        // as they would have already been disabled on the follower
-
-        LOG.info("{}: Applying snapshot", persistenceId());
-        try {
-            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-
-            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
-            // delete everything first
-            transaction.delete(DATASTORE_ROOT);
-
-            // Add everything from the remote node back
-            transaction.write(DATASTORE_ROOT, node);
-            syncCommitTransaction(transaction);
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
-        } finally {
-            LOG.info("{}: Done applying snapshot", persistenceId());
-        }
-    }
-
     @Override
     protected void onStateChanged() {
         boolean isLeader = isLeader();
@@ -786,7 +697,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
-            transactionFactory.closeAllTransactionChains();
+            domTransactionFactory.closeAllTransactionChains();
         }
     }
 
index 2e66ef918e6e7541592449e9b51405610f4826a6..41ca486eb6cf9c15756b8e5748f75358fb61c409 100644 (file)
@@ -26,7 +26,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * @author: syedbahm
@@ -38,9 +37,8 @@ public class ShardReadTransaction extends ShardTransaction {
     private final DOMStoreReadTransaction transaction;
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            short clientTxVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
+        super(shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index b394da88e853f4387fb2cbc70ff3b93ab5436999..2042e955777f6668a831889e5ef4985f31505f12 100644 (file)
@@ -15,7 +15,6 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * @author: syedbahm
@@ -25,9 +24,8 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction {
     private final DOMStoreReadWriteTransaction transaction;
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            short clientTxVersion) {
-        super(transaction, shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
+        super(transaction, shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
index 7e547d7257dc495dc34a796d1c7b7a0944cb0e76..01a124b6977c801e3f273c57341efe91d97c52b2 100644 (file)
@@ -13,6 +13,7 @@ import java.util.List;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -32,7 +33,7 @@ import org.slf4j.Logger;
  *
  * @author Thomas Panetelis
  */
-class ShardRecoveryCoordinator {
+class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
 
     private final InMemoryDOMDataStore store;
     private List<ModificationPayload> currentLogRecoveryBatch;
@@ -45,13 +46,15 @@ class ShardRecoveryCoordinator {
         this.log = log;
     }
 
-    void startLogRecoveryBatch(int maxBatchSize) {
+    @Override
+    public void startLogRecoveryBatch(int maxBatchSize) {
         currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
 
         log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
     }
 
-    void appendRecoveredLogPayload(Payload payload) {
+    @Override
+    public void appendRecoveredLogEntry(Payload payload) {
         try {
             if(payload instanceof ModificationPayload) {
                 currentLogRecoveryBatch.add((ModificationPayload) payload);
@@ -83,7 +86,8 @@ class ShardRecoveryCoordinator {
     /**
      * Applies the current batched log entries to the data store.
      */
-    void applyCurrentLogRecoveryBatch() {
+    @Override
+    public void applyCurrentLogRecoveryBatch() {
         log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
 
         DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
@@ -105,7 +109,8 @@ class ShardRecoveryCoordinator {
      *
      * @param snapshotBytes the serialized snapshot
      */
-    void applyRecoveredSnapshot(final byte[] snapshotBytes) {
+    @Override
+    public void applyRecoverySnapshot(final byte[] snapshotBytes) {
         log.debug("{}: Applyng recovered sbapshot", shardName);
 
         DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
new file mode 100644 (file)
index 0000000..c59085d
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+
+/**
+ * Participates in raft snapshotting on behalf of a Shard actor.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardSnapshotCohort implements RaftActorSnapshotCohort {
+
+    private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
+
+    private int createSnapshotTransactionCounter;
+    private final ShardTransactionActorFactory transactionActorFactory;
+    private final InMemoryDOMDataStore store;
+    private final Logger log;
+    private final String logId;
+
+    ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store,
+            Logger log, String logId) {
+        this.transactionActorFactory = transactionActorFactory;
+        this.store = store;
+        this.log = log;
+        this.logId = logId;
+    }
+
+    @Override
+    public void createSnapshot(ActorRef actorRef) {
+        // Create a transaction actor. We are really going to treat the transaction as a worker
+        // so that this actor does not get block building the snapshot. THe transaction actor will
+        // after processing the CreateSnapshot message.
+
+        ShardTransactionIdentifier transactionID = new ShardTransactionIdentifier(
+                "createSnapshot" + ++createSnapshotTransactionCounter);
+
+        ActorRef createSnapshotTransaction = transactionActorFactory.newShardTransaction(
+                TransactionProxy.TransactionType.READ_ONLY, transactionID, "", DataStoreVersions.CURRENT_VERSION);
+
+        createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, actorRef);
+    }
+
+    @Override
+    public void applySnapshot(byte[] snapshotBytes) {
+        // Since this will be done only on Recovery or when this actor is a Follower
+        // we can safely commit everything in here. We not need to worry about event notifications
+        // as they would have already been disabled on the follower
+
+        log.info("{}: Applying snapshot", logId);
+
+        try {
+            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+
+            NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+
+            // delete everything first
+            transaction.delete(DATASTORE_ROOT);
+
+            // Add everything from the remote node back
+            transaction.write(DATASTORE_ROOT, node);
+            syncCommitTransaction(transaction);
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("{}: An exception occurred when applying snapshot", logId, e);
+        } finally {
+            log.info("{}: Done applying snapshot", logId);
+        }
+
+    }
+
+    void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+            throws ExecutionException, InterruptedException {
+        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        commitCohort.preCommit().get();
+        commitCohort.commit().get();
+    }
+}
index 613b3749e086abc8cdea38fd322872f656235a91..066f01b092d701b08556c5fd7319db4d64a6859c 100644 (file)
@@ -31,7 +31,6 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * The ShardTransaction Actor represents a remote transaction
@@ -54,25 +53,22 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     protected static final boolean SERIALIZED_REPLY = true;
 
     private final ActorRef shardActor;
-    private final SchemaContext schemaContext;
     private final ShardStats shardStats;
     private final String transactionID;
     private final short clientTxVersion;
 
-    protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-            ShardStats shardStats, String transactionID, short clientTxVersion) {
+    protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
+            short clientTxVersion) {
         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
-        this.schemaContext = schemaContext;
         this.shardStats = shardStats;
         this.transactionID = transactionID;
         this.clientTxVersion = clientTxVersion;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
-            String transactionID, short txnClientVersion) {
-        return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
+            DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
+        return Props.create(new ShardTransactionCreator(transaction, shardActor,
            datastoreContext, shardStats, transactionID, txnClientVersion));
     }
 
@@ -86,10 +82,6 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return transactionID;
     }
 
-    protected SchemaContext getSchemaContext() {
-        return schemaContext;
-    }
-
     protected short getClientTxVersion() {
         return clientTxVersion;
     }
@@ -161,19 +153,16 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
 
         final DOMStoreTransaction transaction;
         final ActorRef shardActor;
-        final SchemaContext schemaContext;
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
         final String transactionID;
         final short txnClientVersion;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
-                SchemaContext schemaContext, DatastoreContext datastoreContext,
-                ShardStats shardStats, String transactionID, short txnClientVersion) {
+                DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
             this.transaction = transaction;
             this.shardActor = shardActor;
             this.shardStats = shardStats;
-            this.schemaContext = schemaContext;
             this.datastoreContext = datastoreContext;
             this.transactionID = transactionID;
             this.txnClientVersion = txnClientVersion;
@@ -184,13 +173,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
             ShardTransaction tx;
             if(transaction instanceof DOMStoreReadWriteTransaction) {
                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+                        shardActor, shardStats, transactionID, txnClientVersion);
             } else if(transaction instanceof DOMStoreReadTransaction) {
                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        schemaContext, shardStats, transactionID, txnClientVersion);
+                        shardStats, transactionID, txnClientVersion);
             } else {
                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
+                        shardActor, shardStats, transactionID, txnClientVersion);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
index 8ba613958a9a5dfc1e0e2a9b45b921c3b6243b4e..a4c97e8ab9248cd471ad23f50913abf8032a01d3 100644 (file)
@@ -27,14 +27,12 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
     private final DOMStoreTransactionChain chain;
     private final DatastoreContext datastoreContext;
-    private final SchemaContext schemaContext;
     private final ShardStats shardStats;
 
-    public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext, ShardStats shardStats) {
+    public ShardTransactionChain(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+            ShardStats shardStats) {
         this.chain = chain;
         this.datastoreContext = datastoreContext;
-        this.schemaContext = schemaContext;
         this.shardStats = shardStats;
     }
 
@@ -61,22 +59,19 @@ public class ShardTransactionChain extends AbstractUntypedActor {
                 TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId(),
+                            datastoreContext, shardStats, createTransaction.getTransactionId(),
                             createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId(),
+                            datastoreContext, shardStats, createTransaction.getTransactionId(),
                             createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
-                            schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId(),
+                            datastoreContext, shardStats, createTransaction.getTransactionId(),
                             createTransaction.getVersion()), transactionName);
         } else {
             throw new IllegalArgumentException (
@@ -94,8 +89,7 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
     public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
         DatastoreContext datastoreContext, ShardStats shardStats) {
-        return Props.create(new ShardTransactionChainCreator(chain, schemaContext,
-                datastoreContext, shardStats));
+        return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats));
     }
 
     private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
@@ -103,21 +97,19 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
         final DOMStoreTransactionChain chain;
         final DatastoreContext datastoreContext;
-        final SchemaContext schemaContext;
         final ShardStats shardStats;
 
 
-        ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
-            DatastoreContext datastoreContext, ShardStats shardStats) {
+        ShardTransactionChainCreator(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+                ShardStats shardStats) {
             this.chain = chain;
             this.datastoreContext = datastoreContext;
-            this.schemaContext = schemaContext;
             this.shardStats = shardStats;
         }
 
         @Override
         public ShardTransactionChain create() throws Exception {
-            return new ShardTransactionChain(chain, schemaContext, datastoreContext, shardStats);
+            return new ShardTransactionChain(chain, datastoreContext, shardStats);
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java
new file mode 100644 (file)
index 0000000..9637646
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActorContext;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+
+/**
+ * A factory for creating ShardTransaction actors.
+ *
+ * @author Thomas Pantelis
+ */
+class ShardTransactionActorFactory {
+
+    private final DOMTransactionFactory domTransactionFactory;
+    private final DatastoreContext datastoreContext;
+    private final String txnDispatcherPath;
+    private final ShardStats shardMBean;
+    private final UntypedActorContext actorContext;
+    private final ActorRef shardActor;
+
+    ShardTransactionActorFactory(DOMTransactionFactory domTransactionFactory, DatastoreContext datastoreContext,
+            String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean) {
+        this.domTransactionFactory = domTransactionFactory;
+        this.datastoreContext = datastoreContext;
+        this.txnDispatcherPath = txnDispatcherPath;
+        this.shardMBean = shardMBean;
+        this.actorContext = actorContext;
+        this.shardActor = shardActor;
+    }
+
+    ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID,
+            String transactionChainID, short clientVersion) {
+
+        DOMStoreTransaction transaction = domTransactionFactory.newTransaction(type, transactionID.toString(),
+                transactionChainID);
+
+        return actorContext.actorOf(ShardTransaction.props(transaction, shardActor, datastoreContext, shardMBean,
+                transactionID.getRemoteTransactionId(), clientVersion).withDispatcher(txnDispatcherPath),
+                transactionID.toString());
+    }
+}
index 356891164628bdfd8434b620707452ffa89bf7d5..1d5b1d8e1b99bf35f5fabe8ab3723892dd7af193 100644 (file)
@@ -1,6 +1,6 @@
 /*
- *
  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *  Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  *  This program and the accompanying materials are made available under the
  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -32,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * @author: syedbahm
@@ -41,12 +40,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class ShardWriteTransaction extends ShardTransaction {
 
     private final MutableCompositeModification compositeModification = new MutableCompositeModification();
+    private int totalBatchedModificationsReceived;
+    private Exception lastBatchedModificationsException;
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
-            short clientTxVersion) {
-        super(shardActor, schemaContext, shardStats, transactionID, clientTxVersion);
+            ShardStats shardStats, String transactionID, short clientTxVersion) {
+        super(shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
@@ -88,13 +88,29 @@ public class ShardWriteTransaction extends ShardTransaction {
                 modification.apply(transaction);
             }
 
+            totalBatchedModificationsReceived++;
             if(batched.isReady()) {
+                if(lastBatchedModificationsException != null) {
+                    throw lastBatchedModificationsException;
+                }
+
+                if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
+                    throw new IllegalStateException(String.format(
+                            "The total number of batched messages received %d does not match the number sent %d",
+                            totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
+                }
+
                 readyTransaction(transaction, false);
             } else {
                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
             }
         } catch (Exception e) {
+            lastBatchedModificationsException = e;
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+            if(batched.isReady()) {
+                getSelf().tell(PoisonPill.getInstance(), getSelf());
+            }
         }
     }
 
index f34c5a257125026f61f02fcbf18c945160ede2bc..c722918c5cfed8ad7062e63911ae60fa45aad7fa 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -45,6 +46,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     private final OperationCompleter operationCompleter;
     private BatchedModifications batchedModifications;
+    private int totalBatchedModificationsSent;
 
     protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
             String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
@@ -159,6 +161,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             }
 
             batchedModifications.setReady(ready);
+            batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
             sent = executeOperationAsync(batchedModifications);
 
             if(ready) {
index a9ce94b033b26690b7e49017c36a2e24abbd0f3c..86f96f57d0f3cb0c284a1a902bd820baaac9e82d 100644 (file)
@@ -22,6 +22,7 @@ public class BatchedModifications extends MutableCompositeModification implement
     private static final long serialVersionUID = 1L;
 
     private boolean ready;
+    private int totalMessagesSent;
     private String transactionID;
     private String transactionChainID;
 
@@ -42,6 +43,14 @@ public class BatchedModifications extends MutableCompositeModification implement
         this.ready = ready;
     }
 
+    public int getTotalMessagesSent() {
+        return totalMessagesSent;
+    }
+
+    public void setTotalMessagesSent(int totalMessagesSent) {
+        this.totalMessagesSent = totalMessagesSent;
+    }
+
     public String getTransactionID() {
         return transactionID;
     }
@@ -56,6 +65,7 @@ public class BatchedModifications extends MutableCompositeModification implement
         transactionID = in.readUTF();
         transactionChainID = in.readUTF();
         ready = in.readBoolean();
+        totalMessagesSent = in.readInt();
     }
 
     @Override
@@ -64,6 +74,7 @@ public class BatchedModifications extends MutableCompositeModification implement
         out.writeUTF(transactionID);
         out.writeUTF(transactionChainID);
         out.writeBoolean(ready);
+        out.writeInt(totalMessagesSent);
     }
 
     @Override
@@ -74,8 +85,10 @@ public class BatchedModifications extends MutableCompositeModification implement
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready)
-                .append(", modifications size=").append(getModifications().size()).append("]");
+        builder.append("BatchedModifications [transactionID=").append(transactionID).append(", transactionChainID=")
+                .append(transactionChainID).append(", ready=").append(ready).append(", totalMessagesSent=")
+                .append(totalMessagesSent).append(", modifications size=").append(getModifications().size())
+                .append("]");
         return builder.toString();
     }
 }
index 2e391570c41afb8ee5751f19ce2c22dcff3a6c26..01d11288d214c3b0f583cddb4e7ead2915d684f8 100644 (file)
@@ -66,7 +66,6 @@ public class ModificationPayload extends Payload implements Externalizable {
         out.write(serializedPayload);
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
     @Deprecated
     public <T> Map<GeneratedExtension, T> encode() {
index 74e61c189f828bb9228152ada1dedae2b16c53e3..d90cf500b02a0ec0a16201ec3774ddc227a1b57b 100644 (file)
@@ -51,7 +51,7 @@ public class MessageTracker {
 
     private static final Context NO_OP_CONTEXT = new NoOpContext();
 
-    private final Class expectedMessageClass;
+    private final Class<?> expectedMessageClass;
 
     private final long expectedArrivalInterval;
 
@@ -73,7 +73,7 @@ public class MessageTracker {
      * @param expectedArrivalIntervalInMillis The expected arrival interval between two instances of the expected
      *                                        message
      */
-    public MessageTracker(Class expectedMessageClass, long expectedArrivalIntervalInMillis){
+    public MessageTracker(Class<?> expectedMessageClass, long expectedArrivalIntervalInMillis){
         this.expectedMessageClass = expectedMessageClass;
         this.expectedArrivalInterval = expectedArrivalIntervalInMillis;
     }
@@ -120,10 +120,10 @@ public class MessageTracker {
     }
 
     public static class MessageProcessingTime {
-        private final Class messageClass;
+        private final Class<?> messageClass;
         private final long elapsedTimeInNanos;
 
-        MessageProcessingTime(Class messageClass, long elapsedTimeInNanos){
+        MessageProcessingTime(Class<?> messageClass, long elapsedTimeInNanos){
             this.messageClass = messageClass;
             this.elapsedTimeInNanos = elapsedTimeInNanos;
         }
@@ -136,7 +136,7 @@ public class MessageTracker {
                     '}';
         }
 
-        public Class getMessageClass() {
+        public Class<?> getMessageClass() {
             return messageClass;
         }
 
index b3a0430f9325ae933c355edc4a18e304fa8dae05..e3b82df1743e75c433cec193d54a2cbfbd696319 100644 (file)
@@ -17,6 +17,7 @@ import akka.dispatch.Dispatchers;
 import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.pattern.Patterns;
+import akka.persistence.SaveSnapshotSuccess;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Function;
@@ -1410,6 +1411,8 @@ public class ShardTest extends AbstractShardTest {
     @SuppressWarnings("serial")
     public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
 
+        final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
+
         final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
         class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
             TestPersistentDataProvider(DataPersistenceProvider delegate) {
@@ -1426,8 +1429,6 @@ public class ShardTest extends AbstractShardTest {
         dataStoreContextBuilder.persistent(persistent);
 
         new ShardTestKit(getSystem()) {{
-            final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-
             class TestShard extends Shard {
 
                 protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
@@ -1437,9 +1438,12 @@ public class ShardTest extends AbstractShardTest {
                 }
 
                 @Override
-                protected void commitSnapshot(final long sequenceNumber) {
-                    super.commitSnapshot(sequenceNumber);
-                    latch.get().countDown();
+                public void handleCommand(Object message) {
+                    super.handleCommand(message);
+
+                    if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+                        latch.get().countDown();
+                    }
                 }
 
                 @Override
index c3fef611e348a2a446ef2bfff87024425063f3ba..21fb55fcf138bd8e10cd54b2488079770e055e63 100644 (file)
@@ -70,8 +70,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -100,8 +99,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -130,8 +128,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -160,8 +157,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -193,8 +189,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -231,8 +226,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -264,8 +258,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
index 9715f668e353fe71d527865d2ffd68a4759952b5..c9335f378a8662e7b7ceb488961fb49588d50baf 100644 (file)
@@ -4,8 +4,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.actor.Status.Failure;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
@@ -52,6 +54,7 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@@ -97,7 +100,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
             short version) {
         Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
-                testSchemaContext, datastoreContext, shardStats, "txn", version);
+                datastoreContext, shardStats, "txn", version);
         return getSystem().actorOf(props, name);
     }
 
@@ -424,16 +427,83 @@ public class ShardTransactionTest extends AbstractActorTest {
                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
-            batched.setReady(true);
             batched.addModification(new WriteModification(writePath, writeData));
 
             transaction.tell(batched, getRef());
+            BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
+            assertEquals("getNumBatched", 1, reply.getNumBatched());
+
+            batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
 
+            transaction.tell(batched, getRef());
             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
         }};
     }
 
+    @Test(expected=TestException.class)
+    public void testOnReceiveBatchedModificationsFailure() throws Throwable {
+        new JavaTestKit(getSystem()) {{
+
+            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
+            final ActorRef transaction = newTransactionActor(mockWriteTx,
+                    "testOnReceiveBatchedModificationsFailure");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            YangInstanceIdentifier path = TestModel.TEST_PATH;
+            ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+            doThrow(new TestException()).when(mockWriteTx).write(path, node);
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.addModification(new WriteModification(path, node));
+
+            transaction.tell(batched, getRef());
+            expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+            batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            transaction.tell(batched, getRef());
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            if(failure != null) {
+                throw failure.cause();
+            }
+        }};
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
+        new JavaTestKit(getSystem()) {{
+
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            transaction.tell(batched, getRef());
+
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            if(failure != null) {
+                throw failure.cause();
+            }
+        }};
+    }
+
     @Test
     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -541,8 +611,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn",
-                DataStoreVersions.CURRENT_VERSION);
+                datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
@@ -564,4 +633,8 @@ public class ShardTransactionTest extends AbstractActorTest {
             expectMsgClass(duration("3 seconds"), Terminated.class);
         }};
     }
+
+    public static class TestException extends RuntimeException {
+        private static final long serialVersionUID = 1L;
+    }
 }
index 6cfef194915bb81230b6b8b87c0809afcd15eb2e..cc9692bfd91b72f3245f2bb6bd160f12551720b5 100644 (file)
@@ -476,8 +476,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
-        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
-                isA(BatchedModifications.class));
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), true,
+                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+        assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
     }
 
     @Test
@@ -1221,6 +1226,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
                 new DeleteModification(deletePath2));
+
+        assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
     }
 
     @Test
index b302f527d6eb7019b68df59bac8bed656edb6127..1df8e9775b89219439f31cbafbec69e675a353d8 100644 (file)
@@ -46,6 +46,7 @@ public class BatchedModificationsTest {
         batched.addModification(new MergeModification(mergePath, mergeData));
         batched.addModification(new DeleteModification(deletePath));
         batched.setReady(true);
+        batched.setTotalMessagesSent(5);
 
         BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
                 (Serializable) batched.toSerializable());
@@ -54,6 +55,7 @@ public class BatchedModificationsTest {
         assertEquals("getTransactionID", "tx1", clone.getTransactionID());
         assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID());
         assertEquals("isReady", true, clone.isReady());
+        assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent());
 
         assertEquals("getModifications size", 3, clone.getModifications().size());
 
index d62c9dbc28713eb5cb35bb434c9bf8e7b733e2c4..18c2985635ffab5193d50b069999f4089e379aeb 100644 (file)
@@ -37,7 +37,7 @@ public class MessageCollectorActor extends UntypedActor {
     @Override public void onReceive(Object message) throws Exception {
         if(message instanceof String){
             if("messages".equals(message)){
-                getSender().tell(new ArrayList(messages), getSelf());
+                getSender().tell(new ArrayList<>(messages), getSelf());
             }
         } else {
             messages.add(message);
index 1dfc607e6f069ae4d62878cb329d25aa0260b116..611303a41adcfede958c88fb1bb1d8d32304c129 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.md.sal.dom.broker.impl;
 
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AbstractCheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
@@ -16,13 +15,17 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile
  * A {@link Future} used to report the status of an future {@link java.util.concurrent.Future}.
  */
 final class PingPongFuture extends AbstractCheckedFuture<Void, TransactionCommitFailedException> {
-    protected PingPongFuture(final ListenableFuture<Void> delegate) {
-        super(delegate);
-    }
+  protected PingPongFuture(final ListenableFuture<Void> delegate) {
+    super(delegate);
+  }
 
-    @Override
-    protected TransactionCommitFailedException mapException(final Exception e) {
-        Preconditions.checkArgument(e instanceof TransactionCommitFailedException);
-        return (TransactionCommitFailedException) e;
+  @Override
+  protected TransactionCommitFailedException mapException(final Exception e) {
+    if (e.getCause() instanceof TransactionCommitFailedException){
+      return (TransactionCommitFailedException) e.getCause();
+    } else {
+      return new TransactionCommitFailedException(e.getMessage(), e.getCause(), null);
     }
+  }
 }
+