Merge "BUG 1853 : Clustered Data Store causes Out of Memory"
authorTom Pantelis <tpanteli@brocade.com>
Mon, 15 Sep 2014 16:36:26 +0000 (16:36 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 15 Sep 2014 16:36:26 +0000 (16:36 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf

index 75c237f..9d06f63 100644 (file)
@@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class DefaultConfigParamsImpl implements ConfigParams {
 
-    private static final int SNAPSHOT_BATCH_COUNT = 100000;
+    private static final int SNAPSHOT_BATCH_COUNT = 20000;
 
     /**
      * The maximum election time variance
index 778f5c6..190f1bd 100644 (file)
@@ -123,7 +123,7 @@ public abstract class RaftActor extends UntypedPersistentActor {
 
     @Override public void onReceiveRecover(Object message) {
         if (message instanceof SnapshotOffer) {
-            LOG.debug("SnapshotOffer called..");
+            LOG.info("SnapshotOffer called..");
             SnapshotOffer offer = (SnapshotOffer) message;
             Snapshot snapshot = (Snapshot) offer.snapshot();
 
@@ -135,10 +135,11 @@ public abstract class RaftActor extends UntypedPersistentActor {
             context.setReplicatedLog(replicatedLog);
             context.setLastApplied(snapshot.getLastAppliedIndex());
 
-            LOG.debug("Applied snapshot to replicatedLog. " +
-                "snapshotIndex={}, snapshotTerm={}, journal-size={}",
+            LOG.info("Applied snapshot to replicatedLog. " +
+                    "snapshotIndex={}, snapshotTerm={}, journal-size={}",
                 replicatedLog.snapshotIndex, replicatedLog.snapshotTerm,
-                replicatedLog.size());
+                replicatedLog.size()
+            );
 
             // Apply the snapshot to the actors state
             applySnapshot(ByteString.copyFrom(snapshot.getState()));
@@ -236,17 +237,17 @@ public abstract class RaftActor extends UntypedPersistentActor {
             context.removePeer(rrp.getName());
 
         } else if (message instanceof CaptureSnapshot) {
-            LOG.debug("CaptureSnapshot received by actor");
+            LOG.info("CaptureSnapshot received by actor");
             CaptureSnapshot cs = (CaptureSnapshot)message;
             captureSnapshot = cs;
             createSnapshot();
 
         } else if (message instanceof CaptureSnapshotReply){
-            LOG.debug("CaptureSnapshotReply received by actor");
+            LOG.info("CaptureSnapshotReply received by actor");
             CaptureSnapshotReply csr = (CaptureSnapshotReply) message;
 
             ByteString stateInBytes = csr.getSnapshot();
-            LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
+            LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size());
             handleCaptureSnapshotReply(stateInBytes);
 
         } else {
index 713996b..bf1eb05 100644 (file)
@@ -193,6 +193,7 @@ public class Shard extends RaftActor {
                 .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
                     self());
 
+            createSnapshotTransaction = null;
             // Send a PoisonPill instead of sending close transaction because we do not really need
             // a response
             getSender().tell(PoisonPill.getInstance(), self());
@@ -503,6 +504,8 @@ public class Shard extends RaftActor {
         // Since this will be done only on Recovery or when this actor is a Follower
         // we can safely commit everything in here. We not need to worry about event notifications
         // as they would have already been disabled on the follower
+
+        LOG.info("Applying snapshot");
         try {
             DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
             NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot);
@@ -517,6 +520,8 @@ public class Shard extends RaftActor {
             syncCommitTransaction(transaction);
         } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred when applying snapshot");
+        } finally {
+            LOG.info("Done applying snapshot");
         }
     }
 
index 4c550a7..022ef9b 100644 (file)
@@ -25,12 +25,16 @@ public abstract class AbstractActorTest {
 
         System.setProperty("shard.persistent", "false");
         system = ActorSystem.create("test");
+
+        deletePersistenceFiles();
     }
 
     @AfterClass
     public static void tearDownClass() throws IOException {
         JavaTestKit.shutdownActorSystem(system);
         system = null;
+
+        deletePersistenceFiles();
     }
 
     protected static void deletePersistenceFiles() throws IOException {
index 06bcac8..deb71c2 100644 (file)
@@ -343,11 +343,16 @@ public class ShardTest extends AbstractActorTest {
                     subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
                         getRef());
 
-                    waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor");
+                    waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
+                    subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
+                        getRef());
+
+                    waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
+
                 }
             };
 
-            Thread.sleep(2000);
             deletePersistenceFiles();
         }};
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java
new file mode 100644 (file)
index 0000000..0e492f0
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.dispatch.Futures;
+import akka.japi.Option;
+import akka.persistence.SelectedSnapshot;
+import akka.persistence.SnapshotMetadata;
+import akka.persistence.SnapshotSelectionCriteria;
+import akka.persistence.snapshot.japi.SnapshotStore;
+import com.google.common.collect.Iterables;
+import scala.concurrent.Future;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InMemorySnapshotStore extends SnapshotStore {
+
+    Map<String, List<Snapshot>> snapshots = new HashMap<>();
+
+    @Override public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
+        SnapshotSelectionCriteria snapshotSelectionCriteria) {
+        List<Snapshot> snapshotList = snapshots.get(s);
+        if(snapshotList == null){
+            return Futures.successful(Option.<SelectedSnapshot>none());
+        }
+
+        Snapshot snapshot = Iterables.getLast(snapshotList);
+        SelectedSnapshot selectedSnapshot =
+            new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData());
+        return Futures.successful(Option.some(selectedSnapshot));
+    }
+
+    @Override public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
+        List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+
+        if(snapshotList == null){
+            snapshotList = new ArrayList<>();
+            snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
+        }
+        snapshotList.add(new Snapshot(snapshotMetadata, o));
+
+        return Futures.successful(null);
+    }
+
+    @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception {
+    }
+
+    @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception {
+        List<Snapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+
+        if(snapshotList == null){
+            return;
+        }
+
+        int deleteIndex = -1;
+
+        for(int i=0;i<snapshotList.size(); i++){
+            Snapshot snapshot = snapshotList.get(i);
+            if(snapshotMetadata.equals(snapshot.getMetadata())){
+                deleteIndex = i;
+                break;
+            }
+        }
+
+        if(deleteIndex != -1){
+            snapshotList.remove(deleteIndex);
+        }
+
+    }
+
+    @Override public void doDelete(String s, SnapshotSelectionCriteria snapshotSelectionCriteria)
+        throws Exception {
+        List<Snapshot> snapshotList = snapshots.get(s);
+
+        if(snapshotList == null){
+            return;
+        }
+
+        // TODO : This is a quick and dirty implementation. Do actual match later.
+        snapshotList.clear();
+        snapshots.remove(s);
+    }
+
+    private static class Snapshot {
+        private final SnapshotMetadata metadata;
+        private final Object data;
+
+        private Snapshot(SnapshotMetadata metadata, Object data) {
+            this.metadata = metadata;
+            this.data = data;
+        }
+
+        public SnapshotMetadata getMetadata() {
+            return metadata;
+        }
+
+        public Object getData() {
+            return data;
+        }
+    }
+}
index 794b376..f0dadc6 100644 (file)
@@ -1,4 +1,6 @@
 akka {
+    persistence.snapshot-store.plugin = "in-memory-snapshot-store"
+
     loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"]
 
     actor {
@@ -14,6 +16,14 @@ akka {
         }
     }
 }
+
+in-memory-snapshot-store {
+  # Class name of the plugin.
+  class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore"
+  # Dispatcher for the plugin actor.
+  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
+}
+
 bounded-mailbox {
   mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
   mailbox-capacity = 1000