BUG 2437 - Enable snapshotting based on size of data in the in-memory journal 48/13148/4
authorMoiz Raja <moraja@cisco.com>
Wed, 26 Nov 2014 17:07:40 +0000 (09:07 -0800)
committerMoiz Raja <moraja@cisco.com>
Mon, 1 Dec 2014 18:38:53 +0000 (10:38 -0800)
- Changed RaftActor to snapshot based on the transaction count or the size of the
  data in the in-memory journal whichever comes earlier
- The size of data that is used is the rough (not-accurate) size of the Payload
  in the Replicated log entry
- In ShardStats exposed another property which is the data size of the in-memory
  journal
- The snapshot data threshold percentage is configurable using the config sub-system and is
  set to a default of 12%. The reason for setting it at 12% by default is because we have
  a total of 8 default shards out of the box. I could have set this to 16% as toaster is not
  a "real" data shard.
- The snapshot data threshold is calculated as a percentage of the Runtime.totalMemory()
  which is the total memory the jvm considers available for object allocation. From testing
  it appears that the total memory is what would appear in jconsole as the committed memory.

I have not added any unit testing for this - but tested this using the scenario described in
bug 2437 and it seems to work pretty well. The deployment used only 2G of memory and worked
fine for a 7 switch topology and I observed that it had not run out of memory after more than
2 hours.

Change-Id: I09ec0827c0411c42a9224bb6d159d5590c22e20b
Signed-off-by: Moiz Raja <moraja@cisco.com>
19 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationPayload.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java

index e0873cc..d2862c2 100644 (file)
@@ -9,13 +9,12 @@
 package org.opendaylight.controller.cluster.example.messages;
 
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 public class KeyValue extends Payload implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -71,4 +70,9 @@ public class KeyValue extends Payload implements Serializable {
         return this;
     }
 
+    @Override
+    public int size() {
+        return this.value.length() + this.key.length();
+    }
+
 }
index a2c9d66..653520c 100644 (file)
@@ -26,6 +26,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
     protected long previousSnapshotIndex = -1;
     protected long previousSnapshotTerm = -1;
+    protected int dataSize = 0;
 
     public AbstractReplicatedLogImpl(long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
@@ -198,6 +199,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         snapshottedJournal = null;
         previousSnapshotIndex = -1;
         previousSnapshotTerm = -1;
+        dataSize = 0;
     }
 
     @Override
index 433c3f7..4245cf1 100644 (file)
@@ -28,6 +28,14 @@ public interface ConfigParams {
      */
     long getSnapshotBatchCount();
 
+    /**
+     * The percentage of total memory in the in-memory Raft log before a snapshot
+     * is to be taken
+     *
+     * @return int
+     */
+    int getSnapshotDataThresholdPercentage();
+
     /**
      * The interval at which a heart beat message will be sent to the remote
      * RaftActor
index a209223..3a6bdbf 100644 (file)
@@ -7,9 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Default implementation of the ConfigParams
@@ -47,6 +46,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private FiniteDuration isolatedLeaderCheckInterval =
         new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
 
+    // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
+    // in-memory journal can use before it needs to snapshot
+    private int snapshotDataThresholdPercentage = 12;
+
     public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
     }
@@ -55,6 +58,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         this.snapshotBatchCount = snapshotBatchCount;
     }
 
+    public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage){
+        this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
+    }
+
     public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
         this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
     }
@@ -68,6 +75,12 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         return snapshotBatchCount;
     }
 
+    @Override
+    public int getSnapshotDataThresholdPercentage() {
+        return snapshotDataThresholdPercentage;
+    }
+
+
     @Override
     public FiniteDuration getHeartBeatInterval() {
         return heartBeatInterval;
index d647475..3b84692 100644 (file)
@@ -676,6 +676,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 @Override public void apply(DeleteEntries param)
                     throws Exception {
                     //FIXME : Doing nothing for now
+                    dataSize = 0;
+                    for(ReplicatedLogEntry entry : journal){
+                        dataSize += entry.size();
+                    }
                 }
             });
         }
@@ -685,6 +689,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             appendAndPersist(null, null, replicatedLogEntry);
         }
 
+        @Override
+        public int dataSize() {
+            return dataSize;
+        }
+
         public void appendAndPersist(final ActorRef clientActor,
             final String identifier,
             final ReplicatedLogEntry replicatedLogEntry) {
@@ -705,9 +714,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 new Procedure<ReplicatedLogEntry>() {
                     @Override
                     public void apply(ReplicatedLogEntry evt) throws Exception {
+                        dataSize += replicatedLogEntry.size();
+
+                        long dataThreshold = Runtime.getRuntime().totalMemory() *
+                                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
                         // when a snaphsot is being taken, captureSnapshot != null
                         if (hasSnapshotCaptureInitiated == false &&
-                            journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+                                ( journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0 ||
+                                        dataSize > dataThreshold)) {
 
                             LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
index 7ee8532..80b7ad9 100644 (file)
@@ -171,4 +171,9 @@ public interface ReplicatedLog {
      * Restores the replicated log to a state in the event of a save snapshot failure
      */
     public void snapshotRollback();
+
+    /**
+     * Size of the data in the log (in bytes)
+     */
+    public int dataSize();
 }
