From 79c3fb9269d9baeb3a2787544fca3636e0ea608f Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 13 Mar 2015 07:42:02 -0400 Subject: [PATCH] Add more info to ShardStats JXM bean Added more raft state information to the ShardStats, including lastIndex, lastTerm, snapShotIndex, snapshotTerm, replicatedToAllIndex, info about each follower (if the leader), and peer addresses. Basically all the pertinent raft actor state is now reported. Instead of having the Shard update the stats bean as things change, which would be difficult with some of the state, I changed ShardStats to send a GetOnDemandRaftState message to the shard actor which returns all the state info in an OnDemandRaftState reply. This captures the state in a thread-safe manner. Since each piece of information is returned by the ShardStats bean in separate methods, I didn't want each call to send the message so I cache the last OnDemandRaftState reply for 2 seconds. With this change, the Shard no longer needs to keep the ShardStats up to date with the current state. Change-Id: I1775ba35747c68028f5c3e31789b958d35afa172 Signed-off-by: Tom Pantelis --- opendaylight/md-sal/sal-akka-raft/pom.xml | 5 + .../controller/cluster/raft/RaftActor.java | 55 ++++- .../raft/behaviors/AbstractLeader.java | 2 +- .../raft/client/messages/FollowerInfo.java | 52 +++++ .../client/messages/GetOnDemandRaftState.java | 21 ++ .../client/messages/OnDemandRaftState.java | 216 ++++++++++++++++++ .../controller/cluster/datastore/Shard.java | 26 +-- .../jmx/mbeans/shard/ShardStats.java | 197 +++++++++++----- .../jmx/mbeans/shard/ShardStatsMXBean.java | 26 ++- .../cluster/datastore/AbstractShardTest.java | 12 +- .../cluster/datastore/ShardTest.java | 2 +- .../datastore/compat/PreLithiumShardTest.java | 4 +- 12 files changed, 523 insertions(+), 95 deletions(-) create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FollowerInfo.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetOnDemandRaftState.java create mode 100644 opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java diff --git a/opendaylight/md-sal/sal-akka-raft/pom.xml b/opendaylight/md-sal/sal-akka-raft/pom.xml index 1f99a52ed5..0ec83c86b3 100644 --- a/opendaylight/md-sal/sal-akka-raft/pom.xml +++ b/opendaylight/md-sal/sal-akka-raft/pom.xml @@ -59,6 +59,11 @@ commons-io + + org.apache.commons + commons-lang3 + + com.typesafe.akka akka-slf4j_${scala.version} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 77ff47d0ad..9faffb9395 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -19,10 +19,15 @@ import akka.persistence.SnapshotSelectionCriteria; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import java.io.Serializable; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.notifications.RoleChanged; @@ -34,11 +39,15 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -384,7 +393,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof CaptureSnapshotReply){ handleCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot()); - + } else if(message instanceof GetOnDemandRaftState) { + onGetOnDemandRaftStats(); } else { RaftActorBehavior oldBehavior = currentBehavior; currentBehavior = currentBehavior.handleMessage(getSender(), message); @@ -393,6 +403,49 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } + private void onGetOnDemandRaftStats() { + // Debugging message to retrieve raft stats. + + OnDemandRaftState.Builder builder = OnDemandRaftState.builder() + .commitIndex(context.getCommitIndex()) + .currentTerm(context.getTermInformation().getCurrentTerm()) + .inMemoryJournalDataSize(replicatedLog.dataSize()) + .inMemoryJournalLogSize(replicatedLog.size()) + .isSnapshotCaptureInitiated(context.isSnapshotCaptureInitiated()) + .lastApplied(context.getLastApplied()) + .lastIndex(replicatedLog.lastIndex()) + .lastTerm(replicatedLog.lastTerm()) + .leader(getLeaderId()) + .raftState(currentBehavior.state().toString()) + .replicatedToAllIndex(currentBehavior.getReplicatedToAllIndex()) + .snapshotIndex(replicatedLog.getSnapshotIndex()) + .snapshotTerm(replicatedLog.getSnapshotTerm()) + .votedFor(context.getTermInformation().getVotedFor()) + .peerAddresses(ImmutableMap.copyOf(context.getPeerAddresses())); + + ReplicatedLogEntry lastLogEntry = getLastLogEntry(); + if (lastLogEntry != null) { + builder.lastLogIndex(lastLogEntry.getIndex()); + builder.lastLogTerm(lastLogEntry.getTerm()); + } + + if(currentBehavior instanceof AbstractLeader) { + AbstractLeader leader = (AbstractLeader)currentBehavior; + Collection followerIds = leader.getFollowerIds(); + List followerInfoList = Lists.newArrayListWithCapacity(followerIds.size()); + for(String id: followerIds) { + final FollowerLogInformation info = leader.getFollower(id); + followerInfoList.add(new FollowerInfo(id, info.getNextIndex(), info.getMatchIndex(), + info.isFollowerActive(), DurationFormatUtils.formatDurationHMS(info.timeSinceLastActivity()))); + } + + builder.followerInfoList(followerInfoList); + } + + sender().tell(builder.build(), self()); + + } + private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) { if (oldBehavior != currentBehavior){ onStateChanged(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index a4753a4fe4..f46a51ea66 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -134,7 +134,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * * @return Collection of follower IDs */ - protected final Collection getFollowerIds() { + public final Collection getFollowerIds() { return followerToLog.keySet(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FollowerInfo.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FollowerInfo.java new file mode 100644 index 0000000000..5d2c56a117 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/FollowerInfo.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.client.messages; + +import java.beans.ConstructorProperties; + +/** + * A bean class containing a snapshot of information for a follower returned from GetOnDemandRaftStats. + * + * @author Thomas Pantelis + */ +public class FollowerInfo { + private final String id; + private final long nextIndex; + private final long matchIndex; + private final boolean isActive; + private final String timeSinceLastActivity; + + @ConstructorProperties({"id","nextIndex", "matchIndex", "isActive", "timeSinceLastActivity"}) + public FollowerInfo(String id, long nextIndex, long matchIndex, boolean isActive, String timeSinceLastActivity) { + this.id = id; + this.nextIndex = nextIndex; + this.matchIndex = matchIndex; + this.isActive = isActive; + this.timeSinceLastActivity = timeSinceLastActivity; + } + + public String getId() { + return id; + } + + public long getNextIndex() { + return nextIndex; + } + + public long getMatchIndex() { + return matchIndex; + } + + public boolean isActive() { + return isActive; + } + + public String getTimeSinceLastActivity() { + return timeSinceLastActivity; + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetOnDemandRaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetOnDemandRaftState.java new file mode 100644 index 0000000000..be043861fb --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/GetOnDemandRaftState.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.client.messages; + +/** + * Local message sent to a RaftActor to obtain a snapshot of statistical information. Returns an + * OnDemandRaftState instance. + * + * @author Thomas Pantelis + */ +public class GetOnDemandRaftState { + public static final GetOnDemandRaftState INSTANCE = new GetOnDemandRaftState(); + + private GetOnDemandRaftState() { + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java new file mode 100644 index 0000000000..8c2986f6d1 --- /dev/null +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.raft.client.messages; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * The response to a GetOnDemandRaftState message, + * + * @author Thomas Pantelis + */ +public class OnDemandRaftState { + private long lastLogIndex = -1L; + private long lastLogTerm = -1L; + private long currentTerm = -1L; + private long commitIndex = -1L; + private long lastApplied = -1L; + private long lastIndex = -1L; + private long lastTerm = -1L; + private long snapshotIndex = -1L; + private long snapshotTerm = -1L; + private long replicatedToAllIndex = -1L; + private long inMemoryJournalDataSize; + private long inMemoryJournalLogSize; + private String leader; + private String raftState; + private String votedFor; + private boolean isSnapshotCaptureInitiated; + + private List followerInfoList = Collections.emptyList(); + private Map peerAddresses = Collections.emptyMap(); + + private OnDemandRaftState() { + } + + public static Builder builder() { + return new Builder(); + } + + public long getLastLogIndex() { + return lastLogIndex; + } + + public long getLastLogTerm() { + return lastLogTerm; + } + + public long getCurrentTerm() { + return currentTerm; + } + + public long getCommitIndex() { + return commitIndex; + } + + public long getLastApplied() { + return lastApplied; + } + + public long getLastIndex() { + return lastIndex; + } + + public long getLastTerm() { + return lastTerm; + } + + public long getSnapshotIndex() { + return snapshotIndex; + } + + public long getSnapshotTerm() { + return snapshotTerm; + } + + public long getReplicatedToAllIndex() { + return replicatedToAllIndex; + } + + public long getInMemoryJournalDataSize() { + return inMemoryJournalDataSize; + } + + public long getInMemoryJournalLogSize() { + return inMemoryJournalLogSize; + } + + public String getLeader() { + return leader; + } + + public String getRaftState() { + return raftState; + } + + public String getVotedFor() { + return votedFor; + } + + public boolean isSnapshotCaptureInitiated() { + return isSnapshotCaptureInitiated; + } + + public List getFollowerInfoList() { + return followerInfoList; + } + + public Map getPeerAddresses() { + return peerAddresses; + } + + public static class Builder { + private final OnDemandRaftState stats = new OnDemandRaftState(); + + public Builder lastLogIndex(long value) { + stats.lastLogIndex = value; + return this; + } + + public Builder lastLogTerm(long value) { + stats.lastLogTerm = value; + return this; + } + + public Builder currentTerm(long value) { + stats.currentTerm = value; + return this; + } + + public Builder commitIndex(long value) { + stats.commitIndex = value; + return this; + } + + public Builder lastApplied(long value) { + stats.lastApplied = value; + return this; + } + + public Builder lastIndex(long value) { + stats.lastIndex = value; + return this; + } + + public Builder lastTerm(long value) { + stats.lastTerm = value; + return this; + } + + public Builder snapshotIndex(long value) { + stats.snapshotIndex = value; + return this; + } + + public Builder snapshotTerm(long value) { + stats.snapshotTerm = value; + return this; + } + + public Builder replicatedToAllIndex(long value) { + stats.replicatedToAllIndex = value; + return this; + } + + public Builder inMemoryJournalDataSize(long value) { + stats.inMemoryJournalDataSize = value; + return this; + } + + public Builder inMemoryJournalLogSize(long value) { + stats.inMemoryJournalLogSize = value; + return this; + } + + public Builder leader(String value) { + stats.leader = value; + return this; + } + + public Builder raftState(String value) { + stats.raftState = value; + return this; + } + + public Builder votedFor(String value) { + stats.votedFor = value; + return this; + } + + public Builder followerInfoList(List followerInfoList) { + stats.followerInfoList = followerInfoList; + return this; + } + + public Builder peerAddresses(Map peerAddresses) { + stats.peerAddresses = peerAddresses; + return this; + } + + public Builder isSnapshotCaptureInitiated(boolean value) { + stats.isSnapshotCaptureInitiated = value; + return this; + } + + public OnDemandRaftState build() { + return stats; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 94be1b0dc1..a8ae1818bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -66,7 +66,6 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; -import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; @@ -173,6 +172,7 @@ public class Shard extends RaftActor { shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(), datastoreContext.getDataStoreMXBeanType()); shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager()); + shardMBean.setShardActor(getSelf()); if (isMetricsCaptureEnabled()) { getContext().become(new MeteringBehavior(this)); @@ -792,7 +792,6 @@ public class Shard extends RaftActor { recoveryCoordinator = null; currentLogRecoveryBatch = null; - updateJournalStats(); //notify shard manager getContext().parent().tell(new ActorInitialized(), getSelf()); @@ -831,9 +830,6 @@ public class Shard extends RaftActor { persistenceId(), data, data.getClass().getClassLoader(), CompositeModificationPayload.class.getClassLoader()); } - - updateJournalStats(); - } private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) { @@ -851,19 +847,6 @@ public class Shard extends RaftActor { } } - private void updateJournalStats() { - ReplicatedLogEntry lastLogEntry = getLastLogEntry(); - - if (lastLogEntry != null) { - shardMBean.setLastLogIndex(lastLogEntry.getIndex()); - shardMBean.setLastLogTerm(lastLogEntry.getTerm()); - } - - shardMBean.setCommitIndex(getCommitIndex()); - shardMBean.setLastApplied(getLastApplied()); - shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize()); - } - @Override protected void createSnapshot() { // Create a transaction actor. We are really going to treat the transaction as a worker @@ -921,9 +904,6 @@ public class Shard extends RaftActor { delayedListenerRegistrations.clear(); } - shardMBean.setRaftState(getRaftState().name()); - shardMBean.setCurrentTerm(getCurrentTerm()); - // If this actor is no longer the leader close all the transaction chains if(!isLeader){ for(Map.Entry entry : transactionChains.entrySet()){ @@ -944,10 +924,6 @@ public class Shard extends RaftActor { return dataPersistenceProvider; } - @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) { - shardMBean.setLeader(newLeader); - } - @Override public String persistenceId() { return this.name.toString(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java index 577a03c3a3..fb59b7643f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStats.java @@ -8,10 +8,21 @@ package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard; +import akka.actor.ActorRef; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.base.Stopwatch; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean; import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl; import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats; @@ -19,6 +30,9 @@ import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXB import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats; import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; /** * Maintains statistics for a shard. @@ -28,6 +42,13 @@ import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { public static String JMX_CATEGORY_SHARD = "Shards"; + private static final Logger LOG = LoggerFactory.getLogger(ShardStats.class); + + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + + private static final Cache onDemandRaftStateCache = + CacheBuilder.newBuilder().expireAfterWrite(2, TimeUnit.SECONDS).build(); + private long committedTransactionsCount; private long readOnlyTransactionCount; @@ -36,20 +57,6 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { private long readWriteTransactionCount; - private String leader; - - private String raftState; - - private long lastLogTerm = -1L; - - private long lastLogIndex = -1L; - - private long currentTerm = -1L; - - private long commitIndex = -1L; - - private long lastApplied = -1L; - private long lastCommittedTransactionTime; private long failedTransactionsCount; @@ -62,12 +69,13 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean; - private long dataSize = 0; + private boolean followerInitialSyncStatus = false; - private final SimpleDateFormat sdf = - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private ActorRef shardActor; - private boolean followerInitialSyncStatus = false; + private String statRetrievalError; + + private String statRetrievalTime; public ShardStats(final String shardName, final String mxBeanType) { super(shardName, mxBeanType, JMX_CATEGORY_SHARD); @@ -80,6 +88,38 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { this.notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor()); } + public void setShardActor(ActorRef shardActor) { + this.shardActor = shardActor; + } + + private OnDemandRaftState getOnDemandRaftState() { + String name = getShardName(); + OnDemandRaftState state = onDemandRaftStateCache.getIfPresent(name); + if(state == null) { + statRetrievalError = null; + statRetrievalTime = null; + + if(shardActor != null) { + Timeout timeout = new Timeout(10, TimeUnit.SECONDS); + try { + Stopwatch timer = Stopwatch.createStarted(); + + state = (OnDemandRaftState) Await.result(Patterns.ask(shardActor, + GetOnDemandRaftState.INSTANCE, timeout), timeout.duration()); + + statRetrievalTime = timer.stop().toString(); + onDemandRaftStateCache.put(name, state); + } catch (Exception e) { + statRetrievalError = e.toString(); + } + } + + state = state != null ? state : OnDemandRaftState.builder().build(); + } + + return state; + } + @Override public String getShardName() { return getMBeanName(); @@ -92,12 +132,12 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { @Override public String getLeader() { - return leader; + return getOnDemandRaftState().getLeader(); } @Override public String getRaftState() { - return raftState; + return getOnDemandRaftState().getRaftState(); } @Override @@ -117,33 +157,67 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { @Override public long getLastLogIndex() { - return lastLogIndex; + return getOnDemandRaftState().getLastLogIndex(); } @Override public long getLastLogTerm() { - return lastLogTerm; + return getOnDemandRaftState().getLastLogTerm(); } @Override public long getCurrentTerm() { - return currentTerm; + return getOnDemandRaftState().getCurrentTerm(); } @Override public long getCommitIndex() { - return commitIndex; + return getOnDemandRaftState().getCommitIndex(); } @Override public long getLastApplied() { - return lastApplied; + return getOnDemandRaftState().getLastApplied(); } @Override - public String getLastCommittedTransactionTime() { + public long getLastIndex() { + return getOnDemandRaftState().getLastIndex(); + } - return sdf.format(new Date(lastCommittedTransactionTime)); + @Override + public long getLastTerm() { + return getOnDemandRaftState().getLastTerm(); + } + + @Override + public long getSnapshotIndex() { + return getOnDemandRaftState().getSnapshotIndex(); + } + + @Override + public long getSnapshotTerm() { + return getOnDemandRaftState().getSnapshotTerm(); + } + + @Override + public long getReplicatedToAllIndex() { + return getOnDemandRaftState().getReplicatedToAllIndex(); + } + + @Override + public String getVotedFor() { + return getOnDemandRaftState().getVotedFor(); + } + + @Override + public boolean isSnapshotCaptureInitiated() { + return getOnDemandRaftState().isSnapshotCaptureInitiated(); + } + + @Override + public String getLastCommittedTransactionTime() { + return DATE_FORMAT.format(new Date(lastCommittedTransactionTime)); } @Override @@ -190,45 +264,18 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { return ++abortTransactionsCount; } - public void setLeader(final String leader) { - this.leader = leader; - } - - public void setRaftState(final String raftState) { - this.raftState = raftState; - } - - public void setLastLogTerm(final long lastLogTerm) { - this.lastLogTerm = lastLogTerm; - } - - public void setLastLogIndex(final long lastLogIndex) { - this.lastLogIndex = lastLogIndex; - } - - public void setCurrentTerm(final long currentTerm) { - this.currentTerm = currentTerm; - } - - public void setCommitIndex(final long commitIndex) { - this.commitIndex = commitIndex; - } - - public void setLastApplied(final long lastApplied) { - this.lastApplied = lastApplied; - } - public void setLastCommittedTransactionTime(final long lastCommittedTransactionTime) { this.lastCommittedTransactionTime = lastCommittedTransactionTime; } - public void setInMemoryJournalDataSize(long dataSize){ - this.dataSize = dataSize; + @Override + public long getInMemoryJournalDataSize(){ + return getOnDemandRaftState().getInMemoryJournalDataSize(); } @Override - public long getInMemoryJournalDataSize(){ - return dataSize; + public long getInMemoryJournalLogSize() { + return getOnDemandRaftState().getInMemoryJournalLogSize(); } @Override @@ -287,4 +334,36 @@ public class ShardStats extends AbstractMXBean implements ShardStatsMXBean { public boolean getFollowerInitialSyncStatus() { return followerInitialSyncStatus; } + + @Override + public List getFollowerInfo() { + return getOnDemandRaftState().getFollowerInfoList(); + } + + @Override + public String getPeerAddresses() { + StringBuilder builder = new StringBuilder(); + int i = 0; + for(Map.Entry e: getOnDemandRaftState().getPeerAddresses().entrySet()) { + if(i++ > 0) { + builder.append(", "); + } + + builder.append(e.getKey()).append(": ").append(e.getValue()); + } + + return builder.toString(); + } + + @Override + public String getStatRetrievalTime() { + getOnDemandRaftState(); + return statRetrievalTime; + } + + @Override + public String getStatRetrievalError() { + getOnDemandRaftState(); + return statRetrievalError; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java index 0281cdd8ce..1c0c83b699 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/shard/ShardStatsMXBean.java @@ -1,7 +1,7 @@ package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard; import java.util.List; - +import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats; import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats; @@ -12,6 +12,10 @@ public interface ShardStatsMXBean { String getShardName(); + String getStatRetrievalTime(); + + String getStatRetrievalError(); + long getCommittedTransactionsCount(); long getReadOnlyTransactionCount(); @@ -30,6 +34,16 @@ public interface ShardStatsMXBean { long getLastApplied(); + long getLastIndex(); + + long getLastTerm(); + + long getSnapshotIndex(); + + long getSnapshotTerm(); + + long getReplicatedToAllIndex(); + String getLastCommittedTransactionTime(); long getFailedTransactionsCount(); @@ -42,6 +56,10 @@ public interface ShardStatsMXBean { String getRaftState(); + String getVotedFor(); + + boolean isSnapshotCaptureInitiated(); + ThreadExecutorStats getDataStoreExecutorStats(); ThreadExecutorStats getNotificationMgrExecutorStats(); @@ -54,5 +72,11 @@ public interface ShardStatsMXBean { long getInMemoryJournalDataSize(); + long getInMemoryJournalLogSize(); + boolean getFollowerInitialSyncStatus(); + + List getFollowerInfo(); + + String getPeerAddresses(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 3ac61f2371..8cafb46528 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -153,16 +154,17 @@ public abstract class AbstractShardTest extends AbstractActorTest{ shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - protected void verifyLastLogIndex(TestActorRef shard, long expectedValue) { + protected void verifyLastApplied(TestActorRef shard, long expectedValue) { + long lastApplied = -1; for(int i = 0; i < 20 * 5; i++) { - long lastLogIndex = shard.underlyingActor().getShardMBean().getLastLogIndex(); - if(lastLogIndex == expectedValue) { - break; + lastApplied = shard.underlyingActor().getShardMBean().getLastApplied(); + if(lastApplied == expectedValue) { + return; } Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } - assertEquals("Last log index", expectedValue, shard.underlyingActor().getShardMBean().getLastLogIndex()); + Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied)); } protected NormalizedNode readStore(final InMemoryDOMDataStore store) throws ReadFailedException { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index e10f566677..4c4fedb8b2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -593,7 +593,7 @@ public class ShardTest extends AbstractShardTest { assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); - verifyLastLogIndex(shard, 2); + verifyLastApplied(shard, 2); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index 471009d4df..cc860eafc7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -211,7 +211,7 @@ public class PreLithiumShardTest extends AbstractShardTest { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), - "testConcurrentThreePhaseCommits"); + "testPreLithiumConcurrentThreePhaseCommits"); waitUntilLeader(shard); @@ -386,7 +386,7 @@ public class PreLithiumShardTest extends AbstractShardTest { assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); - verifyLastLogIndex(shard, 2); + verifyLastApplied(shard, 2); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; -- 2.36.6