It's useful to see what listeners (DTCL/DCL) are registered for each shard.
Added a new message, GetInfo, sent to the DTCL/DCL actors that returns
DataTreeListenerInfo, including the stringified user listener instance
and the stringified YangInstanceIdentifier path. A new
ShardDataTreeListenerInfoMXBean is instantiated for each shard which
reports the DTCL and DCL info.
Change-Id: I312bc5d03fe836bc208ea442ebc2af0ef103120f
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
}
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- OnDemandRaftState.AbstractBuilder<?> builder = newOnDemandRaftStateBuilder()
+ OnDemandRaftState.AbstractBuilder<?, ?> builder = newOnDemandRaftStateBuilder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
.inMemoryJournalDataSize(replicatedLog().dataSize())
}
- protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
return OnDemandRaftState.builder();
}
return customRaftPolicyClassName;
}
- public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> {
+ public abstract static class AbstractBuilder<B extends AbstractBuilder<B, T>, T extends OnDemandRaftState> {
@SuppressWarnings("unchecked")
- protected T self() {
- return (T) this;
+ protected B self() {
+ return (B) this;
}
@Nonnull
protected abstract OnDemandRaftState state();
- public T lastLogIndex(long value) {
+ public B lastLogIndex(long value) {
state().lastLogIndex = value;
return self();
}
- public T lastLogTerm(long value) {
+ public B lastLogTerm(long value) {
state().lastLogTerm = value;
return self();
}
- public T currentTerm(long value) {
+ public B currentTerm(long value) {
state().currentTerm = value;
return self();
}
- public T commitIndex(long value) {
+ public B commitIndex(long value) {
state().commitIndex = value;
return self();
}
- public T lastApplied(long value) {
+ public B lastApplied(long value) {
state().lastApplied = value;
return self();
}
- public T lastIndex(long value) {
+ public B lastIndex(long value) {
state().lastIndex = value;
return self();
}
- public T lastTerm(long value) {
+ public B lastTerm(long value) {
state().lastTerm = value;
return self();
}
- public T snapshotIndex(long value) {
+ public B snapshotIndex(long value) {
state().snapshotIndex = value;
return self();
}
- public T snapshotTerm(long value) {
+ public B snapshotTerm(long value) {
state().snapshotTerm = value;
return self();
}
- public T replicatedToAllIndex(long value) {
+ public B replicatedToAllIndex(long value) {
state().replicatedToAllIndex = value;
return self();
}
- public T inMemoryJournalDataSize(long value) {
+ public B inMemoryJournalDataSize(long value) {
state().inMemoryJournalDataSize = value;
return self();
}
- public T inMemoryJournalLogSize(long value) {
+ public B inMemoryJournalLogSize(long value) {
state().inMemoryJournalLogSize = value;
return self();
}
- public T leader(String value) {
+ public B leader(String value) {
state().leader = value;
return self();
}
- public T raftState(String value) {
+ public B raftState(String value) {
state().raftState = value;
return self();
}
- public T votedFor(String value) {
+ public B votedFor(String value) {
state().votedFor = value;
return self();
}
- public T isVoting(boolean isVoting) {
+ public B isVoting(boolean isVoting) {
state().isVoting = isVoting;
return self();
}
- public T followerInfoList(List<FollowerInfo> followerInfoList) {
+ public B followerInfoList(List<FollowerInfo> followerInfoList) {
state().followerInfoList = followerInfoList;
return self();
}
- public T peerAddresses(Map<String, String> peerAddresses) {
+ public B peerAddresses(Map<String, String> peerAddresses) {
state().peerAddresses = peerAddresses;
return self();
}
- public T peerVotingStates(Map<String, Boolean> peerVotingStates) {
+ public B peerVotingStates(Map<String, Boolean> peerVotingStates) {
state().peerVotingStates = ImmutableMap.copyOf(peerVotingStates);
return self();
}
- public T isSnapshotCaptureInitiated(boolean value) {
+ public B isSnapshotCaptureInitiated(boolean value) {
state().isSnapshotCaptureInitiated = value;
return self();
}
- public T customRaftPolicyClassName(String className) {
+ public B customRaftPolicyClassName(String className) {
state().customRaftPolicyClassName = className;
return self();
}
- public OnDemandRaftState build() {
- return state();
+ @SuppressWarnings("unchecked")
+ public T build() {
+ return (T) state();
}
}
- public static class Builder extends AbstractBuilder<Builder> {
+ public static class Builder extends AbstractBuilder<Builder, OnDemandRaftState> {
private final OnDemandRaftState state = new OnDemandRaftState();
@Override
final Collection<DataTreeModification<T>> bindingChanges = LazyDataTreeModification.from(codec, domChanges, store);
listener.onDataTreeChanged(bindingChanges);
}
+
+ @Override
+ public String toString() {
+ return listener.toString();
+ }
}
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeListenerInfo;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.GetInfo;
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
private final YangInstanceIdentifier registeredPath;
private boolean notificationsEnabled = false;
+ private long notificationCount;
public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
final YangInstanceIdentifier registeredPath) {
dataChanged(message);
} else if (message instanceof EnableNotification) {
enableNotification((EnableNotification) message);
+ } else if (message instanceof GetInfo) {
+ getSender().tell(new DataTreeListenerInfo(listener.toString(), registeredPath.toString(),
+ notificationsEnabled, notificationCount), getSelf());
} else {
unknownMessage(message);
}
LOG.debug("Sending change notification {} to listener {}", change, listener);
+ notificationCount++;
+
try {
this.listener.onDataChanged(change);
} catch (RuntimeException e) {
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeListenerInfo;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.GetInfo;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final DOMDataTreeChangeListener listener;
private final YangInstanceIdentifier registeredPath;
private boolean notificationsEnabled = false;
+ private long notificationCount;
private String logContext = "";
private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener,
dataChanged((DataTreeChanged)message);
} else if (message instanceof EnableNotification) {
enableNotification((EnableNotification) message);
+ } else if (message instanceof GetInfo) {
+ getSender().tell(new DataTreeListenerInfo(listener.toString(), registeredPath.toString(),
+ notificationsEnabled, notificationCount), getSelf());
} else {
unknownMessage(message);
}
LOG.debug("{}: Sending {} change notification(s) {} to listener {}", logContext, message.getChanges().size(),
message.getChanges(), listener);
+ notificationCount++;
+
try {
this.listener.onDataTreeChanged(message.getChanges());
} catch (Exception e) {
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardDataTreeListenerInfoMXBeanImpl;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
private final ShardStats shardMBean;
+ private final ShardDataTreeListenerInfoMXBeanImpl listenerInfoMXBean;
+
private DatastoreContext datastoreContext;
private final ShardCommitCoordinator commitCoordinator;
.fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
.assembledMessageCallback((message, sender) -> self().tell(message, sender))
.expireStateAfterInactivity(datastoreContext.getRequestTimeout(), TimeUnit.NANOSECONDS).build();
+
+ listenerInfoMXBean = new ShardDataTreeListenerInfoMXBeanImpl(name, datastoreContext.getDataStoreMXBeanType(),
+ self());
+ listenerInfoMXBean.register();
}
private void setTransactionCommitTimeout() {
commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
shardMBean.unregisterMBean();
+ listenerInfoMXBean.unregister();
}
@Override
}
@Override
- protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ protected OnDemandRaftState.AbstractBuilder<?, ?> newOnDemandRaftStateBuilder() {
return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
.dataChangeListenerActors(changeSupport.getListenerActors())
.commitCohortActors(store.getCohortActors());
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import scala.concurrent.Await;
+
+/**
+ * Maintains a short-lived shared cache of OnDemandShardState.
+ *
+ * @author Thomas Pantelis
+ */
+class OnDemandShardStateCache {
+ private static final Cache<String, OnDemandShardState> ONDEMAND_SHARD_STATE_CACHE =
+ CacheBuilder.newBuilder().expireAfterWrite(2, TimeUnit.SECONDS).build();
+
+ private final ActorRef shardActor;
+ private final String shardName;
+ private volatile String stateRetrievalTime;
+
+ OnDemandShardStateCache(String shardName, ActorRef shardActor) {
+ this.shardName = Preconditions.checkNotNull(shardName);
+ this.shardActor = shardActor;
+ }
+
+ OnDemandShardState get() throws Exception {
+ if (shardActor == null) {
+ return OnDemandShardState.newBuilder().build();
+ }
+
+ try {
+ return ONDEMAND_SHARD_STATE_CACHE.get(shardName, this::retrieveState);
+ } catch (ExecutionException | UncheckedExecutionException | ExecutionError e) {
+ if (e.getCause() != null) {
+ Throwables.propagateIfPossible(e.getCause(), Exception.class);
+ throw new RuntimeException("unexpected", e.getCause());
+ }
+
+ throw e;
+ }
+ }
+
+ String getStatRetrievaelTime() {
+ return stateRetrievalTime;
+ }
+
+ private OnDemandShardState retrieveState() throws Exception {
+ stateRetrievalTime = null;
+ Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
+ Stopwatch timer = Stopwatch.createStarted();
+
+ OnDemandShardState state = (OnDemandShardState) Await.result(Patterns.ask(shardActor,
+ GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
+
+ stateRetrievalTime = timer.stop().toString();
+ return state;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+import java.util.List;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeListenerInfo;
+
+/**
+ * MXBean interface for reporting shard data tree change listener information.
+ *
+ * @author Thomas Pantelis
+ */
+public interface ShardDataTreeListenerInfoMXBean {
+ List<DataTreeListenerInfo> getDataTreeChangeListenerInfo();
+
+ @Deprecated
+ List<DataTreeListenerInfo> getDataChangeListenerInfo();
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeListenerInfo;
+import org.opendaylight.controller.cluster.datastore.messages.GetInfo;
+import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
+import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+
+/**
+ * Implementation of ShardDataTreeListenerInfoMXBean.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardDataTreeListenerInfoMXBeanImpl extends AbstractMXBean implements ShardDataTreeListenerInfoMXBean {
+ private static final String JMX_CATEGORY = "ShardDataTreeListenerInfo";
+
+ private final OnDemandShardStateCache stateCache;
+
+ public ShardDataTreeListenerInfoMXBeanImpl(final String shardName, final String mxBeanType,
+ final ActorRef shardActor) {
+ super(shardName, mxBeanType, JMX_CATEGORY);
+ stateCache = new OnDemandShardStateCache(shardName, Preconditions.checkNotNull(shardActor));
+ }
+
+ @Override
+ public List<DataTreeListenerInfo> getDataTreeChangeListenerInfo() {
+ return getListenerActorsInfo(getState().getTreeChangeListenerActors());
+ }
+
+ @Override
+ public List<DataTreeListenerInfo> getDataChangeListenerInfo() {
+ return getListenerActorsInfo(getState().getDataChangeListenerActors());
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private OnDemandShardState getState() {
+ try {
+ return stateCache.get();
+ } catch (Exception e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private List<DataTreeListenerInfo> getListenerActorsInfo(Collection<ActorSelection> actors) {
+ final Timeout timeout = new Timeout(20, TimeUnit.SECONDS);
+ final List<Future<Object>> futureList = new ArrayList<>(actors.size());
+ for (ActorSelection actor: actors) {
+ futureList.add(Patterns.ask(actor, GetInfo.INSTANCE, timeout));
+ }
+
+ try {
+ final List<DataTreeListenerInfo> listenerInfoList = new ArrayList<>();
+ Await.result(Futures.sequence(futureList, ExecutionContext.Implicits$.MODULE$.global()),
+ timeout.duration()).forEach(obj -> listenerInfoList.add((DataTreeListenerInfo) obj));
+ return listenerInfoList;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
package org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard;
import akka.actor.ActorRef;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import com.google.common.base.Joiner;
import com.google.common.base.Joiner.MapJoiner;
-import com.google.common.base.Stopwatch;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot;
import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo;
-import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
-import scala.concurrent.Await;
/**
* Maintains statistics for a shard.
@GuardedBy("DATE_FORMAT")
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
- private static final Cache<String, OnDemandRaftState> ONDEMAND_RAFT_STATE_CACHE =
- CacheBuilder.newBuilder().expireAfterWrite(2, TimeUnit.SECONDS).build();
-
private static final MapJoiner MAP_JOINER = Joiner.on(", ").withKeyValueSeparator(": ");
+ private final Shard shard;
+
+ private final OnDemandShardStateCache stateCache;
+
private long committedTransactionsCount;
private long readOnlyTransactionCount;
private boolean followerInitialSyncStatus = false;
- private final Shard shard;
-
private String statRetrievalError;
- private String statRetrievalTime;
-
private long leadershipChangeCount;
private long lastLeadershipChangeTime;
public ShardStats(final String shardName, final String mxBeanType, @Nullable final Shard shard) {
super(shardName, mxBeanType, JMX_CATEGORY_SHARD);
this.shard = shard;
+ stateCache = new OnDemandShardStateCache(shardName, shard != null ? shard.self() : null);
}
@SuppressWarnings("checkstyle:IllegalCatch")
private OnDemandRaftState getOnDemandRaftState() {
- String name = getShardName();
- OnDemandRaftState state = ONDEMAND_RAFT_STATE_CACHE.getIfPresent(name);
- if (state == null) {
+ try {
+ final OnDemandRaftState state = stateCache.get();
statRetrievalError = null;
- statRetrievalTime = null;
-
- if (shard != null) {
- Timeout timeout = new Timeout(10, TimeUnit.SECONDS);
- try {
- Stopwatch timer = Stopwatch.createStarted();
-
- state = (OnDemandRaftState) Await.result(Patterns.ask(shard.getSelf(),
- GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
-
- statRetrievalTime = timer.stop().toString();
- ONDEMAND_RAFT_STATE_CACHE.put(name, state);
- } catch (Exception e) {
- statRetrievalError = e.toString();
- }
- }
-
- state = state != null ? state : OnDemandRaftState.builder().build();
+ return state;
+ } catch (Exception e) {
+ statRetrievalError = e.getCause().toString();
+ return OnDemandRaftState.builder().build();
}
-
- return state;
}
private static String formatMillis(final long timeMillis) {
@Override
public String getStatRetrievalTime() {
getOnDemandRaftState();
- return statRetrievalTime;
+ return stateCache.getStatRetrievaelTime();
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import java.beans.ConstructorProperties;
+
+/**
+ * Response to a {@link GetInfo} query from a data tree listener actor.
+ *
+ * @author Thomas Pantelis
+ */
+public class DataTreeListenerInfo {
+ private final String listener;
+ private final String registeredPath;
+ private final boolean isEnabled;
+ private final long notificationCount;
+
+ @ConstructorProperties({"listener","registeredPath", "isEnabled", "notificationCount"})
+ public DataTreeListenerInfo(final String listener, final String registeredPath, final boolean isEnabled,
+ final long notificationCount) {
+ this.listener = Preconditions.checkNotNull(listener);
+ this.registeredPath = Preconditions.checkNotNull(registeredPath);
+ this.isEnabled = isEnabled;
+ this.notificationCount = notificationCount;
+ }
+
+ public String getListener() {
+ return listener;
+ }
+
+ public String getRegisteredPath() {
+ return registeredPath;
+ }
+
+ public boolean isEnabled() {
+ return isEnabled;
+ }
+
+ public long getNotificationCount() {
+ return notificationCount;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Inocybe Technologies and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+/**
+ * Local message sent to an actor to retrieve internal information for reporting.
+ *
+ * @author Thomas Pantelis
+ */
+public final class GetInfo {
+ public static final GetInfo INSTANCE = new GetInfo();
+
+ private GetInfo() {
+ }
+}
return new Builder();
}
- public static class Builder extends AbstractBuilder<Builder> {
+ public static class Builder extends AbstractBuilder<Builder, OnDemandShardState> {
private final OnDemandShardState state = new OnDemandShardState();
@Override
state.commitCohortActors = actors;
return self();
}
+
+ @Override
+ public OnDemandShardState build() {
+ return super.build();
+ }
}
}