Merge "BUG 2437 - Enable snapshotting based on size of data in the in-memory journal"
authorTom Pantelis <tpanteli@brocade.com>
Mon, 1 Dec 2014 22:03:00 +0000 (22:03 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 1 Dec 2014 22:03:01 +0000 (22:03 +0000)
19 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/messages/KeyValue.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ConfigParams.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/DefaultConfigParamsImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogEntry.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplEntry.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationPayload.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/Payload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java

index e0873cc7bae06159f5644f5cda370b1ada78275b..d2862c2baf219acedc921cf0710f2978f7fe85f7 100644 (file)
@@ -9,13 +9,12 @@
 package org.opendaylight.controller.cluster.example.messages;
 
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 public class KeyValue extends Payload implements Serializable {
     private static final long serialVersionUID = 1L;
@@ -71,4 +70,9 @@ public class KeyValue extends Payload implements Serializable {
         return this;
     }
 
+    @Override
+    public int size() {
+        return this.value.length() + this.key.length();
+    }
+
 }
index a2c9d660ad8d2472f7b9a83ab3f175b1883ad1f6..653520c2e47db4be19de53c9e3a42099904298d8 100644 (file)
@@ -26,6 +26,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
     protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
     protected long previousSnapshotIndex = -1;
     protected long previousSnapshotTerm = -1;
+    protected int dataSize = 0;
 
     public AbstractReplicatedLogImpl(long snapshotIndex,
         long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
@@ -198,6 +199,7 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         snapshottedJournal = null;
         previousSnapshotIndex = -1;
         previousSnapshotTerm = -1;
+        dataSize = 0;
     }
 
     @Override
index 433c3f7e4b832d081de375e8b792f19f1ca4caa4..4245cf10f778fc81fe3181b0db5d9b825ea23a4c 100644 (file)
@@ -28,6 +28,14 @@ public interface ConfigParams {
      */
     long getSnapshotBatchCount();
 
+    /**
+     * The percentage of total memory that the data held in the in-memory Raft
+     * log may occupy before a snapshot is to be taken.
+     *
+     * @return the snapshot data threshold, as a percentage of total memory
+     */
+    int getSnapshotDataThresholdPercentage();
+
     /**
      * The interval at which a heart beat message will be sent to the remote
      * RaftActor
index a2092234d54134fbe3de765d17041ad766d689c4..3a6bdbf0a3ec75151313c6b9a00143ea1d15cbd0 100644 (file)
@@ -7,9 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Default implementation of the ConfigParams
@@ -47,6 +46,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
     private FiniteDuration isolatedLeaderCheckInterval =
         new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
 
+    // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
+    // in-memory journal can use before it needs to snapshot
+    private int snapshotDataThresholdPercentage = 12;
+
     public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
         this.heartBeatInterval = heartBeatInterval;
     }
@@ -55,6 +58,10 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         this.snapshotBatchCount = snapshotBatchCount;
     }
 
+    public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage){
+        this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
+    }
+
     public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
         this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
     }
@@ -68,6 +75,12 @@ public class DefaultConfigParamsImpl implements ConfigParams {
         return snapshotBatchCount;
     }
 
+    @Override
+    public int getSnapshotDataThresholdPercentage() {
+        return snapshotDataThresholdPercentage;
+    }
+
+
     @Override
     public FiniteDuration getHeartBeatInterval() {
         return heartBeatInterval;
index d647475e4d53319e1638c0e4c03fe4dfcd58a891..3b8469207798952298a6f304a2b07e18153e4ea2 100644 (file)
@@ -676,6 +676,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 @Override public void apply(DeleteEntries param)
                     throws Exception {
-                    //FIXME : Doing nothing for now
+                    // Recompute the tracked data size from the entries currently in the journal
+                    dataSize = 0;
+                    for(ReplicatedLogEntry entry : journal){
+                        dataSize += entry.size();
+                    }
                 }
             });
         }
@@ -685,6 +689,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             appendAndPersist(null, null, replicatedLogEntry);
         }
 
+        @Override
+        public int dataSize() {
+            return dataSize;
+        }
+
         public void appendAndPersist(final ActorRef clientActor,
             final String identifier,
             final ReplicatedLogEntry replicatedLogEntry) {
@@ -705,9 +714,15 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 new Procedure<ReplicatedLogEntry>() {
                     @Override
                     public void apply(ReplicatedLogEntry evt) throws Exception {
+                        dataSize += replicatedLogEntry.size();
+
+                        long dataThreshold = Runtime.getRuntime().totalMemory() *
+                                getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
-                        // when a snaphsot is being taken, captureSnapshot != null
+                        // when a snapshot is being taken, captureSnapshot != null
                         if (hasSnapshotCaptureInitiated == false &&
-                            journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+                                ( journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0 ||
+                                        dataSize > dataThreshold)) {
 
                             LOG.info("Initiating Snapshot Capture..");
                             long lastAppliedIndex = -1;
index 7ee85322a60f9ec93a4effc6e9379bd52d77a6ed..80b7ad90d05dbb1e9fe45ec57a65a44dedfe4463 100644 (file)
@@ -171,4 +171,9 @@ public interface ReplicatedLog {
      * Restores the replicated log to a state in the event of a save snapshot failure
      */
     public void snapshotRollback();
+
+    /**
+     * Size of the data in the log (in bytes)
+     */
+    public int dataSize();
 }
index f501c4d37f1f864c7a0551c119403d2b76457d1d..19796097364f3a2d321b11edb281834055f97a16 100644 (file)
@@ -34,4 +34,13 @@ public interface ReplicatedLogEntry {
      * @return
      */
     long getIndex();
+
+    /**
+     * The size of the entry in bytes.
+     *
+     * An approximate number may be good enough.
+     *
+     * @return the size of the entry in bytes, possibly an approximation
+     */
+    int size();
 }
index ceb5633d4d2deaedb8029a4eb9e7ee3864f68035..986c7f382c44f00405abb7d62394cda548e2a66c 100644 (file)
@@ -8,9 +8,8 @@
 
 package org.opendaylight.controller.cluster.raft;
 
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-
 import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 public class ReplicatedLogImplEntry implements ReplicatedLogEntry,
     Serializable {
@@ -39,6 +38,11 @@ public class ReplicatedLogImplEntry implements ReplicatedLogEntry,
         return index;
     }
 
+    @Override
+    public int size() {
+        return getData().size();
+    }
+
     @Override public String toString() {
         return "Entry{" +
             "index=" + index +
index 398a2e9b3651de7846eb36830b5b3410f71a254f..d95c9d502712159334171301868ee9af4cfa99d7 100644 (file)
@@ -164,6 +164,11 @@ public class AbstractReplicatedLogImplTest {
             this.snapshotTerm = snapshotTerm;
         }
 
+        @Override
+        public int dataSize() {
+            return -1;
+        }
+
         public List<ReplicatedLogEntry> getEntriesTill(final int index) {
             return journal.subList(0, index);
         }
index 562ca213a950123d876668fca5c47da4ea031995..2424d4d557f7f1dc58e65a1718e44a6a84db501f 100644 (file)
@@ -16,13 +16,12 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
 
 public class MockRaftActorContext implements RaftActorContext {
 
@@ -192,6 +191,11 @@ public class MockRaftActorContext implements RaftActorContext {
             append(replicatedLogEntry);
         }
 
+        @Override
+        public int dataSize() {
+            return -1;
+        }
+
         @Override public void removeFromAndPersist(long index) {
             removeFrom(index);
         }
@@ -222,6 +226,11 @@ public class MockRaftActorContext implements RaftActorContext {
             return this;
         }
 
+        @Override
+        public int size() {
+            return value.length();
+        }
+
         @Override public String getClientPayloadClassName() {
             return MockPayload.class.getName();
         }
@@ -256,6 +265,11 @@ public class MockRaftActorContext implements RaftActorContext {
         @Override public long getIndex() {
             return index;
         }
+
+        @Override
+        public int size() {
+            return getData().size();
+        }
     }
 
     public static class MockReplicatedLogBuilder {
index aadc3625cc34abf8c58dd0143bc97d4c571f7e6f..075c6075a86b327775d23f4c217794700b69611f 100644 (file)
@@ -12,12 +12,11 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.UnknownFieldSet;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 
 public class CompositeModificationPayload extends Payload implements
     Serializable {
@@ -73,4 +72,15 @@ public class CompositeModificationPayload extends Payload implements
     public Object getModification(){
         return this.modification;
     }
+
+    /**
+     * Reports the size of this payload, taken from the serialized size of the
+     * wrapped modification.
+     *
+     * @return the value reported by {@code modification.getSerializedSize()}
+     */
+    @Override
+    public int size(){
+        return this.modification.getSerializedSize();
+    }
 }
index 502c338f2298ea34b443d0b0f1e8d3a4d36df721..7df53082948d0c69f4ea2c1223025821bd5946bd 100644 (file)
@@ -10,9 +10,8 @@ package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
 
 
 import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
 import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 
 /**
  * An instance of a Payload class is meant to be used as the Payload for
@@ -81,6 +80,8 @@ public abstract class Payload {
     public abstract Payload decode(
         AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload);
 
+    public abstract int size();
+
 
 
 }
index e18c00ec4b0b5449ddc071175b379372528796f4..daba3fdf8ac18ac889c348b1af017c679603ebb3 100644 (file)
@@ -8,17 +8,16 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.util.Timeout;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
 import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
-import akka.util.Timeout;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.TimeUnit;
-
 /**
  * Contains contextual data for a data store.
  *
@@ -120,6 +119,7 @@ public class DatastoreContext {
         private boolean persistent = true;
         private ConfigurationReader configurationReader = new FileConfigurationReader();
         private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
+        private int shardSnapshotDataThresholdPercentage = 12;
 
         public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
             this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
@@ -156,6 +156,12 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+            this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage;
+            return this;
+        }
+
+
         public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
             this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
             return this;
@@ -191,12 +197,14 @@ public class DatastoreContext {
             return this;
         }
 
+
         public DatastoreContext build() {
             DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
             raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
                     TimeUnit.MILLISECONDS));
             raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
             raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+            raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
             raftConfig.setIsolatedLeaderCheckInterval(
                 new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
 
index 7073ea758b2b751d93708914bfc3935779076908..af16d02eea2909ee75e83e2b62dc58bc633b7031 100644 (file)
@@ -788,6 +788,7 @@ public class Shard extends RaftActor {
 
         shardMBean.setCommitIndex(getCommitIndex());
         shardMBean.setLastApplied(getLastApplied());
+        shardMBean.setDataSize(getRaftActorContext().getReplicatedLog().dataSize());
     }
 
     @Override
index 9decd82822a2533a092c39b3708b15e21a3be063..4fc2ed2d0691c400ce04eb04482e0794bfef217b 100644 (file)
@@ -62,6 +62,8 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
 
     private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
 
+    private volatile long dataSize = 0;
+
     private final SimpleDateFormat sdf =
         new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 
@@ -218,6 +220,15 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
         this.lastCommittedTransactionTime = lastCommittedTransactionTime;
     }
 
+    public void setDataSize(long dataSize){
+        this.dataSize = dataSize;
+    }
+
+    @Override
+    public long getDataSize(){
+        return dataSize;
+    }
+
     @Override
     public ThreadExecutorStats getDataStoreExecutorStats() {
         // FIXME: this particular thing does not work, as it really is DS-specific
index 4d3d438b3267b857457e4f7a5c767c52a9338c6a..367d4f45e235209dd3a07b51107dec6198d1c157 100644 (file)
@@ -54,6 +54,12 @@ module distributed-datastore-provider {
         }
     }
 
+    typedef percentage {
+        type uint8 {
+            range "0..100";
+        }
+    }
+
     grouping data-store-properties {
         leaf max-shard-data-change-executor-queue-size {
             default 1000;
@@ -88,9 +94,16 @@ module distributed-datastore-provider {
          leaf shard-snapshot-batch-count {
             default 20000;
             type non-zero-uint32-type;
-            description "The minimum number of entries to be present in the in-memory journal log before a snapshot to be taken.";
+            description "The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.";
          }
 
+         leaf shard-snapshot-data-threshold-percentage {
+            default 12;
+            type percentage;
+            description "The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken";
+         }
+
+
          leaf shard-hearbeat-interval-in-millis {
             default 500;
             type heartbeat-interval-type;
index 04d889fbe0f61eaa822b9b286c9527ad1b1e5447..9e02223f54e18d4d844f8b2be9af188df261a901 100644 (file)
@@ -1,5 +1,11 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -13,13 +19,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 public class CompositeModificationPayloadTest {
 
 
@@ -60,6 +59,11 @@ public class CompositeModificationPayloadTest {
             @Override public long getIndex() {
                 return 1;
             }
+
+            @Override
+            public int size() {
+                return getData().size();
+            }
         });
 
         AppendEntries appendEntries =
index a2b78c6c152dd350b7a4f8bffc075af0ea97cd92..a3041e89dbf9ac2a9139552c6cafff7437bc0390 100644 (file)
@@ -13,19 +13,18 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import com.typesafe.config.ConfigFactory;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import java.util.ArrayList;
+import java.util.List;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class Client {
 
     private static ActorSystem actorSystem;
@@ -93,6 +92,11 @@ public class Client {
             @Override public long getIndex() {
                 return 1;
             }
+
+            @Override
+            public int size() {
+                return getData().size();
+            }
         });
 
         return new AppendEntries(1, "member-1", 0, 100, modification, 1);
@@ -113,6 +117,11 @@ public class Client {
             @Override public long getIndex() {
                 return 1;
             }
+
+            @Override
+            public int size() {
+                return getData().size();
+            }
         });
 
         return new AppendEntries(1, "member-1", 0, 100, modification, 1);