From ca1c2c54a6bd4a74c0c56480c5f11d86509fb17b Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Sat, 13 Sep 2014 11:47:44 -0700 Subject: [PATCH] BUG 1853 : Clustered Data Store causes Out of Memory createSnapshotTransaction was not being set to null which caused snapshots from being created in the future and thus did not compact the memory ultimately leading to the JVM running out of memory Switched to using an InMemorySnapshotstore for test to avoid having to deal with the file system and adding unneccessary sleeps Change-Id: I0cb4b219a09a9bc736bd47533888999ee543a4e5 Signed-off-by: Moiz Raja --- .../cluster/raft/DefaultConfigParamsImpl.java | 2 +- .../controller/cluster/raft/RaftActor.java | 15 +-- .../controller/cluster/datastore/Shard.java | 5 + .../cluster/datastore/AbstractActorTest.java | 4 + .../cluster/datastore/ShardTest.java | 9 +- .../utils/InMemorySnapshotStore.java | 110 ++++++++++++++++++ .../src/test/resources/application.conf | 10 ++ 7 files changed, 145 insertions(+), 10 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 75c237f503..9d06f63604 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit; */ public class DefaultConfigParamsImpl implements ConfigParams { - private static final int SNAPSHOT_BATCH_COUNT = 100000; + private static final int SNAPSHOT_BATCH_COUNT = 20000; /** * The maximum election time variance diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 778f5c68f6..190f1bd409 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -123,7 +123,7 @@ public abstract class RaftActor extends UntypedPersistentActor { @Override public void onReceiveRecover(Object message) { if (message instanceof SnapshotOffer) { - LOG.debug("SnapshotOffer called.."); + LOG.info("SnapshotOffer called.."); SnapshotOffer offer = (SnapshotOffer) message; Snapshot snapshot = (Snapshot) offer.snapshot(); @@ -135,10 +135,11 @@ public abstract class RaftActor extends UntypedPersistentActor { context.setReplicatedLog(replicatedLog); context.setLastApplied(snapshot.getLastAppliedIndex()); - LOG.debug("Applied snapshot to replicatedLog. " + - "snapshotIndex={}, snapshotTerm={}, journal-size={}", + LOG.info("Applied snapshot to replicatedLog. " + + "snapshotIndex={}, snapshotTerm={}, journal-size={}", replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, - replicatedLog.size()); + replicatedLog.size() + ); // Apply the snapshot to the actors state applySnapshot(ByteString.copyFrom(snapshot.getState())); @@ -236,17 +237,17 @@ public abstract class RaftActor extends UntypedPersistentActor { context.removePeer(rrp.getName()); } else if (message instanceof CaptureSnapshot) { - LOG.debug("CaptureSnapshot received by actor"); + LOG.info("CaptureSnapshot received by actor"); CaptureSnapshot cs = (CaptureSnapshot)message; captureSnapshot = cs; createSnapshot(); } else if (message instanceof CaptureSnapshotReply){ - LOG.debug("CaptureSnapshotReply received by actor"); + LOG.info("CaptureSnapshotReply received by actor"); CaptureSnapshotReply csr = (CaptureSnapshotReply) message; ByteString stateInBytes = csr.getSnapshot(); - LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); + LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); handleCaptureSnapshotReply(stateInBytes); } else { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 713996b13b..bf1eb056b5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -193,6 +193,7 @@ public class Shard extends RaftActor { .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self()); + createSnapshotTransaction = null; // Send a PoisonPill instead of sending close transaction because we do not really need // a response getSender().tell(PoisonPill.getInstance(), self()); @@ -503,6 +504,8 @@ public class Shard extends RaftActor { // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower + + LOG.info("Applying snapshot"); try { DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); @@ -517,6 +520,8 @@ public class Shard extends RaftActor { syncCommitTransaction(transaction); } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred when applying snapshot"); + } finally { + LOG.info("Done applying snapshot"); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java index 4c550a768c..022ef9bbaf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java @@ -25,12 +25,16 @@ public abstract class AbstractActorTest { System.setProperty("shard.persistent", "false"); system = ActorSystem.create("test"); + + deletePersistenceFiles(); } @AfterClass public static void tearDownClass() throws IOException { JavaTestKit.shutdownActorSystem(system); system = null; + + deletePersistenceFiles(); } protected static void deletePersistenceFiles() throws IOException { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 06bcac8d78..deb71c2df4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -343,11 +343,16 @@ public class ShardTest extends AbstractActorTest { subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef()); - waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor"); + waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor"); + + subject.tell(new CaptureSnapshot(-1,-1,-1,-1), + getRef()); + + waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor"); + } }; - Thread.sleep(2000); deletePersistenceFiles(); }}; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java new file mode 100644 index 0000000000..0e492f0fbb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import akka.dispatch.Futures; +import akka.japi.Option; +import akka.persistence.SelectedSnapshot; +import akka.persistence.SnapshotMetadata; +import akka.persistence.SnapshotSelectionCriteria; +import akka.persistence.snapshot.japi.SnapshotStore; +import com.google.common.collect.Iterables; +import scala.concurrent.Future; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class InMemorySnapshotStore extends SnapshotStore { + + Map> snapshots = new HashMap<>(); + + @Override public Future> doLoadAsync(String s, + SnapshotSelectionCriteria snapshotSelectionCriteria) { + List snapshotList = snapshots.get(s); + if(snapshotList == null){ + return Futures.successful(Option.none()); + } + + Snapshot snapshot = Iterables.getLast(snapshotList); + SelectedSnapshot selectedSnapshot = + new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData()); + return Futures.successful(Option.some(selectedSnapshot)); + } + + @Override public Future doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) { + List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); + + if(snapshotList == null){ + snapshotList = new ArrayList<>(); + snapshots.put(snapshotMetadata.persistenceId(), snapshotList); + } + snapshotList.add(new Snapshot(snapshotMetadata, o)); + + return Futures.successful(null); + } + + @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception { + } + + @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception { + List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); + + if(snapshotList == null){ + return; + } + + int deleteIndex = -1; + + for(int i=0;i snapshotList = snapshots.get(s); + + if(snapshotList == null){ + return; + } + + // TODO : This is a quick and dirty implementation. Do actual match later. + snapshotList.clear(); + snapshots.remove(s); + } + + private static class Snapshot { + private final SnapshotMetadata metadata; + private final Object data; + + private Snapshot(SnapshotMetadata metadata, Object data) { + this.metadata = metadata; + this.data = data; + } + + public SnapshotMetadata getMetadata() { + return metadata; + } + + public Object getData() { + return data; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index 794b376af8..f0dadc618b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -1,4 +1,6 @@ akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"] actor { @@ -14,6 +16,14 @@ akka { } } } + +in-memory-snapshot-store { + # Class name of the plugin. + class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" +} + bounded-mailbox { mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000 -- 2.36.6