void onLeadershipChange(boolean isLeader, boolean hasLeader) {
log.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader);
- final EnableNotification msg = new EnableNotification(isLeader);
+ final EnableNotification msg = new EnableNotification(isLeader, persistenceId());
for (ActorSelection dataChangeListener : leaderOnlyListenerActors) {
dataChangeListener.tell(msg, getSelf());
}
@Override
void onMessage(M message, boolean isLeader, boolean hasLeader) {
- log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader);
+ log.debug("{}: {} for {}, isLeader: {}, hasLeader: {}", persistenceId(), logName(), message,
+ isLeader, hasLeader);
ActorRef registrationActor = createActor(DataTreeNotificationListenerRegistrationActor.props());
final ActorSelection listenerActor = selectActor(message.getListenerActorPath());
// We have a leader so enable the listener.
- listenerActor.tell(new EnableNotification(true), getSelf());
+ listenerActor.tell(new EnableNotification(true, persistenceId()), getSelf());
if (!message.isRegisterOnAllInstances()) {
// This is a leader-only registration so store a reference to the listener actor so it can be notified
*/
@NotThreadSafe
abstract class AbstractShardDataTreeNotificationPublisherActorProxy implements ShardDataTreeNotificationPublisher {
- private static final Logger LOG = LoggerFactory.getLogger(
- AbstractShardDataTreeNotificationPublisherActorProxy.class);
+ protected final Logger log = LoggerFactory.getLogger(getClass());
private final ActorContext actorContext;
private final String actorName;
private final String logContext;
- private ActorRef notifierActor;
+ private ActorRef publisherActor;
protected AbstractShardDataTreeNotificationPublisherActorProxy(ActorContext actorContext, String actorName,
String logContext) {
}
@Override
- public void publishChanges(DataTreeCandidate candidate, String logContext) {
- notifierActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications(candidate),
+ public void publishChanges(DataTreeCandidate candidate) {
+ publisherActor().tell(new ShardDataTreeNotificationPublisherActor.PublishNotifications(candidate),
ActorRef.noSender());
}
- protected final ActorRef notifierActor() {
- if (notifierActor == null) {
- LOG.debug("Creating actor {}", actorName);
-
+ protected final ActorRef publisherActor() {
+ if (publisherActor == null) {
String dispatcher = new Dispatchers(actorContext.system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Notification);
- notifierActor = actorContext.actorOf(props().withDispatcher(dispatcher).withMailbox(
+ publisherActor = actorContext.actorOf(props().withDispatcher(dispatcher).withMailbox(
org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX), actorName);
+
+ log.debug("{}: Created publisher actor {} with name {}", logContext, publisherActor, actorName);
}
- return notifierActor;
+ return publisherActor;
}
}
private final DOMDataTreeChangeListener listener;
private final YangInstanceIdentifier registeredPath;
private boolean notificationsEnabled = false;
+ private String logContext = "";
private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener,
final YangInstanceIdentifier registeredPath) {
private void dataChanged(final DataTreeChanged message) {
// Do nothing if notifications are not enabled
if (!notificationsEnabled) {
- LOG.debug("Notifications not enabled for listener {} - dropping change notification", listener);
+ LOG.debug("{}: Notifications not enabled for listener {} - dropping change notification",
+ logContext, listener);
return;
}
- LOG.debug("Sending change notification {} to listener {}", message.getChanges(), listener);
+ LOG.debug("{}: Sending {} change notification(s) {} to listener {}", logContext, message.getChanges().size(),
+ message.getChanges(), listener);
try {
this.listener.onDataTreeChanged(message.getChanges());
} catch (Exception e) {
- LOG.error("Error notifying listener {}", this.listener, e);
+ LOG.error("{}: Error notifying listener {}", logContext, this.listener, e);
}
// TODO: do we really need this?
}
private void enableNotification(final EnableNotification message) {
+ logContext = message.getLogContext();
notificationsEnabled = message.isEnabled();
- LOG.debug("{} notifications for listener {}", notificationsEnabled ? "Enabled" : "Disabled",
+ LOG.debug("{}: {} notifications for listener {}", logContext, notificationsEnabled ? "Enabled" : "Disabled",
listener);
}
this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
DataTreeChangeListenerActor.props(getInstance(), registeredPath)
.withDispatcher(actorContext.getNotificationDispatcherPath()));
+
+ LOG.debug("{}: Created actor {} for DTCL {}", actorContext.getDatastoreContext().getLogicalStoreType(),
+ dataChangeListenerActor, listener);
}
@Override
@Override
public void onComplete(final Throwable failure, final ActorRef shard) {
if (failure instanceof LocalShardNotFoundException) {
- LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} "
- + "cannot be registered", shardName, getInstance(), registeredPath);
+ LOG.debug("{}: No local shard found for {} - DataTreeChangeListener {} at path {} "
+ + "cannot be registered", logContext(), shardName, getInstance(), registeredPath);
} else if (failure != null) {
- LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} "
- + "cannot be registered: {}", shardName, getInstance(), registeredPath, failure);
+ LOG.error("{}: Failed to find local shard {} - DataTreeChangeListener {} at path {} "
+ + "cannot be registered: {}", logContext(), shardName, getInstance(), registeredPath,
+ failure);
} else {
doRegistration(shard);
}
private void setListenerRegistrationActor(final ActorSelection actor) {
if (actor == null) {
- LOG.debug("Ignoring null actor on {}", this);
+ LOG.debug("{}: Ignoring null actor on {}", logContext(), this);
return;
}
@Override
public void onComplete(final Throwable failure, final Object result) {
if (failure != null) {
- LOG.error("Failed to register DataTreeChangeListener {} at path {}",
+ LOG.error("{}: Failed to register DataTreeChangeListener {} at path {}", logContext(),
getInstance(), registeredPath, failure);
} else {
RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
ActorRef getDataChangeListenerActor() {
return dataChangeListenerActor;
}
+
+ private String logContext() {
+ return actorContext.getDatastoreContext().getLogicalStoreType().toString();
+ }
}
DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(listenerActor);
- log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
+ log().debug("{}: Registering listenerActor {} for path {}", persistenceId(), listenerActor, message.getPath());
final ShardDataTree shardDataTree = getShard().getDataStore();
shardDataTree.registerTreeChangeListener(message.getPath(),
private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataChangeListenerPublisher.class);
private final ListenerTree dataChangeListenerTree = ListenerTree.create();
+ private final String logContext;
+
+ DefaultShardDataChangeListenerPublisher(String logContext) {
+ this.logContext = logContext;
+ }
@Override
public void submitNotification(final DataChangeListenerRegistration<?> listener,
final DOMImmutableDataChangeEvent notification) {
- LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification);
+ LOG.debug("{}: Notifying listener {} about {}", logContext, listener.getInstance(), notification);
listener.getInstance().onDataChanged(notification);
}
public void submitNotifications(final DataChangeListenerRegistration<?> listener,
final Iterable<DOMImmutableDataChangeEvent> notifications) {
final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
- LOG.debug("Notifying listener {} about {}", instance, notifications);
+ LOG.debug("{}: Notifying listener {} about {}", logContext, instance, notifications);
for (DOMImmutableDataChangeEvent n : notifications) {
instance.onDataChanged(n);
}
@Override
- public void publishChanges(DataTreeCandidate candidate, String logContext) {
+ public void publishChanges(DataTreeCandidate candidate) {
ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this);
}
onRegistration.accept(registration);
if (initialState.isPresent()) {
- notifySingleListener(path, listener, scope, initialState.get());
+ notifySingleListener(path, listener, scope, initialState.get(), logContext);
}
}
static void notifySingleListener(final YangInstanceIdentifier path,
final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
- final DataChangeScope scope, final DataTreeCandidate initialState) {
- DefaultShardDataChangeListenerPublisher publisher = new DefaultShardDataChangeListenerPublisher();
+ final DataChangeScope scope, final DataTreeCandidate initialState, String logContext) {
+ DefaultShardDataChangeListenerPublisher publisher = new DefaultShardDataChangeListenerPublisher(logContext);
publisher.registerDataChangeListener(path, listener, scope, Optional.absent(), noop -> { });
- publisher.publishChanges(initialState, "");
+ publisher.publishChanges(initialState);
}
}
final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStoreTreeChangePublisher
implements ShardDataTreeChangeListenerPublisher {
private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataTreeChangeListenerPublisher.class);
+ private String logContext;
+
+ DefaultShardDataTreeChangeListenerPublisher(String logContext) {
+ this.logContext = logContext;
+ }
@Override
- public void publishChanges(final DataTreeCandidate candidate, String logContext) {
+ public void publishChanges(final DataTreeCandidate candidate) {
+ LOG.debug("{}: publishChanges: {}", logContext, candidate);
processCandidateTree(candidate);
}
@Override
protected void notifyListener(AbstractDOMDataTreeChangeListenerRegistration<?> registration,
Collection<DataTreeCandidate> changes) {
+ LOG.debug("{}: notifyListener: listener: {}", logContext, registration.getInstance());
registration.getInstance().onDataTreeChanged(changes);
}
public void registerTreeChangeListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
Optional<DataTreeCandidate> initialState,
Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+ LOG.debug("{}: registerTreeChangeListener: path: {}, listener: {}", logContext, treeId, listener);
+
AbstractDOMDataTreeChangeListenerRegistration<org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener>
- registration = super.registerTreeChangeListener(treeId,
- (org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener)changes ->
- listener.onDataTreeChanged(changes));
+ registration = super.registerTreeChangeListener(treeId, new ForwardingDOMDataTreeChangeListener(listener));
onRegistration.accept(
new org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration<
});
if (initialState.isPresent()) {
- notifySingleListener(treeId, listener, initialState.get());
+ notifySingleListener(treeId, listener, initialState.get(), logContext);
}
}
static void notifySingleListener(YangInstanceIdentifier treeId, DOMDataTreeChangeListener listener,
- DataTreeCandidate state) {
- DefaultShardDataTreeChangeListenerPublisher publisher = new DefaultShardDataTreeChangeListenerPublisher();
+ DataTreeCandidate state, String logContext) {
+ LOG.debug("{}: notifySingleListener: path: {}, listener: {}", logContext, treeId, listener);
+ DefaultShardDataTreeChangeListenerPublisher publisher =
+ new DefaultShardDataTreeChangeListenerPublisher(logContext);
+ publisher.logContext = logContext;
publisher.registerTreeChangeListener(treeId, listener, Optional.absent(), noop -> { });
- publisher.publishChanges(state, "");
+ publisher.publishChanges(state);
+ }
+
+ private static class ForwardingDOMDataTreeChangeListener
+ implements org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener {
+ final DOMDataTreeChangeListener delegate;
+
+ ForwardingDOMDataTreeChangeListener(DOMDataTreeChangeListener delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+ delegate.onDataTreeChanged(changes);
+ }
+
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
}
}
import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Internal implementation of a {@link DOMDataTreeChangeListener} which
* message and forwards them towards the client's {@link DataTreeChangeListenerActor}.
*/
final class ForwardingDataTreeChangeListener implements DOMDataTreeChangeListener {
+ private static final Logger LOG = LoggerFactory.getLogger(ForwardingDataTreeChangeListener.class);
+
private final ActorSelection actor;
ForwardingDataTreeChangeListener(final ActorSelection actor) {
@Override
public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+ LOG.debug("Sending DataTreeChanged to {}", actor);
actor.tell(new DataTreeChanged(changes), ActorRef.noSender());
}
+
+ @Override
+ public String toString() {
+ return "ForwardingDataTreeChangeListener [actor=" + actor + "]";
+ }
}
Optional<DataTreeCandidate> initialState,
Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
onRegistration) {
- notifierActor().tell(new ShardDataChangePublisherActor.RegisterListener(path, listener, scope, initialState,
+ publisherActor().tell(new ShardDataChangePublisherActor.RegisterListener(path, listener, scope, initialState,
onRegistration), ActorRef.noSender());
}
extends ShardDataTreeNotificationPublisherActor<ShardDataChangeListenerPublisher> {
private ShardDataChangePublisherActor(final String name, final String logContext) {
- super(new DefaultShardDataChangeListenerPublisher(), name, logContext);
+ super(new DefaultShardDataChangeListenerPublisher(logContext), name, logContext);
}
@Override
RegisterListener reg = (RegisterListener)message;
if (reg.initialState.isPresent()) {
DefaultShardDataChangeListenerPublisher.notifySingleListener(reg.path, reg.listener, reg.scope,
- reg.initialState.get());
+ reg.initialState.get(), logContext());
}
publisher().registerDataChangeListener(reg.path, reg.listener, reg.scope, Optional.absent(),
@VisibleForTesting
public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
- new DefaultShardDataTreeChangeListenerPublisher(), new DefaultShardDataChangeListenerPublisher(), "");
+ new DefaultShardDataTreeChangeListenerPublisher(""),
+ new DefaultShardDataChangeListenerPublisher(""), "");
}
final String logContext() {
@VisibleForTesting
public void notifyListeners(final DataTreeCandidate candidate) {
- treeChangeListenerPublisher.publishChanges(candidate, logContext);
- dataChangeListenerPublisher.publishChanges(candidate, logContext);
+ treeChangeListenerPublisher.publishChanges(candidate);
+ dataChangeListenerPublisher.publishChanges(candidate);
}
/**
public void registerTreeChangeListener(YangInstanceIdentifier treeId,
DOMDataTreeChangeListener listener, Optional<DataTreeCandidate> currentState,
Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
- notifierActor().tell(new ShardDataTreeChangePublisherActor.RegisterListener(treeId, listener, currentState,
- onRegistration), ActorRef.noSender());
+ final ShardDataTreeChangePublisherActor.RegisterListener regMessage =
+ new ShardDataTreeChangePublisherActor.RegisterListener(treeId, listener, currentState, onRegistration);
+ log.debug("{}: Sending {} to publisher actor {}", logContext(), regMessage, publisherActor());
+ publisherActor().tell(regMessage, ActorRef.noSender());
}
@Override
extends ShardDataTreeNotificationPublisherActor<ShardDataTreeChangeListenerPublisher> {
private ShardDataTreeChangePublisherActor(final String name, final String logContext) {
- super(new DefaultShardDataTreeChangeListenerPublisher(), name, logContext);
+ super(new DefaultShardDataTreeChangeListenerPublisher(logContext), name, logContext);
}
@Override
protected void handleReceive(Object message) {
if (message instanceof RegisterListener) {
RegisterListener reg = (RegisterListener)message;
+ LOG.debug("{}: Received {}", logContext(), reg);
if (reg.initialState.isPresent()) {
DefaultShardDataTreeChangeListenerPublisher.notifySingleListener(reg.path, reg.listener,
- reg.initialState.get());
+ reg.initialState.get(), logContext());
}
publisher().registerTreeChangeListener(reg.path, reg.listener, Optional.absent(), reg.onRegistration);
this.initialState = Preconditions.checkNotNull(initialState);
this.onRegistration = Preconditions.checkNotNull(onRegistration);
}
+
+ @Override
+ public String toString() {
+ return "RegisterListener [path=" + path + ", listener=" + listener + ", initialState present="
+ + initialState.isPresent() + "]";
+ }
}
}
interface ShardDataTreeNotificationPublisher {
long PUBLISH_DELAY_THRESHOLD_IN_MS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS);
- void publishChanges(DataTreeCandidate candidate, String logContext);
+ void publishChanges(DataTreeCandidate candidate);
}
timer.start();
try {
- publisher.publishChanges(toPublish.candidate, logContext);
+ publisher.publishChanges(toPublish.candidate);
} finally {
long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
public class EnableNotification {
private final boolean enabled;
+ private final String logContext;
- public EnableNotification(boolean enabled) {
+ public EnableNotification(boolean enabled, String logContext) {
this.enabled = enabled;
+ this.logContext = logContext;
}
public boolean isEnabled() {
return enabled;
}
+
+ public String getLogContext() {
+ return logContext;
+ }
}
public boolean isRegisterOnAllInstances() {
return registerOnAllInstances;
}
+
+ @Override
+ public String toString() {
+ return "RegisterChangeListener [path=" + path + ", scope=" + scope + ", registerOnAllInstances="
+ + registerOnAllInstances + ", dataChangeListenerActor=" + dataChangeListenerActor + "]";
+ }
}
path = SerializationUtils.deserializePath(in);
registerOnAllInstances = in.readBoolean();
}
+
+ @Override
+ public String toString() {
+ return "RegisterDataTreeChangeListener [path=" + path + ", registerOnAllInstances=" + registerOnAllInstances
+ + ", dataTreeChangeListenerPath=" + dataTreeChangeListenerPath + "]";
+ }
}
}
private void onInitConfigListener() {
- LOG.debug("{}: Initializing config listener.", persistenceId());
+ LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
org.opendaylight.mdsal.common.api.LogicalDatastoreType
private final MemberName memberName;
private final LogicalDatastoreType type;
private final ActorRef handlingActor;
+ private final String logName;
public ShardConfigHandler(final MemberName memberName,
final LogicalDatastoreType type,
this.memberName = memberName;
this.type = type;
this.handlingActor = handlingActor;
+ logName = memberName.getName() + "-" + type;
}
@Override
private void resolveWrite(final DataTreeCandidateNode rootNode) {
- LOG.debug("{}: New config received {}", memberName, rootNode);
- LOG.debug("{}: Data after: {}", memberName, rootNode.getDataAfter());
+ LOG.debug("{}: New config received {}", logName, rootNode);
+ LOG.debug("{}: Data after: {}", logName, rootNode.getDataAfter());
// were in the shards list, iter children and resolve
for (final DataTreeCandidateNode childNode : rootNode.getChildNodes()) {
}
}
+ @SuppressWarnings("unchecked")
private void resolveWrittenShard(final DataTreeCandidateNode childNode) {
final MapEntryNode entryNode = (MapEntryNode) childNode.getDataAfter().get();
final LeafNode<YangInstanceIdentifier> prefix =
final YangInstanceIdentifier identifier = prefix.getValue();
- LOG.debug("{}: Deserialized {} from datastore", memberName, identifier);
+ LOG.debug("{}: Deserialized {} from datastore", logName, identifier);
final ContainerNode replicas =
(ContainerNode) entryNode.getChild(new NodeIdentifier(SHARD_REPLICAS_QNAME)).get();
.map(child -> MemberName.forName(child.getValue()))
.collect(Collectors.toList());
- LOG.debug("{}: Replicas read from ds {}", memberName, retReplicas.toString());
+ LOG.debug("{}: Replicas read from ds {}", logName, retReplicas.toString());
final PrefixShardConfiguration newConfig =
new PrefixShardConfiguration(new DOMDataTreeIdentifier(type, identifier),
PrefixShardStrategy.NAME, retReplicas);
- LOG.debug("{}: Resulting config {}", memberName, newConfig);
+ LOG.debug("{}: Resulting config {} - sending PrefixShardCreated to {}", logName, newConfig, handlingActor);
handlingActor.tell(new PrefixShardCreated(newConfig), noSender());
}
private void resolveDelete(final DataTreeCandidateNode rootNode) {
}
+
+ @Override
+ public String toString() {
+ return "ShardConfigHandler [logName=" + logName + ", handlingActor=" + handlingActor + "]";
+ }
}
}
// Let the DataChangeListener know that notifications should be
// enabled
- subject.tell(new EnableNotification(true), getRef());
+ subject.tell(new EnableNotification(true, "test"), getRef());
subject.tell(new DataChanged(mockChangeEvent), getRef());
// Let the DataChangeListener know that notifications should be
// enabled
- subject.tell(new EnableNotification(true), getRef());
+ subject.tell(new EnableNotification(true, "test"), getRef());
subject.tell(new DataChanged(mockChangeEvent1), getRef());
expectMsgClass(DataChangedReply.class);
// Let the DataChangeListener know that notifications should be
// enabled
- subject.tell(new EnableNotification(true), getRef());
+ subject.tell(new EnableNotification(true, "test"), getRef());
subject.tell(new DataTreeChanged(mockCandidates), getRef());
// Let the DataChangeListener know that notifications should be
// enabled
- subject.tell(new EnableNotification(true), getRef());
+ subject.tell(new EnableNotification(true, "test"), getRef());
subject.tell(new DataTreeChanged(mockCandidates1), getRef());
expectMsgClass(DataTreeChangedReply.class);
final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
doReturn(executor).when(actorContext).getClientDispatcher();
+ doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext();
doReturn(mockActorSystem).when(actorContext).getActorSystem();
String shardName = "shard-1";