- Changed RaftActor to snapshot based on the transaction count or the size of the
data in the in-memory journal whichever comes earlier
- The size of data that is used is the rough (not-accurate) size of the Payload
in the Replicated log entry
- In ShardStats exposed another property which is the data size of the in-memory
journal
- The snapshot data threshold percentage is configurable using the config sub-system and is
set to a default of 12%. The reason for setting it at 12% by default is because we have
a total of 8 default shards out of the box. I could have set this to 16% as toaster is not
a "real" data shard.
- The snapshot data threshold is calculated as a percentage of the Runtime.totalMemory()
which is the total memory the jvm considers available for object allocation. From testing
it appears that the total memory is what would appear in jconsole as the committed memory.
I have not added any unit testing for this - but tested this using the scenario described in
bug 2437 and it seems to work pretty well. The deployment used only 2G of memory and worked
fine for a 7 switch topology and I observed that it had not run out of memory after more than
2 hours.
Change-Id: I09ec0827c0411c42a9224bb6d159d5590c22e20b
Signed-off-by: Moiz Raja <moraja@cisco.com>
package org.opendaylight.controller.cluster.example.messages;
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
public class KeyValue extends Payload implements Serializable {
private static final long serialVersionUID = 1L;
return this;
}
+ @Override
+ public int size() {
+ return this.value.length() + this.key.length();
+ }
+
}
protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
protected long previousSnapshotIndex = -1;
protected long previousSnapshotTerm = -1;
+ protected int dataSize = 0;
public AbstractReplicatedLogImpl(long snapshotIndex,
long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
snapshottedJournal = null;
previousSnapshotIndex = -1;
previousSnapshotTerm = -1;
+ dataSize = 0;
}
@Override
*/
long getSnapshotBatchCount();
+ /**
+ * The percentage of total memory in the in-memory Raft log before a snapshot
+ * is to be taken
+ *
+ * @return int
+ */
+ int getSnapshotDataThresholdPercentage();
+
/**
* The interval at which a heart beat message will be sent to the remote
* RaftActor
*/
package org.opendaylight.controller.cluster.raft;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.FiniteDuration;
/**
* Default implementation of the ConfigParams
private FiniteDuration isolatedLeaderCheckInterval =
new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
+ // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
+ // in-memory journal can use before it needs to snapshot
+ private int snapshotDataThresholdPercentage = 12;
+
public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
}
this.snapshotBatchCount = snapshotBatchCount;
}
+ public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage){
+ this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
+ }
+
public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
}
return snapshotBatchCount;
}
+ @Override
+ public int getSnapshotDataThresholdPercentage() {
+ return snapshotDataThresholdPercentage;
+ }
+
+
@Override
public FiniteDuration getHeartBeatInterval() {
return heartBeatInterval;
@Override public void apply(DeleteEntries param)
throws Exception {
//FIXME : Doing nothing for now
+ dataSize = 0;
+ for(ReplicatedLogEntry entry : journal){
+ dataSize += entry.size();
+ }
}
});
}
appendAndPersist(null, null, replicatedLogEntry);
}
+ @Override
+ public int dataSize() {
+ return dataSize;
+ }
+
public void appendAndPersist(final ActorRef clientActor,
final String identifier,
final ReplicatedLogEntry replicatedLogEntry) {
new Procedure<ReplicatedLogEntry>() {
@Override
public void apply(ReplicatedLogEntry evt) throws Exception {
+ dataSize += replicatedLogEntry.size();
+
+ long dataThreshold = Runtime.getRuntime().totalMemory() *
+ getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
// when a snaphsot is being taken, captureSnapshot != null
if (hasSnapshotCaptureInitiated == false &&
- journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+ ( journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0 ||
+ dataSize > dataThreshold)) {
LOG.info("Initiating Snapshot Capture..");
long lastAppliedIndex = -1;
* Restores the replicated log to a state in the event of a save snapshot failure
*/
public void snapshotRollback();
+
+ /**
+ * Size of the data in the log (in bytes)
+ */
+ public int dataSize();
}
* @return
*/
long getIndex();
+
+ /**
+ * The size of the entry in bytes.
+ *
+ * An approximate number may be good enough.
+ *
+ * @return
+ */
+ int size();
}
package org.opendaylight.controller.cluster.raft;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-
import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
public class ReplicatedLogImplEntry implements ReplicatedLogEntry,
Serializable {
return index;
}
+ @Override
+ public int size() {
+ return getData().size();
+ }
+
@Override public String toString() {
return "Entry{" +
"index=" + index +
this.snapshotTerm = snapshotTerm;
}
+ @Override
+ public int dataSize() {
+ return -1;
+ }
+
public List<ReplicatedLogEntry> getEntriesTill(final int index) {
return journal.subList(0, index);
}
import akka.event.LoggingAdapter;
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
public class MockRaftActorContext implements RaftActorContext {
append(replicatedLogEntry);
}
+ @Override
+ public int dataSize() {
+ return -1;
+ }
+
@Override public void removeFromAndPersist(long index) {
removeFrom(index);
}
return this;
}
+ @Override
+ public int size() {
+ return value.length();
+ }
+
@Override public String getClientPayloadClassName() {
return MockPayload.class.getName();
}
@Override public long getIndex() {
return index;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
}
public static class MockReplicatedLogBuilder {
import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.UnknownFieldSet;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
-
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
public class CompositeModificationPayload extends Payload implements
Serializable {
public Object getModification(){
return this.modification;
}
+
+ public int size(){
+ return this.modification.getSerializedSize();
+ }
}
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
/**
* An instance of a Payload class is meant to be used as the Payload for
public abstract Payload decode(
AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload);
+ public abstract int size();
+
}
package org.opendaylight.controller.cluster.datastore;
+import akka.util.Timeout;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
-import akka.util.Timeout;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-
/**
* Contains contextual data for a data store.
*
private boolean persistent = true;
private ConfigurationReader configurationReader = new FileConfigurationReader();
private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
+ private int shardSnapshotDataThresholdPercentage = 12;
public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
return this;
}
+ public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+ this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage;
+ return this;
+ }
+
+
public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
return this;
return this;
}
+
public DatastoreContext build() {
DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
TimeUnit.MILLISECONDS));
raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+ raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
raftConfig.setIsolatedLeaderCheckInterval(
new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
shardMBean.setCommitIndex(getCommitIndex());
shardMBean.setLastApplied(getLastApplied());
+ shardMBean.setDataSize(getRaftActorContext().getReplicatedLog().dataSize());
}
@Override
private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
+ private volatile long dataSize = 0;
+
private final SimpleDateFormat sdf =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
this.lastCommittedTransactionTime = lastCommittedTransactionTime;
}
+ public void setDataSize(long dataSize){
+ this.dataSize = dataSize;
+ }
+
+ @Override
+ public long getDataSize(){
+ return dataSize;
+ }
+
@Override
public ThreadExecutorStats getDataStoreExecutorStats() {
// FIXME: this particular thing does not work, as it really is DS-specific
int getMaxNotificationMgrListenerQueueSize();
void resetTransactionCounters();
+
+ long getDataSize();
}
}
}
+ typedef percentage {
+ type uint8 {
+ range "0..100";
+ }
+ }
+
grouping data-store-properties {
leaf max-shard-data-change-executor-queue-size {
default 1000;
leaf shard-snapshot-batch-count {
default 20000;
type non-zero-uint32-type;
- description "The minimum number of entries to be present in the in-memory journal log before a snapshot to be taken.";
+ description "The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.";
}
+ leaf shard-snapshot-data-threshold-percentage {
+ default 12;
+ type percentage;
+ description "The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken";
+ }
+
+
leaf shard-hearbeat-interval-in-millis {
default 500;
type heartbeat-interval-type;
package org.opendaylight.controller.cluster.datastore;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
public class CompositeModificationPayloadTest {
@Override public long getIndex() {
return 1;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
});
AppendEntries appendEntries =
import akka.actor.Props;
import akka.actor.UntypedActor;
import com.typesafe.config.ConfigFactory;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import java.util.ArrayList;
+import java.util.List;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import java.util.ArrayList;
-import java.util.List;
-
public class Client {
private static ActorSystem actorSystem;
@Override public long getIndex() {
return 1;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
});
return new AppendEntries(1, "member-1", 0, 100, modification, 1);
@Override public long getIndex() {
return 1;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
});
return new AppendEntries(1, "member-1", 0, 100, modification, 1);