package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import com.google.common.base.Stopwatch;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import com.google.common.base.Joiner;
+import com.google.common.base.Joiner.MapJoiner;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
-import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
-import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
-import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStats;
-import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.yangtools.util.concurrent.ListenerNotificationQueueStats;
-import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
/**
* Maintains statistics for a shard.
* @author Basheeruddin syedbahm@cisco.com
*/
public class ShardStats extends AbstractMXBean implements ShardStatsMXBean {
- public static String JMX_CATEGORY_SHARD = "Shards";
-
- private static final Logger LOG = LoggerFactory.getLogger(ShardStats.class);
+ public static final String JMX_CATEGORY_SHARD = "Shards";
+ @GuardedBy("DATE_FORMAT")
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- private static final Cache<String, OnDemandRaftState> onDemandRaftStateCache =
- CacheBuilder.newBuilder().expireAfterWrite(2, TimeUnit.SECONDS).build();
+ private static final MapJoiner MAP_JOINER = Joiner.on(", ").withKeyValueSeparator(": ");
+
+ private final Shard shard;
+
+ private final OnDemandShardStateCache stateCache;
private long committedTransactionsCount;
private long abortTransactionsCount;
- private ThreadExecutorStatsMXBeanImpl notificationExecutorStatsBean;
-
- private QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
-
private boolean followerInitialSyncStatus = false;
- private ActorRef shardActor;
-
private String statRetrievalError;
- private String statRetrievalTime;
-
private long leadershipChangeCount;
private long lastLeadershipChangeTime;
- public ShardStats(final String shardName, final String mxBeanType) {
+ public ShardStats(final String shardName, final String mxBeanType, @Nullable final Shard shard) {
super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
+ this.shard = shard;
+ stateCache = new OnDemandShardStateCache(shardName, shard != null ? shard.self() : null);
}
- public void setNotificationManager(final QueuedNotificationManager<?, ?> manager) {
- this.notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
- "notification-manager", getMBeanType(), getMBeanCategory());
-
- this.notificationExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(manager.getExecutor());
- }
-
- public void setShardActor(ActorRef shardActor) {
- this.shardActor = shardActor;
- }
-
+ @SuppressWarnings("checkstyle:IllegalCatch")
private OnDemandRaftState getOnDemandRaftState() {
- String name = getShardName();
- OnDemandRaftState state = onDemandRaftStateCache.getIfPresent(name);
- if(state == null) {
+ try {
+ final OnDemandRaftState state = stateCache.get();
statRetrievalError = null;
- statRetrievalTime = null;
-
- if(shardActor != null) {
- Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
- try {
- Stopwatch timer = Stopwatch.createStarted();
-
- state = (OnDemandRaftState) Await.result(Patterns.ask(shardActor,
- GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
-
- statRetrievalTime = timer.stop().toString();
- onDemandRaftStateCache.put(name, state);
- } catch (Exception e) {
- statRetrievalError = e.toString();
- }
- }
-
- state = state != null ? state : OnDemandRaftState.builder().build();
+ return state;
+ } catch (Exception e) {
+ statRetrievalError = e.getCause().toString();
+ return OnDemandRaftState.builder().build();
}
+ }
- return state;
+ private static String formatMillis(final long timeMillis) {
+ synchronized (DATE_FORMAT) {
+ return DATE_FORMAT.format(new Date(timeMillis));
+ }
}
@Override
return getOnDemandRaftState().getVotedFor();
}
+ @Override
+ public boolean isVoting() {
+ return getOnDemandRaftState().isVoting();
+ }
+
+ @Override
+ public String getPeerVotingStates() {
+ return MAP_JOINER.join(getOnDemandRaftState().getPeerVotingStates());
+ }
+
@Override
public boolean isSnapshotCaptureInitiated() {
return getOnDemandRaftState().isSnapshotCaptureInitiated();
@Override
public String getLastCommittedTransactionTime() {
- return DATE_FORMAT.format(new Date(lastCommittedTransactionTime));
+ return formatMillis(lastCommittedTransactionTime);
}
@Override
return failedReadTransactionsCount.incrementAndGet();
}
- public long incrementAbortTransactionsCount ()
- {
+ public long incrementAbortTransactionsCount() {
return ++abortTransactionsCount;
}
}
@Override
- public long getInMemoryJournalDataSize(){
+ public long getInMemoryJournalDataSize() {
return getOnDemandRaftState().getInMemoryJournalDataSize();
}
return getOnDemandRaftState().getInMemoryJournalLogSize();
}
- @Override
- public ThreadExecutorStats getDataStoreExecutorStats() {
- // FIXME: this particular thing does not work, as it really is DS-specific
- return null;
- }
-
- @Override
- public ThreadExecutorStats getNotificationMgrExecutorStats() {
- return notificationExecutorStatsBean.toThreadExecutorStats();
- }
-
- @Override
- public List<ListenerNotificationQueueStats> getCurrentNotificationMgrListenerQueueStats() {
- return notificationManagerStatsBean.getCurrentListenerQueueStats();
- }
-
- @Override
- public int getMaxNotificationMgrListenerQueueSize() {
- return notificationManagerStatsBean.getMaxListenerQueueSize();
- }
-
/**
- * resets the counters related to transactions
+ * Resets the counters related to transactions.
*/
@Override
- public void resetTransactionCounters(){
+ public void resetTransactionCounters() {
committedTransactionsCount = 0;
readOnlyTransactionCount = 0;
}
- public void setDataStore(final InMemoryDOMDataStore store) {
- setNotificationManager(store.getDataChangeListenerNotificationManager());
- }
-
- public void setFollowerInitialSyncStatus(boolean followerInitialSyncStatus) {
+ public void setFollowerInitialSyncStatus(final boolean followerInitialSyncStatus) {
this.followerInitialSyncStatus = followerInitialSyncStatus;
}
@Override
public String getPeerAddresses() {
- StringBuilder builder = new StringBuilder();
- int i = 0;
- for(Map.Entry<String, String> e: getOnDemandRaftState().getPeerAddresses().entrySet()) {
- if(i++ > 0) {
- builder.append(", ");
- }
-
- builder.append(e.getKey()).append(": ").append(e.getValue());
- }
-
- return builder.toString();
+ return MAP_JOINER.join(getOnDemandRaftState().getPeerAddresses());
}
@Override
public String getStatRetrievalTime() {
getOnDemandRaftState();
- return statRetrievalTime;
+ return stateCache.getStatRetrievaelTime();
}
@Override
@Override
public String getLastLeadershipChangeTime() {
- return DATE_FORMAT.format(new Date(lastLeadershipChangeTime));
+ return formatMillis(lastLeadershipChangeTime);
+ }
+
+ @Override
+ public int getPendingTxCommitQueueSize() {
+ return shard != null ? shard.getPendingTxCommitQueueSize() : -1;
+ }
+
+ @Override
+ public int getTxCohortCacheSize() {
+ return shard != null ? shard.getCohortCacheSize() : -1;
+ }
+
+ @Override
+ public void captureSnapshot() {
+ if (shard != null) {
+ shard.getSelf().tell(new InitiateCaptureSnapshot(), ActorRef.noSender());
+ }
}
}