<artifactId>commons-io</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_${scala.version}</artifactId>
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.time.DurationFormatUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
} else if (message instanceof CaptureSnapshotReply){
handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
-
+ } else if(message instanceof GetOnDemandRaftState) {
+ onGetOnDemandRaftStats();
} else {
RaftActorBehavior oldBehavior = currentBehavior;
currentBehavior = currentBehavior.handleMessage(getSender(), message);
}
}
+ private void onGetOnDemandRaftStats() {
+ // Debugging message to retrieve raft stats.
+
+ OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
+ .commitIndex(context.getCommitIndex())
+ .currentTerm(context.getTermInformation().getCurrentTerm())
+ .inMemoryJournalDataSize(replicatedLog.dataSize())
+ .inMemoryJournalLogSize(replicatedLog.size())
+ .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated())
+ .lastApplied(context.getLastApplied())
+ .lastIndex(replicatedLog.lastIndex())
+ .lastTerm(replicatedLog.lastTerm())
+ .leader(getLeaderId())
+ .raftState(currentBehavior.state().toString())
+ .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex())
+ .snapshotIndex(replicatedLog.getSnapshotIndex())
+ .snapshotTerm(replicatedLog.getSnapshotTerm())
+ .votedFor(context.getTermInformation().getVotedFor())
+ .peerAddresses(ImmutableMap.copyOf(context.getPeerAddresses()));
+
+ ReplicatedLogEntry lastLogEntry = getLastLogEntry();
+ if (lastLogEntry != null) {
+ builder.lastLogIndex(lastLogEntry.getIndex());
+ builder.lastLogTerm(lastLogEntry.getTerm());
+ }
+
+ if(currentBehavior instanceof AbstractLeader) {
+ AbstractLeader leader = (AbstractLeader)currentBehavior;
+ Collection<String> followerIds = leader.getFollowerIds();
+ List<FollowerInfo> followerInfoList = Lists.newArrayListWithCapacity(followerIds.size());
+ for(String id: followerIds) {
+ final FollowerLogInformation info = leader.getFollower(id);
+ followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(),
+ info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity())));
+ }
+
+ builder.followerInfoList(followerInfoList);
+ }
+
+ sender().tell(builder.build(), self());
+
+ }
+
private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
if (oldBehavior != currentBehavior){
onStateChanged();
*
* @return Collection of follower IDs
*/
- protected final Collection<String> getFollowerIds() {
+ public final Collection<String> getFollowerIds() {
return followerToLog.keySet();
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+import java.beans.ConstructorProperties;
+
+/**
+ * A bean class containing a snapshot of information for a follower returned from GetOnDemandRaftStats.
+ *
+ * @author Thomas Pantelis
+ */
+public class FollowerInfo {
+ private final String id;
+ private final long nextIndex;
+ private final long matchIndex;
+ private final boolean isActive;
+ private final String timeSinceLastActivity;
+
+ @ConstructorProperties({"id","nextIndex", "matchIndex", "isActive", "timeSinceLastActivity"})
+ public FollowerInfo(String id, long nextIndex, long matchIndex, boolean isActive, String timeSinceLastActivity) {
+ this.id = id;
+ this.nextIndex = nextIndex;
+ this.matchIndex = matchIndex;
+ this.isActive = isActive;
+ this.timeSinceLastActivity = timeSinceLastActivity;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public long getNextIndex() {
+ return nextIndex;
+ }
+
+ public long getMatchIndex() {
+ return matchIndex;
+ }
+
+ public boolean isActive() {
+ return isActive;
+ }
+
+ public String getTimeSinceLastActivity() {
+ return timeSinceLastActivity;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+/**
+ * Local message sent to a RaftActor to obtain a snapshot of statistical information. Returns an
+ * OnDemandRaftState instance.
+ *
+ * @author Thomas Pantelis
+ */
+public class GetOnDemandRaftState {
+ public static final GetOnDemandRaftState INSTANCE = new GetOnDemandRaftState();
+
+ private GetOnDemandRaftState() {
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft.client.messages;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The response to a GetOnDemandRaftState message,
+ *
+ * @author Thomas Pantelis
+ */
+public class OnDemandRaftState {
+ private long lastLogIndex = -1L;
+ private long lastLogTerm = -1L;
+ private long currentTerm = -1L;
+ private long commitIndex = -1L;
+ private long lastApplied = -1L;
+ private long lastIndex = -1L;
+ private long lastTerm = -1L;
+ private long snapshotIndex = -1L;
+ private long snapshotTerm = -1L;
+ private long replicatedToAllIndex = -1L;
+ private long inMemoryJournalDataSize;
+ private long inMemoryJournalLogSize;
+ private String leader;
+ private String raftState;
+ private String votedFor;
+ private boolean isSnapshotCaptureInitiated;
+
+ private List<FollowerInfo> followerInfoList = Collections.emptyList();
+ private Map<String, String> peerAddresses = Collections.emptyMap();
+
+ private OnDemandRaftState() {
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public long getLastLogIndex() {
+ return lastLogIndex;
+ }
+
+ public long getLastLogTerm() {
+ return lastLogTerm;
+ }
+
+ public long getCurrentTerm() {
+ return currentTerm;
+ }
+
+ public long getCommitIndex() {
+ return commitIndex;
+ }
+
+ public long getLastApplied() {
+ return lastApplied;
+ }
+
+ public long getLastIndex() {
+ return lastIndex;
+ }
+
+ public long getLastTerm() {
+ return lastTerm;
+ }
+
+ public long getSnapshotIndex() {
+ return snapshotIndex;
+ }
+
+ public long getSnapshotTerm() {
+ return snapshotTerm;
+ }
+
+ public long getReplicatedToAllIndex() {
+ return replicatedToAllIndex;
+ }
+
+ public long getInMemoryJournalDataSize() {
+ return inMemoryJournalDataSize;
+ }
+
+ public long getInMemoryJournalLogSize() {
+ return inMemoryJournalLogSize;
+ }
+
+ public String getLeader() {
+ return leader;
+ }
+
+ public String getRaftState() {
+ return raftState;
+ }
+
+ public String getVotedFor() {
+ return votedFor;
+ }
+
+ public boolean isSnapshotCaptureInitiated() {
+ return isSnapshotCaptureInitiated;
+ }
+
+ public List<FollowerInfo> getFollowerInfoList() {
+ return followerInfoList;
+ }
+
+ public Map<String, String> getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ public static class Builder {
+ private final OnDemandRaftState stats = new OnDemandRaftState();
+
+ public Builder lastLogIndex(long value) {
+ stats.lastLogIndex = value;
+ return this;
+ }
+
+ public Builder lastLogTerm(long value) {
+ stats.lastLogTerm = value;
+ return this;
+ }
+
+ public Builder currentTerm(long value) {
+ stats.currentTerm = value;
+ return this;
+ }
+
+ public Builder commitIndex(long value) {
+ stats.commitIndex = value;
+ return this;
+ }
+
+ public Builder lastApplied(long value) {
+ stats.lastApplied = value;
+ return this;
+ }
+
+ public Builder lastIndex(long value) {
+ stats.lastIndex = value;
+ return this;
+ }
+
+ public Builder lastTerm(long value) {
+ stats.lastTerm = value;
+ return this;
+ }
+
+ public Builder snapshotIndex(long value) {
+ stats.snapshotIndex = value;
+ return this;
+ }
+
+ public Builder snapshotTerm(long value) {
+ stats.snapshotTerm = value;
+ return this;
+ }
+
+ public Builder replicatedToAllIndex(long value) {
+ stats.replicatedToAllIndex = value;
+ return this;
+ }
+
+ public Builder inMemoryJournalDataSize(long value) {
+ stats.inMemoryJournalDataSize = value;
+ return this;
+ }
+
+ public Builder inMemoryJournalLogSize(long value) {
+ stats.inMemoryJournalLogSize = value;
+ return this;
+ }
+
+ public Builder leader(String value) {
+ stats.leader = value;
+ return this;
+ }
+
+ public Builder raftState(String value) {
+ stats.raftState = value;
+ return this;
+ }
+
+ public Builder votedFor(String value) {
+ stats.votedFor = value;
+ return this;
+ }
+
+ public Builder followerInfoList(List<FollowerInfo> followerInfoList) {
+ stats.followerInfoList = followerInfoList;
+ return this;
+ }
+
+ public Builder peerAddresses(Map<String, String> peerAddresses) {
+ stats.peerAddresses = peerAddresses;
+ return this;
+ }
+
+ public Builder isSnapshotCaptureInitiated(boolean value) {
+ stats.isSnapshotCaptureInitiated = value;
+ return this;
+ }
+
+ public OnDemandRaftState build() {
+ return stats;
+ }
+ }
+}
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
+ shardMBean.setShardActor(getSelf());
if (isMetricsCaptureEnabled()) {
getContext().become(new MeteringBehavior(this));
recoveryCoordinator = null;
currentLogRecoveryBatch = null;
- updateJournalStats();
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
persistenceId(), data, data.getClass().getClassLoader(),
CompositeModificationPayload.class.getClassLoader());
}
-
- updateJournalStats();
-
}
private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
}
}
- private void updateJournalStats() {
- ReplicatedLogEntry lastLogEntry = getLastLogEntry();
-
- if (lastLogEntry != null) {
- shardMBean.setLastLogIndex(lastLogEntry.getIndex());
- shardMBean.setLastLogTerm(lastLogEntry.getTerm());
- }
-
- shardMBean.setCommitIndex(getCommitIndex());
- shardMBean.setLastApplied(getLastApplied());
- shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
- }
-
@Override
protected void createSnapshot() {
// Create a transaction actor. We are really going to treat the transaction as a worker
delayedListenerRegistrations.clear();
}
- shardMBean.setRaftState(getRaftState().name());
- shardMBean.setCurrentTerm(getCurrentTerm());
-
// If this actor is no longer the leader close all the transaction chains
if(!isLeader){
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
return dataPersistenceProvider;
}
- @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
- shardMBean.setLeader(newLeader);
- }
-
@Override public String persistenceId() {
return this.name.toString();
}
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Stopwatch;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
/**
* Maintains statistics for a shard.
public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
public static String JMX_CATEGORY_SHARD = "Shards";
+ private static final Logger LOG = LoggerFactory.getLogger(ShardStats.class);
+
+ private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+
+ private static final Cache<String, OnDemandRaftState> onDemandRaftStateCache =
+ CacheBuilder.newBuilder().expireAfterWrite(2, TimeUnit.SECONDS).build();
+
private long committedTransactionsCount;
private long readOnlyTransactionCount;
private long readWriteTransactionCount;
- private String leader;
-
- private String raftState;
-
- private long lastLogTerm = -1L;
-
- private long lastLogIndex = -1L;
-
- private long currentTerm = -1L;
-
- private long commitIndex = -1L;
-
- private long lastApplied = -1L;
-
private long lastCommittedTransactionTime;
private long failedTransactionsCount;
private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
- private long dataSize = 0;
+ private boolean followerInitialSyncStatus = false;
- private final SimpleDateFormat sdf =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ private ActorRef shardActor;
- private boolean followerInitialSyncStatus = false;
+ private String statRetrievalError;
+
+ private String statRetrievalTime;
public ShardStats(final String shardName, final String mxBeanType) {
super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
this.notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor());
}
+ public void setShardActor(ActorRef shardActor) {
+ this.shardActor = shardActor;
+ }
+
+ private OnDemandRaftState getOnDemandRaftState() {
+ String name = getShardName();
+ OnDemandRaftState state = onDemandRaftStateCache.getIfPresent(name);
+ if(state == null) {
+ statRetrievalError = null;
+ statRetrievalTime = null;
+
+ if(shardActor != null) {
+ Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
+ try {
+ Stopwatch timer = Stopwatch.createStarted();
+
+ state = (OnDemandRaftState) Await.result(Patterns.ask(shardActor,
+ GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
+
+ statRetrievalTime = timer.stop().toString();
+ onDemandRaftStateCache.put(name, state);
+ } catch (Exception e) {
+ statRetrievalError = e.toString();
+ }
+ }
+
+ state = state != null ? state : OnDemandRaftState.builder().build();
+ }
+
+ return state;
+ }
+
@Override
public String getShardName() {
return getMBeanName();
@Override
public String getLeader() {
- return leader;
+ return getOnDemandRaftState().getLeader();
}
@Override
public String getRaftState() {
- return raftState;
+ return getOnDemandRaftState().getRaftState();
}
@Override
@Override
public long getLastLogIndex() {
- return lastLogIndex;
+ return getOnDemandRaftState().getLastLogIndex();
}
@Override
public long getLastLogTerm() {
- return lastLogTerm;
+ return getOnDemandRaftState().getLastLogTerm();
}
@Override
public long getCurrentTerm() {
- return currentTerm;
+ return getOnDemandRaftState().getCurrentTerm();
}
@Override
public long getCommitIndex() {
- return commitIndex;
+ return getOnDemandRaftState().getCommitIndex();
}
@Override
public long getLastApplied() {
- return lastApplied;
+ return getOnDemandRaftState().getLastApplied();
}
@Override
- public String getLastCommittedTransactionTime() {
+ public long getLastIndex() {
+ return getOnDemandRaftState().getLastIndex();
+ }
- return sdf.format(new Date(lastCommittedTransactionTime));
+ @Override
+ public long getLastTerm() {
+ return getOnDemandRaftState().getLastTerm();
+ }
+
+ @Override
+ public long getSnapshotIndex() {
+ return getOnDemandRaftState().getSnapshotIndex();
+ }
+
+ @Override
+ public long getSnapshotTerm() {
+ return getOnDemandRaftState().getSnapshotTerm();
+ }
+
+ @Override
+ public long getReplicatedToAllIndex() {
+ return getOnDemandRaftState().getReplicatedToAllIndex();
+ }
+
+ @Override
+ public String getVotedFor() {
+ return getOnDemandRaftState().getVotedFor();
+ }
+
+ @Override
+ public boolean isSnapshotCaptureInitiated() {
+ return getOnDemandRaftState().isSnapshotCaptureInitiated();
+ }
+
+ @Override
+ public String getLastCommittedTransactionTime() {
+ return DATE_FORMAT.format(new Date(lastCommittedTransactionTime));
}
@Override
return ++abortTransactionsCount;
}
- public void setLeader(final String leader) {
- this.leader = leader;
- }
-
- public void setRaftState(final String raftState) {
- this.raftState = raftState;
- }
-
- public void setLastLogTerm(final long lastLogTerm) {
- this.lastLogTerm = lastLogTerm;
- }
-
- public void setLastLogIndex(final long lastLogIndex) {
- this.lastLogIndex = lastLogIndex;
- }
-
- public void setCurrentTerm(final long currentTerm) {
- this.currentTerm = currentTerm;
- }
-
- public void setCommitIndex(final long commitIndex) {
- this.commitIndex = commitIndex;
- }
-
- public void setLastApplied(final long lastApplied) {
- this.lastApplied = lastApplied;
- }
-
public void setLastCommittedTransactionTime(final long lastCommittedTransactionTime) {
this.lastCommittedTransactionTime = lastCommittedTransactionTime;
}
- public void setInMemoryJournalDataSize(long dataSize){
- this.dataSize = dataSize;
+ @Override
+ public long getInMemoryJournalDataSize(){
+ return getOnDemandRaftState().getInMemoryJournalDataSize();
}
@Override
- public long getInMemoryJournalDataSize(){
- return dataSize;
+ public long getInMemoryJournalLogSize() {
+ return getOnDemandRaftState().getInMemoryJournalLogSize();
}
@Override
public boolean getFollowerInitialSyncStatus() {
return followerInitialSyncStatus;
}
+
+ @Override
+ public List<FollowerInfo> getFollowerInfo() {
+ return getOnDemandRaftState().getFollowerInfoList();
+ }
+
+ @Override
+ public String getPeerAddresses() {
+ StringBuilder builder = new StringBuilder();
+ int i = 0;
+ for(Map.Entry<String, String> e: getOnDemandRaftState().getPeerAddresses().entrySet()) {
+ if(i++ > 0) {
+ builder.append(", ");
+ }
+
+ builder.append(e.getKey()).append(": ").append(e.getValue());
+ }
+
+ return builder.toString();
+ }
+
+ @Override
+ public String getStatRetrievalTime() {
+ getOnDemandRaftState();
+ return statRetrievalTime;
+ }
+
+ @Override
+ public String getStatRetrievalError() {
+ getOnDemandRaftState();
+ return statRetrievalError;
+ }
}
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
import java.util.List;
-
+import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
String getShardName();
+ String getStatRetrievalTime();
+
+ String getStatRetrievalError();
+
long getCommittedTransactionsCount();
long getReadOnlyTransactionCount();
long getLastApplied();
+ long getLastIndex();
+
+ long getLastTerm();
+
+ long getSnapshotIndex();
+
+ long getSnapshotTerm();
+
+ long getReplicatedToAllIndex();
+
String getLastCommittedTransactionTime();
long getFailedTransactionsCount();
String getRaftState();
+ String getVotedFor();
+
+ boolean isSnapshotCaptureInitiated();
+
ThreadExecutorStats getDataStoreExecutorStats();
ThreadExecutorStats getNotificationMgrExecutorStats();
long getInMemoryJournalDataSize();
+ long getInMemoryJournalLogSize();
+
boolean getFollowerInitialSyncStatus();
+
+ List<FollowerInfo> getFollowerInfo();
+
+ String getPeerAddresses();
}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
- protected void verifyLastLogIndex(TestActorRef<Shard> shard, long expectedValue) {
+ protected void verifyLastApplied(TestActorRef<Shard> shard, long expectedValue) {
+ long lastApplied = -1;
for(int i = 0; i < 20 * 5; i++) {
- long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex();
- if(lastLogIndex == expectedValue) {
- break;
+ lastApplied = shard.underlyingActor().getShardMBean().getLastApplied();
+ if(lastApplied == expectedValue) {
+ return;
}
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
- assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex());
+ Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied));
}
protected NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
- verifyLastLogIndex(shard, 2);
+ verifyLastApplied(shard, 2);
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
new ShardTestKit(getSystem()) {{
final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
- "testConcurrentThreePhaseCommits");
+ "testPreLithiumConcurrentThreePhaseCommits");
waitUntilLeader(shard);
assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
- verifyLastLogIndex(shard, 2);
+ verifyLastApplied(shard, 2);
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};