index f501c4d..1979609 100644 (file)
@@ -34,4 +34,13 @@ public interface ReplicatedLogEntry {
      * @return
      */
     long getIndex();
+
+    /**
+     * The size of the entry in bytes.
+     *
+     * An approximate number may be good enough.
+     *
+     * @return
+     */
+    int size();
 }
index ceb5633..986c7f3 100644 (file)
@@ -8,9 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-
 import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 public class ReplicatedLogImplEntry implements ReplicatedLogEntry,
     Serializable {
@@ -39,6 +38,11 @@ public class ReplicatedLogImplEntry implements ReplicatedLogEntry,
         return index;
     }
 
+    @Override
+    public int size() {
+        return getData().size();
+    }
+
     @Override public String toString() {
         return "Entry{" +
             "index=" + index +
index 398a2e9..d95c9d5 100644 (file)
@@ -164,6 +164,11 @@ public class AbstractReplicatedLogImplTest {
             this.snapshotTerm = snapshotTerm;
         }
 
+        @Override
+        public int dataSize() {
+            return -1;
+        }
+
         public List<ReplicatedLogEntry> getEntriesTill(final int index) {
             return journal.subList(0, index);
         }
index 562ca21..2424d4d 100644 (file)
@@ -16,13 +16,12 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
 
 public class MockRaftActorContext implements RaftActorContext {
 
@@ -192,6 +191,11 @@ public class MockRaftActorContext implements RaftActorContext {
             append(replicatedLogEntry);
         }
 
+        @Override
+        public int dataSize() {
+            return -1;
+        }
+
         @Override public void removeFromAndPersist(long index) {
             removeFrom(index);
         }
@@ -222,6 +226,11 @@ public class MockRaftActorContext implements RaftActorContext {
             return this;
         }
 
+        @Override
+        public int size() {
+            return value.length();
+        }
+
         @Override public String getClientPayloadClassName() {
             return MockPayload.class.getName();
         }
@@ -256,6 +265,11 @@ public class MockRaftActorContext implements RaftActorContext {
         @Override public long getIndex() {
             return index;
         }
+
+        @Override
+        public int size() {
+            return getData().size();
+        }
     }
 
     public static class MockReplicatedLogBuilder {
index aadc362..075c607 100644 (file)
@@ -12,12 +12,11 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.UnknownFieldSet;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 
 public class CompositeModificationPayload extends Payload implements
     Serializable {
@@ -73,4 +72,8 @@ public class CompositeModificationPayload extends Payload implements
     public Object getModification(){
         return this.modification;
     }
+
+    public int size(){
+        return this.modification.getSerializedSize();
+    }
 }
index 502c338..7df5308 100644 (file)
@@ -10,9 +10,8 @@ package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
 
 
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
 import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 /**
  * An instance of a Payload class is meant to be used as the Payload for
@@ -81,6 +80,8 @@ public abstract class Payload {
     public abstract Payload decode(
         AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload);
 
+    public abstract int size();
+
 
 
 }
index e18c00e..daba3fd 100644 (file)
@@ -8,17 +8,16 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.util.Timeout;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
 import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
-import akka.util.Timeout;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.TimeUnit;
-
 /**
  * Contains contextual data for a data store.
  *
@@ -120,6 +119,7 @@ public class DatastoreContext {
         private boolean persistent = true;
         private ConfigurationReader configurationReader = new FileConfigurationReader();
         private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
+        private int shardSnapshotDataThresholdPercentage = 12;
 
         public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
             this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
@@ -156,6 +156,12 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+            this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage;
+            return this;
+        }
+
+
         public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
             this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
             return this;
@@ -191,12 +197,14 @@ public class DatastoreContext {
             return this;
         }
 
+
         public DatastoreContext build() {
             DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
             raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
                     TimeUnit.MILLISECONDS));
             raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
             raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+            raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
             raftConfig.setIsolatedLeaderCheckInterval(
                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
 
index 7073ea7..af16d02 100644 (file)
@@ -788,6 +788,7 @@ public class Shard extends RaftActor {
 
         shardMBean.setCommitIndex(getCommitIndex());
         shardMBean.setLastApplied(getLastApplied());
+        shardMBean.setDataSize(getRaftActorContext().getReplicatedLog().dataSize());
     }
 
     @Override
index 9decd82..4fc2ed2 100644 (file)
@@ -62,6 +62,8 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
 
     private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
 
+    private volatile long dataSize = 0;
+
     private final SimpleDateFormat sdf =
         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
@@ -218,6 +220,15 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
         this.lastCommittedTransactionTime = lastCommittedTransactionTime;
     }
 
+    public void setDataSize(long dataSize){
+        this.dataSize = dataSize;
+    }
+
+    @Override
+    public long getDataSize(){
+        return dataSize;
+    }
+
     @Override
     public ThreadExecutorStats getDataStoreExecutorStats() {
         // FIXME: this particular thing does not work, as it really is DS-specific
index 4d3d438..367d4f4 100644 (file)
@@ -54,6 +54,12 @@ module distributed-datastore-provider {
         }
     }
 
+    typedef percentage {
+        type uint8 {
+            range "0..100";
+        }
+    }
+
     grouping data-store-properties {
         leaf max-shard-data-change-executor-queue-size {
             default 1000;
@@ -88,9 +94,16 @@ module distributed-datastore-provider {
          leaf shard-snapshot-batch-count {
             default 20000;
             type non-zero-uint32-type;
-            description "The minimum number of entries to be present in the in-memory journal log before a snapshot to be taken.";
+            description "The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.";
          }
 
+         leaf shard-snapshot-data-threshold-percentage {
+            default 12;
+            type percentage;
+            description "The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken";
+         }
+
+
          leaf shard-hearbeat-interval-in-millis {
             default 500;
             type heartbeat-interval-type;
index 04d889f..9e02223 100644 (file)
@@ -1,5 +1,11 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -13,13 +19,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 public class CompositeModificationPayloadTest {
 
 
@@ -60,6 +59,11 @@ public class CompositeModificationPayloadTest {
             @Override public long getIndex() {
                 return 1;
             }
+
+            @Override
+            public int size() {
+                return getData().size();
+            }
         });
 
         AppendEntries appendEntries =
index a2b78c6..a3041e8 100644 (file)
@@ -13,19 +13,18 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import com.typesafe.config.ConfigFactory;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import java.util.ArrayList;
+import java.util.List;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class Client {
 
     private static ActorSystem actorSystem;
@@ -93,6 +92,11 @@ public class Client {
             @Override public long getIndex() {
                 return 1;
             }
+
+            @Override
+            public int size() {
+                return getData().size();
+            }
         });
 
         return new AppendEntries(1, "member-1", 0, 100, modification, 1);
@@ -113,6 +117,11 @@ public class Client {
             @Override public long getIndex() {
                 return 1;
             }
+
+            @Override
+            public int size() {
+                return getData().size();
+            }
         });
 
         return new AppendEntries(1, "member-1", 0, 100, modification, 1);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.