package org.opendaylight.controller.cluster.example.messages;
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.example.KeyValueMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
public class KeyValue extends Payload implements Serializable {
private static final long serialVersionUID = 1L;
return this;
}
+ @Override
+ public int size() {
+ return this.value.length() + this.key.length();
+ }
+
}
protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
protected long previousSnapshotIndex = -1;
protected long previousSnapshotTerm = -1;
+ protected int dataSize = 0;
public AbstractReplicatedLogImpl(long snapshotIndex,
long snapshotTerm, List<ReplicatedLogEntry> unAppliedEntries) {
snapshottedJournal = null;
previousSnapshotIndex = -1;
previousSnapshotTerm = -1;
+ dataSize = 0;
}
@Override
*/
long getSnapshotBatchCount();
+ /**
+ * The percentage of total memory in the in-memory Raft log before a snapshot
+ * is to be taken
+ *
+ * @return int
+ */
+ int getSnapshotDataThresholdPercentage();
+
/**
* The interval at which a heart beat message will be sent to the remote
* RaftActor
*/
package org.opendaylight.controller.cluster.raft;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.concurrent.TimeUnit;
+import scala.concurrent.duration.FiniteDuration;
/**
* Default implementation of the ConfigParams
private FiniteDuration isolatedLeaderCheckInterval =
new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000, HEART_BEAT_INTERVAL.unit());
+ // 12 is just an arbitrary percentage. This is the amount of the total memory that a raft actor's
+ // in-memory journal can use before it needs to snapshot
+ private int snapshotDataThresholdPercentage = 12;
+
public void setHeartBeatInterval(FiniteDuration heartBeatInterval) {
this.heartBeatInterval = heartBeatInterval;
}
this.snapshotBatchCount = snapshotBatchCount;
}
+ public void setSnapshotDataThresholdPercentage(int snapshotDataThresholdPercentage){
+ this.snapshotDataThresholdPercentage = snapshotDataThresholdPercentage;
+ }
+
public void setJournalRecoveryLogBatchSize(int journalRecoveryLogBatchSize) {
this.journalRecoveryLogBatchSize = journalRecoveryLogBatchSize;
}
return snapshotBatchCount;
}
+ @Override
+ public int getSnapshotDataThresholdPercentage() {
+ return snapshotDataThresholdPercentage;
+ }
+
+
@Override
public FiniteDuration getHeartBeatInterval() {
return heartBeatInterval;
@Override public void apply(DeleteEntries param)
throws Exception {
//FIXME : Doing nothing for now
+ dataSize = 0;
+ for(ReplicatedLogEntry entry : journal){
+ dataSize += entry.size();
+ }
}
});
}
appendAndPersist(null, null, replicatedLogEntry);
}
+ @Override
+ public int dataSize() {
+ return dataSize;
+ }
+
public void appendAndPersist(final ActorRef clientActor,
final String identifier,
final ReplicatedLogEntry replicatedLogEntry) {
new Procedure<ReplicatedLogEntry>() {
@Override
public void apply(ReplicatedLogEntry evt) throws Exception {
+ dataSize += replicatedLogEntry.size();
+
+ long dataThreshold = Runtime.getRuntime().totalMemory() *
+ getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
// when a snaphsot is being taken, captureSnapshot != null
if (hasSnapshotCaptureInitiated == false &&
- journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+ ( journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0 ||
+ dataSize > dataThreshold)) {
LOG.info("Initiating Snapshot Capture..");
long lastAppliedIndex = -1;
* Restores the replicated log to a state in the event of a save snapshot failure
*/
public void snapshotRollback();
+
+ /**
+ * Size of the data in the log (in bytes)
+ */
+ public int dataSize();
}
* @return
*/
long getIndex();
+
+ /**
+ * The size of the entry in bytes.
+ *
+ * An approximate number may be good enough.
+ *
+ * @return
+ */
+ int size();
}
package org.opendaylight.controller.cluster.raft;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-
import java.io.Serializable;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
public class ReplicatedLogImplEntry implements ReplicatedLogEntry,
Serializable {
return index;
}
+ @Override
+ public int size() {
+ return getData().size();
+ }
+
@Override public String toString() {
return "Entry{" +
"index=" + index +
this.snapshotTerm = snapshotTerm;
}
+ @Override
+ public int dataSize() {
+ return -1;
+ }
+
public List<ReplicatedLogEntry> getEntriesTill(final int index) {
return journal.subList(0, index);
}
import akka.event.LoggingAdapter;
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
-
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
public class MockRaftActorContext implements RaftActorContext {
append(replicatedLogEntry);
}
+ @Override
+ public int dataSize() {
+ return -1;
+ }
+
@Override public void removeFromAndPersist(long index) {
removeFrom(index);
}
return this;
}
+ @Override
+ public int size() {
+ return value.length();
+ }
+
@Override public String getClientPayloadClassName() {
return MockPayload.class.getName();
}
@Override public long getIndex() {
return index;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
}
public static class MockReplicatedLogBuilder {
import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.UnknownFieldSet;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
-
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
public class CompositeModificationPayload extends Payload implements
Serializable {
public Object getModification(){
return this.modification;
}
+
+ public int size(){
+ return this.modification.getSerializedSize();
+ }
}
import com.google.protobuf.GeneratedMessage;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-
import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
/**
* An instance of a Payload class is meant to be used as the Payload for
public abstract Payload decode(
AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload);
+ public abstract int size();
+
}
package org.opendaylight.controller.cluster.datastore;
+import akka.util.Timeout;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationReader;
import org.opendaylight.controller.cluster.datastore.config.FileConfigurationReader;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
-import akka.util.Timeout;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-
/**
* Contains contextual data for a data store.
*
private boolean persistent = true;
private ConfigurationReader configurationReader = new FileConfigurationReader();
private int shardIsolatedLeaderCheckIntervalInMillis = shardHeartbeatIntervalInMillis * 10;
+ private int shardSnapshotDataThresholdPercentage = 12;
public Builder shardTransactionIdleTimeout(Duration shardTransactionIdleTimeout) {
this.shardTransactionIdleTimeout = shardTransactionIdleTimeout;
return this;
}
+ public Builder shardSnapshotDataThresholdPercentage(int shardSnapshotDataThresholdPercentage) {
+ this.shardSnapshotDataThresholdPercentage = shardSnapshotDataThresholdPercentage;
+ return this;
+ }
+
+
public Builder shardHeartbeatIntervalInMillis(int shardHeartbeatIntervalInMillis) {
this.shardHeartbeatIntervalInMillis = shardHeartbeatIntervalInMillis;
return this;
return this;
}
+
public DatastoreContext build() {
DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
raftConfig.setHeartBeatInterval(new FiniteDuration(shardHeartbeatIntervalInMillis,
TimeUnit.MILLISECONDS));
raftConfig.setJournalRecoveryLogBatchSize(shardJournalRecoveryLogBatchSize);
raftConfig.setSnapshotBatchCount(shardSnapshotBatchCount);
+ raftConfig.setSnapshotDataThresholdPercentage(shardSnapshotDataThresholdPercentage);
raftConfig.setIsolatedLeaderCheckInterval(
new FiniteDuration(shardIsolatedLeaderCheckIntervalInMillis, TimeUnit.MILLISECONDS));
shardMBean.setCommitIndex(getCommitIndex());
shardMBean.setLastApplied(getLastApplied());
+ shardMBean.setDataSize(getRaftActorContext().getReplicatedLog().dataSize());
}
@Override
private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
+ private volatile long dataSize = 0;
+
private final SimpleDateFormat sdf =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
this.lastCommittedTransactionTime = lastCommittedTransactionTime;
}
+ public void setDataSize(long dataSize){
+ this.dataSize = dataSize;
+ }
+
+ @Override
+ public long getDataSize(){
+ return dataSize;
+ }
+
@Override
public ThreadExecutorStats getDataStoreExecutorStats() {
// FIXME: this particular thing does not work, as it really is DS-specific
int getMaxNotificationMgrListenerQueueSize();
void resetTransactionCounters();
+
+ long getDataSize();
}
}
}
+ typedef percentage {
+ type uint8 {
+ range "0..100";
+ }
+ }
+
grouping data-store-properties {
leaf max-shard-data-change-executor-queue-size {
default 1000;
leaf shard-snapshot-batch-count {
default 20000;
type non-zero-uint32-type;
- description "The minimum number of entries to be present in the in-memory journal log before a snapshot to be taken.";
+ description "The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.";
}
+ leaf shard-snapshot-data-threshold-percentage {
+ default 12;
+ type percentage;
+ description "The percentage of Runtime.totalMemory() used by the in-memory journal log before a snapshot is to be taken";
+ }
+
+
leaf shard-hearbeat-interval-in-millis {
default 500;
type heartbeat-interval-type;
package org.opendaylight.controller.cluster.datastore;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
public class CompositeModificationPayloadTest {
@Override public long getIndex() {
return 1;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
});
AppendEntries appendEntries =
import akka.actor.Props;
import akka.actor.UntypedActor;
import com.typesafe.config.ConfigFactory;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import java.util.ArrayList;
+import java.util.List;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import java.util.ArrayList;
-import java.util.List;
-
public class Client {
private static ActorSystem actorSystem;
@Override public long getIndex() {
return 1;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
});
return new AppendEntries(1, "member-1", 0, 100, modification, 1);
@Override public long getIndex() {
return 1;
}
+
+ @Override
+ public int size() {
+ return getData().size();
+ }
});
return new AppendEntries(1, "member-1", 0, 100, modification, 1);