From: Tom Pantelis Date: Mon, 15 Sep 2014 16:36:26 +0000 (+0000) Subject: Merge "BUG 1853 : Clustered Data Store causes Out of Memory" X-Git-Tag: release/helium~72 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=d313c1a52799705222817dfcaeb2b6ab2a6f9146;hp=4b11caf923b277834b3e910c0acac41fca0b9869 Merge "BUG 1853 : Clustered Data Store causes Out of Memory" --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java index 75c237f503..9d06f63604 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java @@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit; */ public class DefaultConfigParamsImpl implements ConfigParams { - private static final int SNAPSHOT_BATCH_COUNT = 100000; + private static final int SNAPSHOT_BATCH_COUNT = 20000; /** * The maximum election time variance diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 778f5c68f6..190f1bd409 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -123,7 +123,7 @@ public abstract class RaftActor extends UntypedPersistentActor { @Override public void onReceiveRecover(Object message) { if (message instanceof SnapshotOffer) { - LOG.debug("SnapshotOffer called.."); + LOG.info("SnapshotOffer called.."); SnapshotOffer offer = (SnapshotOffer) message; Snapshot snapshot = (Snapshot) offer.snapshot(); @@ -135,10 +135,11 @@ public abstract class RaftActor extends UntypedPersistentActor { context.setReplicatedLog(replicatedLog); context.setLastApplied(snapshot.getLastAppliedIndex()); - LOG.debug("Applied snapshot to replicatedLog. " + - "snapshotIndex={}, snapshotTerm={}, journal-size={}", + LOG.info("Applied snapshot to replicatedLog. " + + "snapshotIndex={}, snapshotTerm={}, journal-size={}", replicatedLog.snapshotIndex, replicatedLog.snapshotTerm, - replicatedLog.size()); + replicatedLog.size() + ); // Apply the snapshot to the actors state applySnapshot(ByteString.copyFrom(snapshot.getState())); @@ -236,17 +237,17 @@ public abstract class RaftActor extends UntypedPersistentActor { context.removePeer(rrp.getName()); } else if (message instanceof CaptureSnapshot) { - LOG.debug("CaptureSnapshot received by actor"); + LOG.info("CaptureSnapshot received by actor"); CaptureSnapshot cs = (CaptureSnapshot)message; captureSnapshot = cs; createSnapshot(); } else if (message instanceof CaptureSnapshotReply){ - LOG.debug("CaptureSnapshotReply received by actor"); + LOG.info("CaptureSnapshotReply received by actor"); CaptureSnapshotReply csr = (CaptureSnapshotReply) message; ByteString stateInBytes = csr.getSnapshot(); - LOG.debug("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); + LOG.info("CaptureSnapshotReply stateInBytes size:{}", stateInBytes.size()); handleCaptureSnapshotReply(stateInBytes); } else { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 713996b13b..bf1eb056b5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -193,6 +193,7 @@ public class Shard extends RaftActor { .tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)), self()); + createSnapshotTransaction = null; // Send a PoisonPill instead of sending close transaction because we do not really need // a response getSender().tell(PoisonPill.getInstance(), self()); @@ -503,6 +504,8 @@ public class Shard extends RaftActor { // Since this will be done only on Recovery or when this actor is a Follower // we can safely commit everything in here. We not need to worry about event notifications // as they would have already been disabled on the follower + + LOG.info("Applying snapshot"); try { DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction(); NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(snapshot); @@ -517,6 +520,8 @@ public class Shard extends RaftActor { syncCommitTransaction(transaction); } catch (InvalidProtocolBufferException | InterruptedException | ExecutionException e) { LOG.error(e, "An exception occurred when applying snapshot"); + } finally { + LOG.info("Done applying snapshot"); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java index 4c550a768c..022ef9bbaf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractActorTest.java @@ -25,12 +25,16 @@ public abstract class AbstractActorTest { System.setProperty("shard.persistent", "false"); system = ActorSystem.create("test"); + + deletePersistenceFiles(); } @AfterClass public static void tearDownClass() throws IOException { JavaTestKit.shutdownActorSystem(system); system = null; + + deletePersistenceFiles(); } protected static void deletePersistenceFiles() throws IOException { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 06bcac8d78..deb71c2df4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -343,11 +343,16 @@ public class ShardTest extends AbstractActorTest { subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef()); - waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor"); + waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor"); + + subject.tell(new CaptureSnapshot(-1,-1,-1,-1), + getRef()); + + waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor"); + } }; - Thread.sleep(2000); deletePersistenceFiles(); }}; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java new file mode 100644 index 0000000000..0e492f0fbb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/InMemorySnapshotStore.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.utils; + +import akka.dispatch.Futures; +import akka.japi.Option; +import akka.persistence.SelectedSnapshot; +import akka.persistence.SnapshotMetadata; +import akka.persistence.SnapshotSelectionCriteria; +import akka.persistence.snapshot.japi.SnapshotStore; +import com.google.common.collect.Iterables; +import scala.concurrent.Future; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class InMemorySnapshotStore extends SnapshotStore { + + Map> snapshots = new HashMap<>(); + + @Override public Future> doLoadAsync(String s, + SnapshotSelectionCriteria snapshotSelectionCriteria) { + List snapshotList = snapshots.get(s); + if(snapshotList == null){ + return Futures.successful(Option.none()); + } + + Snapshot snapshot = Iterables.getLast(snapshotList); + SelectedSnapshot selectedSnapshot = + new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData()); + return Futures.successful(Option.some(selectedSnapshot)); + } + + @Override public Future doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) { + List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); + + if(snapshotList == null){ + snapshotList = new ArrayList<>(); + snapshots.put(snapshotMetadata.persistenceId(), snapshotList); + } + snapshotList.add(new Snapshot(snapshotMetadata, o)); + + return Futures.successful(null); + } + + @Override public void onSaved(SnapshotMetadata snapshotMetadata) throws Exception { + } + + @Override public void doDelete(SnapshotMetadata snapshotMetadata) throws Exception { + List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); + + if(snapshotList == null){ + return; + } + + int deleteIndex = -1; + + for(int i=0;i snapshotList = snapshots.get(s); + + if(snapshotList == null){ + return; + } + + // TODO : This is a quick and dirty implementation. Do actual match later. + snapshotList.clear(); + snapshots.remove(s); + } + + private static class Snapshot { + private final SnapshotMetadata metadata; + private final Object data; + + private Snapshot(SnapshotMetadata metadata, Object data) { + this.metadata = metadata; + this.data = data; + } + + public SnapshotMetadata getMetadata() { + return metadata; + } + + public Object getData() { + return data; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index 794b376af8..f0dadc618b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -1,4 +1,6 @@ akka { + persistence.snapshot-store.plugin = "in-memory-snapshot-store" + loggers = ["akka.testkit.TestEventListener", "akka.event.slf4j.Slf4jLogger"] actor { @@ -14,6 +16,14 @@ akka { } } } + +in-memory-snapshot-store { + # Class name of the plugin. + class = "org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" +} + bounded-mailbox { mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox" mailbox-capacity = 1000